feat: multi-threading secret lookups

This commit is contained in:
Bart van der Braak 2023-11-08 01:11:38 +01:00
parent dc1d8a7a1f
commit cd7f7f59ab

View file

@ -4,6 +4,7 @@ use clap::Parser;
use futures::stream::StreamExt;
use std::fs::File;
use std::io::Write;
use tokio::sync::mpsc;
#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
@ -46,15 +47,42 @@ async fn fetch_secrets_from_key_vault(
while let Some(page) = secret_pages.next().await {
let page = page?;
let (tx, mut rx) = mpsc::channel(32); // Channel for concurrent secret retrieval
for secret in &page.value {
if let Some(filter) = filter {
if !secret.id.contains(filter) {
continue;
}
}
let secret_name = secret.id.split('/').last().unwrap_or_default();
let secret_bundle = client.get(secret_name).await?;
secret_values.push((secret.id.clone(), secret_bundle.value));
let tx = tx.clone();
// Clone necessary data before moving into the spawned task
let secret_id = secret.id.clone();
let client_clone = client.clone();
tokio::spawn(async move {
let secret_name = secret_id.split('/').last().unwrap_or_default();
let secret_bundle = client_clone.get(secret_name).await;
// Handle the result and send it through the channel
match secret_bundle {
Ok(bundle) => {
tx.send((secret_id, bundle.value)).await.expect("Send error");
}
Err(err) => {
eprintln!("Error fetching secret: {}", err);
// You can decide to continue or not in case of an error.
}
}
});
}
drop(tx); // Drop the sender to signal the end of sending tasks
while let Some(result) = rx.recv().await {
let (key, value) = result;
secret_values.push((key, value));
}
}
@ -88,4 +116,4 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Process completed successfully!");
Ok(())
}
}