#![allow(unused)] // // Copyright (c) 2025 murilo ijanc' // // Permission to use, copy, modify, and distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice appear in all copies. // // THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES // WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF // MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR // ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES // WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN // ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF // OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. // use std::collections::HashMap; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; use aws_sdk_cognitoidentityprovider::Client as CognitoClient; use aws_sdk_cognitoidentityprovider::types::UserType; use clap::{ArgAction, Parser, Subcommand}; use indicatif::{ProgressBar, ProgressStyle}; use tokio::fs::File; use tokio::io::{self, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tracing::{debug, error, info}; use tracing_subscriber::EnvFilter; const VERSION: &str = concat!( env!("CARGO_PKG_VERSION"), " (", env!("GIT_HASH", "unknown"), " ", env!("BUILD_DATE", "unknown"), ")", ); /// Batch operations for AWS Cognito user pools. #[derive(Debug, Parser)] #[command( name = env!("CARGO_PKG_NAME"), about = "Batch operations for AWS Cognito user pools", version = VERSION, author, )] struct Cli { /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). #[arg(long = "pool-id")] pub pool_id: String, /// Maximum number of users to process concurrently. /// /// This controls how many users are handled in parallel when calling /// the AdminAddUserToGroup API. #[arg(long = "concurrency", value_name = "N", default_value_t = 4)] pub concurrency: usize, /// Global timeout for the sync operation, in seconds. #[arg(long)] timeout: Option, /// Increase verbosity (use -v, -vv, ...). /// /// When no RUST_LOG is set, a single -v switches the log level to DEBUG. #[arg(short, long, action = ArgAction::Count)] verbose: u8, #[command(subcommand)] command: Commands, } /// Available batch operations. /// /// These map directly to high-level Cognito workflows: /// - sync: synchronize users from a source into Cognito. /// - add: add users to one or more Cognito groups. /// - del: remove users from one or more Cognito groups. #[derive(Debug, Subcommand)] enum Commands { /// Synchronize users with a Cognito user pool. Sync(SyncArgs), /// Add users to one or more Cognito groups. Add(AddArgs), // /// Remove users from one or more Cognito groups. // Del(GroupOperationArgs), } /// Arguments for the `add` operation. /// /// High-level flow: /// - `sync_file` is the CSV produced by `sync` (username,email). /// - `emails_file` is a plain text file with one e-mail per line, /// representing the users that should be added to the given groups. /// - `groups` is the list of Cognito group names. /// - `pool_id` is the Cognito User Pool ID. /// - `concurrency` defines how many users are processed in parallel. #[derive(Debug, Parser)] pub struct AddArgs { /// CSV file generated by the `sync` command (username,email). /// /// This file is loaded into a HashMap and used /// as the source of truth for resolving usernames from e-mail. #[arg( long = "sync-file", value_name = "CSV_PATH", help = "CSV file produced by `sync` containing username,email columns" )] pub sync_file: PathBuf, /// Text file containing one e-mail per line. /// /// Each e-mail will be normalized (trim + lowercase) and then /// looked up in the sync CSV map to obtain the corresponding username. #[arg( long = "emails-file", value_name = "TXT_PATH", help = "Plain text file with one e-mail per line to be added to the groups" )] pub emails_file: PathBuf, /// One or more Cognito group names. /// /// All resolved users will be added to every group listed here. #[arg( long = "group", alias = "groups", value_name = "GROUP", num_args = 1.., required = true )] pub groups: Vec, } /// Arguments for the `sync` operation. #[derive(Debug, Parser)] struct SyncArgs { /// CSV file generated by the `sync` command (username,email). /// /// This file is loaded into a HashMap and used /// as the source of truth for resolving usernames from e-mail. #[arg( long = "sync-file", value_name = "CSV_PATH", help = "CSV file produced by `sync` containing username,email columns" )] sync_file: PathBuf, } /// Arguments shared by `add` and `del` group operations. #[derive(Debug, Parser)] struct GroupOperationArgs { /// One or more Cognito group names. /// /// All users found in the input file will be added to or removed from /// these groups, depending on the chosen subcommand. #[arg(long = "group", alias = "groups")] groups: Vec, /// File containing user e-mails, one per line. /// /// Every e-mail read from this file will be processed for the /// selected group operation. #[arg(long = "emails-file")] emails_file: PathBuf, } pub struct CommonOperationArgs { pub pool_id: String, pub concurrency: usize, pub timeout: Option, } #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); init_tracing(cli.verbose); debug!("parsed CLI arguments: {cli:?}"); let common_args = CommonOperationArgs { pool_id: cli.pool_id, concurrency: cli.concurrency, timeout: cli.timeout, }; match cli.command { Commands::Sync(args) => { run_sync(&common_args, &args).await?; } Commands::Add(args) => { // run_add_groups(args).await?; println!("add"); } // Commands::Del(args) => { // let common = CommonOperationArgs { // pool_id: args.pool_id, // groups: args.groups, // emails_file: Some(args.emails_file), // concurrency: args.concurrency, // timeout: args.timeout, // }; // run_remove_groups(common).await?; // } } Ok(()) } /// Initialize tracing based on RUST_LOG and the CLI verbosity. /// /// Rules: /// - If RUST_LOG is set, it is fully respected. /// - If RUST_LOG is not set and verbose == 0 -> INFO level. /// - If RUST_LOG is not set and verbose > 0 -> DEBUG level. fn init_tracing(verbose: u8) { if std::env::var_os("RUST_LOG").is_some() { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); return; } let filter = if verbose > 0 { EnvFilter::new("debug") } else { EnvFilter::new("info") }; tracing_subscriber::fmt().with_env_filter(filter).init(); } /// Synchronize Cognito users from a given user pool into a local CSV file. /// /// The CSV format is: /// ```text /// username,email /// Google_123,user1@example.com /// Google_456,user2@example.com /// ... /// ``` /// /// Source of truth is Cognito: this command dumps all users from the pool. /// /// Behavior: /// /// - Paginates over all Cognito users in the pool. /// - Extracts the `username` field and the `email` attribute (if present). /// - Writes the data as `username,email` to the given output file or stdout. /// - Respects the optional `timeout` passed in `CommonOperationArgs`. async fn run_sync( common_args: &CommonOperationArgs, cmd_args: &SyncArgs, ) -> Result<()> { info!( pool_id = %common_args.pool_id, "Starting users sync from Cognito user pool" ); let config = aws_config::load_from_env().await; let client = CognitoClient::new(&config); let timeout = common_args.timeout.map(Duration::from_secs); let sync_future = sync_users_to_csv(&client, common_args, cmd_args); if let Some(duration) = timeout { match tokio::time::timeout(duration, sync_future).await { Ok(result) => { result?; } Err(_) => { return Err(anyhow::anyhow!( "sync operation timed out after {:?}", duration )); } } } else { sync_future.await?; } info!("Users sync completed successfully"); Ok(()) } // async fn run_add_groups(args: AddArgs) -> Result<()> { // // info!( // // pool_id = %args.pool_id, // // emails_file = ?args.emails_file, // // concurrency = ?args.concurrency, // // timeout = ?args.timeout, // // "add groups operation requested (not implemented yet)" // // ); // let config = aws_config::load_from_env().await; // let client = CognitoClient::new(&config); // add_users_to_groups_from_files( // &args.pool_id, // &args.sync_file, // &args.emails_file, // &args.groups, // args.concurrency, // ) // .await?; // Ok(()) // } // #[allow(dead_code)] // async fn run_remove_groups(args: CommonOperationArgs) -> Result<()> { // info!( // pool_id = %args.pool_id, // emails_file = ?args.emails_file, // concurrency = ?args.concurrency, // timeout = ?args.timeout, // "remove groups operation requested (not implemented yet)" // ); // if let Some(seconds) = args.timeout { // let _timeout = Duration::from_secs(seconds); // debug!(?seconds, "remove operation timeout configured"); // } // // TODO: implement remove-from-groups logic. // Ok(()) // } /// Fetch all users from Cognito and write `username,email` to a CSV destination. /// /// If `args.emails_file` is set, the CSV is written to that file. /// Otherwise, the CSV is written to stdout. pub(crate) async fn sync_users_to_csv( client: &CognitoClient, common_args: &CommonOperationArgs, cmd_args: &SyncArgs, ) -> Result<()> { let mut writer = File::create(&cmd_args.sync_file).await.with_context(|| { format!( "failed to create output file at '{}'", &cmd_args.sync_file.display() ) })?; // let mut writer: Box = // if let Some(path) = &cmd_args.sync_file { // let file = File::create(path).await.with_context(|| { // format!("failed to create output file at '{}'", path.display()) // })?; // Box::new(file) // } else { // Box::new(io::stdout()) // }; // CSV header writer .write_all(b"username,email\n") .await .context("failed to write CSV header")?; let mut total_users = 0usize; let mut pagination_token: Option = None; loop { let mut request = client .list_users() .user_pool_id(&common_args.pool_id) // https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_ListUsers.html#CognitoUserPools-ListUsers-request-Limit .limit(60); if let Some(ref token) = pagination_token { request = request.pagination_token(token); } let response = request .send() .await .context("failed to call Cognito ListUsers")?; for user in response.users() { let (username, email) = extract_username_and_email(user); let line = format!("{username},{email}\n"); writer .write_all(line.as_bytes()) .await .context("failed to write CSV row")?; total_users += 1; } pagination_token = response.pagination_token().map(|token| token.to_owned()); if pagination_token.is_none() { break; } } writer.flush().await.context("failed to flush writer")?; info!(total_users, "Finished exporting Cognito users to CSV"); Ok(()) } /// Extract the `username` and `email` attribute from a Cognito `UserType`. fn extract_username_and_email(user: &UserType) -> (String, String) { let username = user.username().unwrap_or_default().to_string(); // Here I'm assuming that the user doesn't have an email attribute, so the // username is the email itself. let email = user .attributes() .iter() .find(|attr| attr.name() == "email") .and_then(|attr| attr.value()) .unwrap_or(&username) .to_string(); (username, email) } /// Read a Cognito sync CSV file ("username,email") and return a /// HashMap. /// /// Expected CSV format: /// ```text /// username,email /// johndoe,john@example.com /// alice,alice@example.com /// ... /// ``` /// /// Notes: /// - The first line is treated as a header and skipped. /// - Lines missing either username or email are ignored. /// - Email is lowercased and trimmed to allow normalized lookups. pub async fn read_sync_file_to_map>( path: P, ) -> Result> { let file = File::open(&path).await.with_context(|| { format!("failed to open sync file '{}'", path.as_ref().display()) })?; let reader = BufReader::new(file); let mut lines = reader.lines(); let mut map = HashMap::::new(); // Skip header: "username,email" if let Some(line) = lines.next_line().await? { let _header = line.trim(); // We can ignore header validation for now. } while let Some(line) = lines.next_line().await? { let trimmed = line.trim(); if trimmed.is_empty() { continue; } let parts: Vec<&str> = trimmed.split(',').collect(); if parts.len() < 2 { // malformed line, skip continue; } let username = parts[0].trim(); let email = parts[1].trim().to_lowercase(); if !email.is_empty() && !username.is_empty() { map.insert(email, username.to_string()); } } Ok(map) } /// Load a plain-text file containing one e-mail per line. /// /// Each line is trimmed and lowercased. Empty lines are skipped. pub async fn load_email_list>(path: P) -> Result> { let file = File::open(&path).await.with_context(|| { format!( "failed to open e-mail list file '{}'", path.as_ref().display() ) })?; let reader = BufReader::new(file); let mut lines = reader.lines(); let mut emails = Vec::new(); while let Some(line) = lines.next_line().await? { let email = line.trim().to_lowercase(); if email.is_empty() { continue; } emails.push(email); } Ok(emails) } // /// Add a Cognito user to one or more groups using the Admin API. // /// // /// This function assumes AWS credentials and permissions allow admin operations. // pub async fn admin_add_user_to_groups( // client: &CognitoClient, // pool_id: &str, // username: &str, // groups: &[String], // ) -> Result<()> { // for group in groups { // info!(%username, %group, %pool_id, "adding user to Cognito group"); // client // .admin_add_user_to_group() // .user_pool_id(pool_id) // .username(username) // .group_name(group) // .send() // .await // .with_context(|| { // format!( // "failed to add user '{}' to group '{}'", // username, group // ) // })?; // info!(%username, %group, "user successfully added to group"); // } // Ok(()) // } // /// High-level flow: // /// 1. Load sync CSV into HashMap. // /// 2. Load target e-mails (one per line). // /// 3. For each email, find username in HashMap. // /// 4. Call `admin_add_user_to_groups` with concurrency limit. // /// 5. Log success or failure for each email. // /// // /// `sync_csv_path`: CSV generated by `sync` (username,email). // /// `emails_list_path`: TXT file with one e-mail per line (users to be added). // pub async fn add_users_to_groups_from_files( // client: &CognitoClient, // pool_id: &str, // sync_csv_path: &Path, // emails_list_path: &Path, // groups: &[String], // concurrency: usize, // ) -> Result<()> { // let concurrency = std::cmp::max(1, concurrency); // // 1. Load sync CSV into map: email -> username // let email_to_username = read_sync_file_to_map(sync_csv_path).await?; // info!( // total_entries = email_to_username.len(), // "loaded sync map (email -> username)" // ); // // 2. Load target e-mails // let emails = load_email_list(emails_list_path).await?; // let total_emails = emails.len(); // info!(total_emails, "loaded target e-mail list"); // let pb = Arc::new({ // let bar = ProgressBar::new(total_emails as u64); // bar.set_style( // ProgressStyle::with_template( // "[{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}", // ) // .unwrap_or_else(|_| ProgressStyle::default_bar()), // ); // bar.set_message("processing users"); // bar // }); // let semaphore = Arc::new(Semaphore::new(concurrency)); // let mut handles: Vec> = Vec::with_capacity(emails.len()); // for email in emails { // let username = match email_to_username.get(&email) { // Some(u) => u.clone(), // None => { // error!(%email, "e-mail not found in sync map, skipping"); // continue; // } // }; // let permit = semaphore.clone().acquire_owned().await?; // let client_clone = client.clone(); // let pool_id = pool_id.to_string(); // let groups = groups.to_vec(); // let email_clone = email.clone(); // let pb_clone = pb.clone(); // let handle = tokio::spawn(async move { // let _permit = permit; // keep permit alive for the duration of this task // if let Err(err) = admin_add_user_to_groups( // &client_clone, // &pool_id, // &username, // &groups, // ) // .await // { // error!( // email = %email_clone, // %username, // error = ?err, // "failed to add user to one or more groups" // ); // } else { // info!( // email = %email_clone, // %username, // groups = ?groups, // "user successfully processed for all groups" // ); // } // pb_clone.inc(1); // }); // handles.push(handle); // } // // Wait for all tasks to complete // for handle in handles { // // Ignore panics here, just surface as error logs. // if let Err(join_err) = handle.await { // error!(error = ?join_err, "join error while processing a user"); // } // } // pb.finish_with_message("done"); // info!("finished processing all users for add-groups operation"); // Ok(()) // }