aboutsummaryrefslogtreecommitdiffstats
path: root/src/main.rs
diff options
context:
space:
mode:
authormurilo ijanc2025-11-18 15:42:18 -0300
committermurilo ijanc2025-11-18 15:42:18 -0300
commit6116f744308210e8f722a63b08a5d714ec7b7c40 (patch)
tree637a7643bddd9c1e6a12f8c941a2b853a6e2f929 /src/main.rs
parent159646d3ac75937fe0b83a2f97f52ce9418510eb (diff)
downloadcogops-6116f744308210e8f722a63b08a5d714ec7b7c40.tar.gz
Add users to groups
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs370
1 files changed, 322 insertions, 48 deletions
diff --git a/src/main.rs b/src/main.rs
index 896804d..aaac2ae 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -21,21 +21,24 @@
// accept timeout
// accept poolid
-
mod helper;
+use std::collections::HashMap;
+use std::path::Path;
use std::path::PathBuf;
+use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
-use clap::{ArgAction, Parser, Subcommand};
-use tracing::{debug, info};
-use tracing_subscriber::EnvFilter;
-use tokio::fs::File;
-use tokio::io::{self, AsyncWrite, AsyncWriteExt};
use aws_sdk_cognitoidentityprovider::Client as CognitoClient;
use aws_sdk_cognitoidentityprovider::types::UserType;
-
+use clap::{ArgAction, Parser, Subcommand};
+use tokio::fs::File;
+use tokio::io::{self, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
+use tokio::sync::Semaphore;
+use tokio::task::JoinHandle;
+use tracing::{debug, error, info};
+use tracing_subscriber::EnvFilter;
const LONG_VERSION: &str = concat!(
env!("CARGO_PKG_NAME"),
@@ -81,12 +84,75 @@ enum Commands {
Sync(SyncArgs),
/// Add users to one or more Cognito groups.
- Add(GroupOperationArgs),
+ Add(AddArgs),
/// Remove users from one or more Cognito groups.
Del(GroupOperationArgs),
}
+/// Arguments for the `add` operation.
+///
+/// High-level flow:
+/// - `sync_file` is the CSV produced by `sync` (username,email).
+/// - `emails_file` is a plain text file with one e-mail per line,
+/// representing the users that should be added to the given groups.
+/// - `groups` is the list of Cognito group names.
+/// - `pool_id` is the Cognito User Pool ID.
+/// - `concurrency` defines how many users are processed in parallel.
+#[derive(Debug, Parser)]
+pub struct AddArgs {
+ /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX).
+ #[arg(long = "pool-id")]
+ pub pool_id: String,
+
+ /// CSV file generated by the `sync` command (username,email).
+ ///
+ /// This file is loaded into a HashMap<email, username> and used
+ /// as the source of truth for resolving usernames from e-mail.
+ #[arg(
+ long = "sync-file",
+ value_name = "CSV_PATH",
+ help = "CSV file produced by `sync` containing username,email columns"
+ )]
+ pub sync_file: PathBuf,
+
+ /// Text file containing one e-mail per line.
+ ///
+ /// Each e-mail will be normalized (trim + lowercase) and then
+ /// looked up in the sync CSV map to obtain the corresponding username.
+ #[arg(
+ long = "emails-file",
+ value_name = "TXT_PATH",
+ help = "Plain text file with one e-mail per line to be added to the groups"
+ )]
+ pub emails_file: PathBuf,
+
+ /// One or more Cognito group names.
+ ///
+ /// All resolved users will be added to every group listed here.
+ #[arg(
+ long = "group",
+ alias = "groups",
+ value_name = "GROUP",
+ num_args = 1..,
+ required = true
+ )]
+ pub groups: Vec<String>,
+
+ /// Maximum number of users to process concurrently.
+ ///
+ /// This controls how many users are handled in parallel when calling
+ /// the AdminAddUserToGroup API.
+ #[arg(long = "concurrency", value_name = "N", default_value_t = 4)]
+ pub concurrency: usize,
+
+ /// Global timeout for the operation, in seconds.
+ ///
+ /// When set, the entire add operation is bounded by this timeout.
+ #[arg(long = "timeout", value_name = "SECONDS")]
+ pub timeout: Option<u64>,
+}
+
/// Common arguments shared by group-based operations.
#[derive(clap::Args, Debug, Clone)]
pub struct CommonOperationArgs {
@@ -175,6 +241,11 @@ struct GroupOperationArgs {
#[tokio::main]
async fn main() -> Result<()> {
+ // let users = read_sync_file_to_map(".mail_sync").await?;
+
+ // for (email, username) in &users {
+ // println!("{email} => {username}");
+ // }
let cli = Cli::parse();
init_tracing(cli.verbose);
@@ -191,18 +262,9 @@ async fn main() -> Result<()> {
run_sync(&common).await?;
}
- _ => unimplemented!(),
- // Commands::Add(args) => {
- // let common = CommonOperationArgs {
- // pool_id: args.pool_id,
- // groups: args.groups,
- // emails_file: Some(args.emails_file),
- // concurrency: args.concurrency,
- // timeout: args.timeout,
- // };
-
- // run_add_groups(common).await?;
- // }
+ Commands::Add(args) => {
+ run_add_groups(args).await?;
+ }
// Commands::Del(args) => {
// let common = CommonOperationArgs {
// pool_id: args.pool_id,
@@ -214,6 +276,7 @@ async fn main() -> Result<()> {
// run_remove_groups(common).await?;
// }
+ _ => unimplemented!(),
}
Ok(())
@@ -265,9 +328,8 @@ pub async fn run_sync(args: &CommonOperationArgs) -> Result<()> {
"Starting users sync from Cognito user pool"
);
- let config = aws_config::load_from_env()
- .await;
- // .context("failed to load AWS configuration")?;
+ let config = aws_config::load_from_env().await;
+ // .context("failed to load AWS configuration")?;
let client = CognitoClient::new(&config);
let timeout = args.timeout.map(Duration::from_secs);
@@ -294,21 +356,28 @@ pub async fn run_sync(args: &CommonOperationArgs) -> Result<()> {
Ok(())
}
-async fn run_add_groups(args: CommonOperationArgs) -> Result<()> {
- info!(
- pool_id = %args.pool_id,
- emails_file = ?args.emails_file,
- concurrency = ?args.concurrency,
- timeout = ?args.timeout,
- "add groups operation requested (not implemented yet)"
- );
+async fn run_add_groups(args: AddArgs) -> Result<()> {
+ // info!(
+ // pool_id = %args.pool_id,
+ // emails_file = ?args.emails_file,
+ // concurrency = ?args.concurrency,
+ // timeout = ?args.timeout,
+ // "add groups operation requested (not implemented yet)"
+ // );
- if let Some(seconds) = args.timeout {
- let _timeout = Duration::from_secs(seconds);
- debug!(?seconds, "add operation timeout configured");
- }
+ let config = aws_config::load_from_env().await;
+ let client = CognitoClient::new(&config);
+
+ add_users_to_groups_from_files(
+ &client,
+ &args.pool_id,
+ &args.sync_file,
+ &args.emails_file,
+ &args.groups,
+ args.concurrency,
+ )
+ .await?;
- // TODO: implement add-to-groups logic.
Ok(())
}
@@ -334,15 +403,19 @@ async fn run_remove_groups(args: CommonOperationArgs) -> Result<()> {
///
/// If `args.emails_file` is set, the CSV is written to that file.
/// Otherwise, the CSV is written to stdout.
-pub(crate)async fn sync_users_to_csv(client: &CognitoClient, args: &CommonOperationArgs) -> Result<()> {
- let mut writer: Box<dyn AsyncWrite + Unpin + Send> = if let Some(path) = &args.emails_file {
- let file = File::create(path)
- .await
- .with_context(|| format!("failed to create output file at '{}'", path.display()))?;
- Box::new(file)
- } else {
- Box::new(io::stdout())
- };
+pub(crate) async fn sync_users_to_csv(
+ client: &CognitoClient,
+ args: &CommonOperationArgs,
+) -> Result<()> {
+ let mut writer: Box<dyn AsyncWrite + Unpin + Send> =
+ if let Some(path) = &args.emails_file {
+ let file = File::create(path).await.with_context(|| {
+ format!("failed to create output file at '{}'", path.display())
+ })?;
+ Box::new(file)
+ } else {
+ Box::new(io::stdout())
+ };
// CSV header
writer
@@ -382,9 +455,8 @@ pub(crate)async fn sync_users_to_csv(client: &CognitoClient, args: &CommonOperat
total_users += 1;
}
- pagination_token = response
- .pagination_token()
- .map(|token| token.to_owned());
+ pagination_token =
+ response.pagination_token().map(|token| token.to_owned());
if pagination_token.is_none() {
break;
@@ -412,3 +484,205 @@ fn extract_username_and_email(user: &UserType) -> (String, String) {
(username, email)
}
+/// Read a Cognito sync CSV file ("username,email") and return a
+/// HashMap<email, username>.
+///
+/// Expected CSV format:
+/// ```text
+/// username,email
+/// johndoe,john@example.com
+/// alice,alice@example.com
+/// ...
+/// ```
+///
+/// Notes:
+/// - The first line is treated as a header and skipped.
+/// - Lines missing either username or email are ignored.
+/// - Email is lowercased and trimmed to allow normalized lookups.
+pub async fn read_sync_file_to_map<P: AsRef<Path>>(
+ path: P,
+) -> Result<HashMap<String, String>> {
+ let file = File::open(&path).await.with_context(|| {
+ format!("failed to open sync file '{}'", path.as_ref().display())
+ })?;
+
+ let reader = BufReader::new(file);
+ let mut lines = reader.lines();
+
+ let mut map = HashMap::<String, String>::new();
+
+ // Skip header: "username,email"
+ if let Some(line) = lines.next_line().await? {
+ let _header = line.trim();
+ // We can ignore header validation for now.
+ }
+
+ while let Some(line) = lines.next_line().await? {
+ let trimmed = line.trim();
+ if trimmed.is_empty() {
+ continue;
+ }
+
+ let parts: Vec<&str> = trimmed.split(',').collect();
+ if parts.len() < 2 {
+ // malformed line, skip
+ continue;
+ }
+
+ let username = parts[0].trim();
+ let email = parts[1].trim().to_lowercase();
+
+ if !email.is_empty() && !username.is_empty() {
+ map.insert(email, username.to_string());
+ }
+ }
+
+ Ok(map)
+}
+
+/// Load a plain-text file containing one e-mail per line.
+///
+/// Each line is trimmed and lowercased. Empty lines are skipped.
+pub async fn load_email_list<P: AsRef<Path>>(path: P) -> Result<Vec<String>> {
+ let file = File::open(&path).await.with_context(|| {
+ format!(
+ "failed to open e-mail list file '{}'",
+ path.as_ref().display()
+ )
+ })?;
+
+ let reader = BufReader::new(file);
+ let mut lines = reader.lines();
+ let mut emails = Vec::new();
+
+ while let Some(line) = lines.next_line().await? {
+ let email = line.trim().to_lowercase();
+ if email.is_empty() {
+ continue;
+ }
+ emails.push(email);
+ }
+
+ Ok(emails)
+}
+
+/// Add a Cognito user to one or more groups using the Admin API.
+///
+/// This function assumes AWS credentials and permissions allow admin operations.
+pub async fn admin_add_user_to_groups(
+ client: &CognitoClient,
+ pool_id: &str,
+ username: &str,
+ groups: &[String],
+) -> Result<()> {
+ for group in groups {
+ info!(%username, %group, %pool_id, "adding user to Cognito group");
+
+ client
+ .admin_add_user_to_group()
+ .user_pool_id(pool_id)
+ .username(username)
+ .group_name(group)
+ .send()
+ .await
+ .with_context(|| {
+ format!(
+ "failed to add user '{}' to group '{}'",
+ username, group
+ )
+ })?;
+
+ info!(%username, %group, "user successfully added to group");
+ }
+
+ Ok(())
+}
+
+/// High-level flow:
+/// 1. Load sync CSV into HashMap<email, username>.
+/// 2. Load target e-mails (one per line).
+/// 3. For each email, find username in HashMap.
+/// 4. Call `admin_add_user_to_groups` with concurrency limit.
+/// 5. Log success or failure for each email.
+///
+/// `sync_csv_path`: CSV generated by `sync` (username,email).
+/// `emails_list_path`: TXT file with one e-mail per line (users to be added).
+pub async fn add_users_to_groups_from_files(
+ client: &CognitoClient,
+ pool_id: &str,
+ sync_csv_path: &Path,
+ emails_list_path: &Path,
+ groups: &[String],
+ concurrency: usize,
+) -> Result<()> {
+ let concurrency = std::cmp::max(1, concurrency);
+
+ // 1. Load sync CSV into map: email -> username
+ let email_to_username = read_sync_file_to_map(sync_csv_path).await?;
+ info!(
+ total_entries = email_to_username.len(),
+ "loaded sync map (email -> username)"
+ );
+
+ // 2. Load target e-mails
+ let emails = load_email_list(emails_list_path).await?;
+ info!(total_emails = emails.len(), "loaded target e-mail list");
+
+ let semaphore = Arc::new(Semaphore::new(concurrency));
+ let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(emails.len());
+
+ for email in emails {
+ let username = match email_to_username.get(&email) {
+ Some(u) => u.clone(),
+ None => {
+ error!(%email, "e-mail not found in sync map, skipping");
+ continue;
+ }
+ };
+
+ let permit = semaphore.clone().acquire_owned().await?;
+ let client_clone = client.clone();
+ let pool_id = pool_id.to_string();
+ let groups = groups.to_vec();
+
+ let handle = tokio::spawn(async move {
+ let _permit = permit; // keep permit alive for the duration of this task
+
+ if let Err(err) = admin_add_user_to_groups(
+ &client_clone,
+ &pool_id,
+ &username,
+ &groups,
+ )
+ .await
+ {
+ error!(
+ %email,
+ %username,
+ error = ?err,
+ "failed to add user to one or more groups"
+ );
+ } else {
+ info!(
+ %email,
+ %username,
+ groups = ?groups,
+ "user successfully processed for all groups"
+ );
+ }
+ });
+
+ handles.push(handle);
+ }
+
+ // Wait for all tasks to complete
+ for handle in handles {
+ // Ignore panics here, just surface as error logs.
+ if let Err(join_err) = handle.await {
+ error!(error = ?join_err, "join error while processing a user");
+ }
+ }
+
+ info!("finished processing all users for add-groups operation");
+ Ok(())
+}