// // Copyright (c) 2025 murilo ijanc' // // Permission to use, copy, modify, and distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice appear in all copies. // // THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES // WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF // MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR // ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES // WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN // ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF // OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. // use std::collections::HashMap; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; use aws_sdk_cognitoidentityprovider::Client as CognitoClient; use aws_sdk_cognitoidentityprovider::types::UserType; use clap::{ArgAction, Parser, Subcommand}; use indicatif::{ProgressBar, ProgressStyle}; use tokio::fs::File; use tokio::io::{self, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tracing::{debug, error, info}; use tracing_subscriber::EnvFilter; const VERSION: &str = concat!( env!("CARGO_PKG_VERSION"), " (", env!("GIT_HASH", "unknown"), " ", env!("BUILD_DATE", "unknown"), ")", ); /// Batch operations for AWS Cognito user pools. #[derive(Debug, Parser)] #[command( name = env!("CARGO_PKG_NAME"), about = "Batch operations for AWS Cognito user pools", version = VERSION, author, propagate_version = true )] struct Cli { /// Increase verbosity (use -v, -vv, ...). /// /// When no RUST_LOG is set, a single -v switches the log level to DEBUG. #[arg(short, long, global = true, action = ArgAction::Count)] verbose: u8, #[command(subcommand)] command: Commands, } /// Available batch operations. /// /// These map directly to high-level Cognito workflows: /// - sync: synchronize users from a source into Cognito. /// - add: add users to one or more Cognito groups. /// - del: remove users from one or more Cognito groups. #[derive(Debug, Subcommand)] enum Commands { /// Synchronize users with a Cognito user pool. Sync(SyncArgs), /// Add users to one or more Cognito groups. Add(AddArgs), /// Remove users from one or more Cognito groups. Del(GroupOperationArgs), } /// Arguments for the `add` operation. /// /// High-level flow: /// - `sync_file` is the CSV produced by `sync` (username,email). /// - `emails_file` is a plain text file with one e-mail per line, /// representing the users that should be added to the given groups. /// - `groups` is the list of Cognito group names. /// - `pool_id` is the Cognito User Pool ID. /// - `concurrency` defines how many users are processed in parallel. #[derive(Debug, Parser)] pub struct AddArgs { /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). #[arg(long = "pool-id")] pub pool_id: String, /// CSV file generated by the `sync` command (username,email). /// /// This file is loaded into a HashMap and used /// as the source of truth for resolving usernames from e-mail. #[arg( long = "sync-file", value_name = "CSV_PATH", help = "CSV file produced by `sync` containing username,email columns" )] pub sync_file: PathBuf, /// Text file containing one e-mail per line. /// /// Each e-mail will be normalized (trim + lowercase) and then /// looked up in the sync CSV map to obtain the corresponding username. #[arg( long = "emails-file", value_name = "TXT_PATH", help = "Plain text file with one e-mail per line to be added to the groups" )] pub emails_file: PathBuf, /// One or more Cognito group names. /// /// All resolved users will be added to every group listed here. #[arg( long = "group", alias = "groups", value_name = "GROUP", num_args = 1.., required = true )] pub groups: Vec, /// Maximum number of users to process concurrently. /// /// This controls how many users are handled in parallel when calling /// the AdminAddUserToGroup API. #[arg(long = "concurrency", value_name = "N", default_value_t = 4)] pub concurrency: usize, /// Global timeout for the operation, in seconds. /// /// When set, the entire add operation is bounded by this timeout. #[arg(long = "timeout", value_name = "SECONDS")] pub timeout: Option, } /// Common arguments shared by group-based operations. #[derive(clap::Args, Debug, Clone)] pub struct CommonOperationArgs { /// Cognito User Pool ID to operate on. #[arg(long = "pool-id", env = "COGNITO_USER_POOL_ID")] pub pool_id: String, /// File path used by the operation. /// For `sync`, this is the output file where usernames and emails are stored as CSV. #[arg( short = 'f', long = "file", value_name = "PATH", help = "File path. For `sync`, this is the output CSV file." )] pub emails_file: Option, /// Maximum duration (in seconds) allowed for the operation. #[arg(long = "timeout", value_name = "SECONDS")] pub timeout: Option, /// Concurrency level for operations that need it. #[arg(long = "concurrency", value_name = "N", default_value_t = 1)] pub concurrency: usize, } /// Arguments for the `sync` operation. #[derive(Debug, Parser)] struct SyncArgs { /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). #[arg(long = "pool-id")] pool_id: String, /// Optional file containing user e-mails, one per line. /// /// Depending on the design, this can represent the source of truth /// to be synchronized with the Cognito user pool. #[arg(long = "emails-file")] emails_file: Option, /// Optional list of Cognito group names used during synchronization. /// /// These can be used to ensure users are added/removed from specific /// groups during the sync process. #[arg(long = "group", alias = "groups")] groups: Vec, /// Maximum number of concurrent operations. #[arg(long)] concurrency: Option, /// Global timeout for the sync operation, in seconds. #[arg(long)] timeout: Option, } /// Arguments shared by `add` and `del` group operations. #[derive(Debug, Parser)] struct GroupOperationArgs { /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). #[arg(long = "pool-id")] pool_id: String, /// One or more Cognito group names. /// /// All users found in the input file will be added to or removed from /// these groups, depending on the chosen subcommand. #[arg(long = "group", alias = "groups")] groups: Vec, /// File containing user e-mails, one per line. /// /// Every e-mail read from this file will be processed for the /// selected group operation. #[arg(long = "emails-file")] emails_file: PathBuf, /// Maximum number of concurrent operations. #[arg(long)] concurrency: Option, /// Global timeout for the operation, in seconds. #[arg(long)] timeout: Option, } #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); init_tracing(cli.verbose); debug!("parsed CLI arguments: {cli:?}"); match cli.command { Commands::Sync(args) => { let common = CommonOperationArgs { pool_id: args.pool_id, emails_file: args.emails_file, concurrency: 1, //args.concurrency, timeout: args.timeout, }; run_sync(&common).await?; } Commands::Add(args) => { run_add_groups(args).await?; } // Commands::Del(args) => { // let common = CommonOperationArgs { // pool_id: args.pool_id, // groups: args.groups, // emails_file: Some(args.emails_file), // concurrency: args.concurrency, // timeout: args.timeout, // }; // run_remove_groups(common).await?; // } _ => unimplemented!(), } Ok(()) } /// Initialize tracing based on RUST_LOG and the CLI verbosity. /// /// Rules: /// - If RUST_LOG is set, it is fully respected. /// - If RUST_LOG is not set and verbose == 0 -> INFO level. /// - If RUST_LOG is not set and verbose > 0 -> DEBUG level. fn init_tracing(verbose: u8) { if std::env::var_os("RUST_LOG").is_some() { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); return; } let filter = if verbose > 0 { EnvFilter::new("debug") } else { EnvFilter::new("info") }; tracing_subscriber::fmt().with_env_filter(filter).init(); } /// Synchronize Cognito users from a given user pool into a local CSV file. /// /// The CSV format is: /// ```text /// username,email /// user1@example.com,user1@example.com /// user2@example.com,user2@example.com /// ... /// ``` /// /// Source of truth is Cognito: this command dumps all users from the pool. /// /// Behavior: /// - Paginates over all Cognito users in the pool. /// - Extracts the `username` field and the `email` attribute (if present). /// - Writes the data as `username,email` to the given output file or stdout. /// - Respects the optional `timeout` passed in `CommonOperationArgs`. pub async fn run_sync(args: &CommonOperationArgs) -> Result<()> { info!( pool_id = %args.pool_id, "Starting users sync from Cognito user pool" ); let config = aws_config::load_from_env().await; // .context("failed to load AWS configuration")?; let client = CognitoClient::new(&config); let timeout = args.timeout.map(Duration::from_secs); let sync_future = sync_users_to_csv(&client, args); if let Some(duration) = timeout { match tokio::time::timeout(duration, sync_future).await { Ok(result) => { result?; } Err(_) => { return Err(anyhow::anyhow!( "sync operation timed out after {:?}", duration )); } } } else { sync_future.await?; } info!("Users sync completed successfully"); Ok(()) } async fn run_add_groups(args: AddArgs) -> Result<()> { // info!( // pool_id = %args.pool_id, // emails_file = ?args.emails_file, // concurrency = ?args.concurrency, // timeout = ?args.timeout, // "add groups operation requested (not implemented yet)" // ); let config = aws_config::load_from_env().await; let client = CognitoClient::new(&config); add_users_to_groups_from_files( &client, &args.pool_id, &args.sync_file, &args.emails_file, &args.groups, args.concurrency, ) .await?; Ok(()) } #[allow(dead_code)] async fn run_remove_groups(args: CommonOperationArgs) -> Result<()> { info!( pool_id = %args.pool_id, emails_file = ?args.emails_file, concurrency = ?args.concurrency, timeout = ?args.timeout, "remove groups operation requested (not implemented yet)" ); if let Some(seconds) = args.timeout { let _timeout = Duration::from_secs(seconds); debug!(?seconds, "remove operation timeout configured"); } // TODO: implement remove-from-groups logic. Ok(()) } /// Fetch all users from Cognito and write `username,email` to a CSV destination. /// /// If `args.emails_file` is set, the CSV is written to that file. /// Otherwise, the CSV is written to stdout. pub(crate) async fn sync_users_to_csv( client: &CognitoClient, args: &CommonOperationArgs, ) -> Result<()> { let mut writer: Box = if let Some(path) = &args.emails_file { let file = File::create(path).await.with_context(|| { format!("failed to create output file at '{}'", path.display()) })?; Box::new(file) } else { Box::new(io::stdout()) }; // CSV header writer .write_all(b"username,email\n") .await .context("failed to write CSV header")?; let mut total_users = 0usize; let mut pagination_token: Option = None; loop { let mut request = client .list_users() .user_pool_id(&args.pool_id) // 60 is the documented default max page size for Cognito ListUsers. .limit(60); if let Some(ref token) = pagination_token { request = request.pagination_token(token); } let response = request .send() .await .context("failed to call Cognito ListUsers")?; for user in response.users() { let (username, email) = extract_username_and_email(user); // If you prefer to skip users without email, you can check `email.is_empty()`. let line = format!("{username},{email}\n"); writer .write_all(line.as_bytes()) .await .context("failed to write CSV row")?; total_users += 1; } pagination_token = response.pagination_token().map(|token| token.to_owned()); if pagination_token.is_none() { break; } } writer.flush().await.context("failed to flush writer")?; info!(total_users, "Finished exporting Cognito users to CSV"); Ok(()) } /// Extract the `username` and `email` attribute from a Cognito `UserType`. fn extract_username_and_email(user: &UserType) -> (String, String) { let username = user.username().unwrap_or_default().to_string(); let email = user .attributes() .iter() .find(|attr| attr.name() == "email") .and_then(|attr| attr.value()) .unwrap_or_default() .to_string(); (username, email) } /// Read a Cognito sync CSV file ("username,email") and return a /// HashMap. /// /// Expected CSV format: /// ```text /// username,email /// johndoe,john@example.com /// alice,alice@example.com /// ... /// ``` /// /// Notes: /// - The first line is treated as a header and skipped. /// - Lines missing either username or email are ignored. /// - Email is lowercased and trimmed to allow normalized lookups. pub async fn read_sync_file_to_map>( path: P, ) -> Result> { let file = File::open(&path).await.with_context(|| { format!("failed to open sync file '{}'", path.as_ref().display()) })?; let reader = BufReader::new(file); let mut lines = reader.lines(); let mut map = HashMap::::new(); // Skip header: "username,email" if let Some(line) = lines.next_line().await? { let _header = line.trim(); // We can ignore header validation for now. } while let Some(line) = lines.next_line().await? { let trimmed = line.trim(); if trimmed.is_empty() { continue; } let parts: Vec<&str> = trimmed.split(',').collect(); if parts.len() < 2 { // malformed line, skip continue; } let username = parts[0].trim(); let email = parts[1].trim().to_lowercase(); if !email.is_empty() && !username.is_empty() { map.insert(email, username.to_string()); } } Ok(map) } /// Load a plain-text file containing one e-mail per line. /// /// Each line is trimmed and lowercased. Empty lines are skipped. pub async fn load_email_list>(path: P) -> Result> { let file = File::open(&path).await.with_context(|| { format!( "failed to open e-mail list file '{}'", path.as_ref().display() ) })?; let reader = BufReader::new(file); let mut lines = reader.lines(); let mut emails = Vec::new(); while let Some(line) = lines.next_line().await? { let email = line.trim().to_lowercase(); if email.is_empty() { continue; } emails.push(email); } Ok(emails) } /// Add a Cognito user to one or more groups using the Admin API. /// /// This function assumes AWS credentials and permissions allow admin operations. pub async fn admin_add_user_to_groups( client: &CognitoClient, pool_id: &str, username: &str, groups: &[String], ) -> Result<()> { for group in groups { info!(%username, %group, %pool_id, "adding user to Cognito group"); client .admin_add_user_to_group() .user_pool_id(pool_id) .username(username) .group_name(group) .send() .await .with_context(|| { format!( "failed to add user '{}' to group '{}'", username, group ) })?; info!(%username, %group, "user successfully added to group"); } Ok(()) } /// High-level flow: /// 1. Load sync CSV into HashMap. /// 2. Load target e-mails (one per line). /// 3. For each email, find username in HashMap. /// 4. Call `admin_add_user_to_groups` with concurrency limit. /// 5. Log success or failure for each email. /// /// `sync_csv_path`: CSV generated by `sync` (username,email). /// `emails_list_path`: TXT file with one e-mail per line (users to be added). pub async fn add_users_to_groups_from_files( client: &CognitoClient, pool_id: &str, sync_csv_path: &Path, emails_list_path: &Path, groups: &[String], concurrency: usize, ) -> Result<()> { let concurrency = std::cmp::max(1, concurrency); // 1. Load sync CSV into map: email -> username let email_to_username = read_sync_file_to_map(sync_csv_path).await?; info!( total_entries = email_to_username.len(), "loaded sync map (email -> username)" ); // 2. Load target e-mails let emails = load_email_list(emails_list_path).await?; let total_emails = emails.len(); info!(total_emails, "loaded target e-mail list"); let pb = Arc::new({ let bar = ProgressBar::new(total_emails as u64); bar.set_style( ProgressStyle::with_template( "[{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}", ) .unwrap_or_else(|_| ProgressStyle::default_bar()), ); bar.set_message("processing users"); bar }); let semaphore = Arc::new(Semaphore::new(concurrency)); let mut handles: Vec> = Vec::with_capacity(emails.len()); for email in emails { let username = match email_to_username.get(&email) { Some(u) => u.clone(), None => { error!(%email, "e-mail not found in sync map, skipping"); continue; } }; let permit = semaphore.clone().acquire_owned().await?; let client_clone = client.clone(); let pool_id = pool_id.to_string(); let groups = groups.to_vec(); let email_clone = email.clone(); let pb_clone = pb.clone(); let handle = tokio::spawn(async move { let _permit = permit; // keep permit alive for the duration of this task if let Err(err) = admin_add_user_to_groups( &client_clone, &pool_id, &username, &groups, ) .await { error!( email = %email_clone, %username, error = ?err, "failed to add user to one or more groups" ); } else { info!( email = %email_clone, %username, groups = ?groups, "user successfully processed for all groups" ); } pb_clone.inc(1); }); handles.push(handle); } // Wait for all tasks to complete for handle in handles { // Ignore panics here, just surface as error logs. if let Err(join_err) = handle.await { error!(error = ?join_err, "join error while processing a user"); } } pb.finish_with_message("done"); info!("finished processing all users for add-groups operation"); Ok(()) }