diff --git a/.github/workflows/temp-branch-build-and-push.yaml b/.github/workflows/temp-branch-build-and-push.yaml index 24ee975a5..9a4b6561e 100644 --- a/.github/workflows/temp-branch-build-and-push.yaml +++ b/.github/workflows/temp-branch-build-and-push.yaml @@ -3,7 +3,7 @@ name: Branch - Build and push docker image on: push: branches: - - "POP-2530/implement-anon-stat-change-in-hnsw-that-do-not-count-all-rotations" + - "leonidasnanos-pop-2659-2d-hd-and-implementation-of-the-required-wiring" concurrency: group: "${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}" diff --git a/.test.env b/.test.env index 906e71a97..f6b3a501a 100644 --- a/.test.env +++ b/.test.env @@ -19,6 +19,7 @@ SMPC__KMS_KEY_ARNS='["arn:aws:kms:us-east-1:000000000000:key/00000000-0000-0000- SMPC__SERVICE_PORTS='["4000","4001","4002"]' SMPC__HEALTHCHECK_PORTS='["3000","3001","3002"]' SMPC__SHARES_BUCKET_NAME="wf-smpcv2-dev-sns-requests" +SMPC__SNS_BUFFER_BUCKET_NAME="wf-smpcv2-stage-sns-buffer" SMPC__RESULTS_TOPIC_ARN="arn:aws:sns:us-east-1:000000000000:iris-mpc-results.fifo" AWS_ENDPOINT_URL=http://localstack:4566 AWS_ACCESS_KEY_ID=test diff --git a/deploy/e2e/iris-mpc-0.yaml.tpl b/deploy/e2e/iris-mpc-0.yaml.tpl index 87cd386ee..0850fa820 100644 --- a/deploy/e2e/iris-mpc-0.yaml.tpl +++ b/deploy/e2e/iris-mpc-0.yaml.tpl @@ -187,6 +187,9 @@ iris-mpc-0: - name: SMPC__SHARES_BUCKET_NAME value: "wf-smpcv2-stage-sns-requests" + - name: SMPC__SNS_BUFFER_BUCKET_NAME + value: "wf-smpcv2-stage-sns-buffer" + - name: SMPC__CLEAR_DB_BEFORE_INIT value: "true" diff --git a/deploy/e2e/iris-mpc-1.yaml.tpl b/deploy/e2e/iris-mpc-1.yaml.tpl index e58b404ab..332b77ec7 100644 --- a/deploy/e2e/iris-mpc-1.yaml.tpl +++ b/deploy/e2e/iris-mpc-1.yaml.tpl @@ -187,6 +187,9 @@ iris-mpc-1: - name: SMPC__SHARES_BUCKET_NAME value: "wf-smpcv2-stage-sns-requests" + - name: SMPC__SNS_BUFFER_BUCKET_NAME + value: "wf-smpcv2-stage-sns-buffer" + - name: SMPC__CLEAR_DB_BEFORE_INIT value: "true" diff --git a/deploy/e2e/iris-mpc-2.yaml.tpl b/deploy/e2e/iris-mpc-2.yaml.tpl index ad9e1c174..661bc475f 100644 --- a/deploy/e2e/iris-mpc-2.yaml.tpl +++ b/deploy/e2e/iris-mpc-2.yaml.tpl @@ -187,6 +187,9 @@ iris-mpc-2: - name: SMPC__SHARES_BUCKET_NAME value: "wf-smpcv2-stage-sns-requests" + - name: SMPC__SNS_BUFFER_BUCKET_NAME + value: "wf-smpcv2-stage-sns-buffer" + - name: SMPC__CLEAR_DB_BEFORE_INIT value: "true" diff --git a/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml index 9b167eb2d..d6cfb9f17 100644 --- a/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml +++ b/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml @@ -49,7 +49,7 @@ env: - name: SMPC__PROCESSING_TIMEOUT_SECS value: "120" - + - name: SMPC__HEARTBEAT_INITIAL_RETRIES value: "65" @@ -71,6 +71,9 @@ env: - name: SMPC__SHARES_BUCKET_NAME value: "wf-smpcv2-stage-sns-requests" + - name: SMPC__SNS_BUFFER_BUCKET_NAME + value: "wf-smpcv2-stage-sns-buffer" + - name: SMPC__ENABLE_S3_IMPORTER value: "true" @@ -157,7 +160,7 @@ env: - name: SMPC__ENABLE_MODIFICATIONS_REPLAY value: "true" - - name : SMPC__ENABLE_DEBUG_TIMING + - name: SMPC__ENABLE_DEBUG_TIMING value: "true" initContainer: diff --git a/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml index 1fdb1edb5..6fd72eabe 100644 --- a/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml +++ b/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml @@ -49,7 +49,7 @@ env: - name: SMPC__PROCESSING_TIMEOUT_SECS value: "120" - + - name: SMPC__HEARTBEAT_INITIAL_RETRIES value: "65" @@ -71,6 +71,9 @@ env: - name: SMPC__SHARES_BUCKET_NAME value: "wf-smpcv2-stage-sns-requests" + - name: SMPC__SNS_BUFFER_BUCKET_NAME + value: "wf-smpcv2-stage-sns-buffer" + - name: SMPC__ENABLE_S3_IMPORTER value: "true" @@ -105,7 +108,7 @@ env: value: "10" - name: SMPC__ENABLE_SENDING_ANONYMIZED_STATS_MESSAGE - value: "true" + value: "true" - name: SMPC__ENABLE_SENDING_MIRROR_ANONYMIZED_STATS_MESSAGE value: "true" @@ -157,7 +160,7 @@ env: - name: SMPC__ENABLE_MODIFICATIONS_REPLAY value: "true" - - name : SMPC__ENABLE_DEBUG_TIMING + - name: SMPC__ENABLE_DEBUG_TIMING value: "true" initContainer: @@ -201,7 +204,7 @@ initContainer: # Execute AWS CLI command with the generated JSON aws route53 change-resource-record-sets --hosted-zone-id "$HOSTED_ZONE_ID" --change-batch "$BATCH_JSON" - + cd /libs aws s3 cp s3://wf-smpcv2-stage-libs/libcublas.so.12.2.5.6 . aws s3 cp s3://wf-smpcv2-stage-libs/libcublasLt.so.12.2.5.6 . diff --git a/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml index 0ef4bd6f3..799a06c4e 100644 --- a/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml +++ b/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml @@ -49,7 +49,7 @@ env: - name: SMPC__PROCESSING_TIMEOUT_SECS value: "120" - + - name: SMPC__HEARTBEAT_INITIAL_RETRIES value: "65" @@ -71,6 +71,9 @@ env: - name: SMPC__SHARES_BUCKET_NAME value: "wf-smpcv2-stage-sns-requests" + - name: SMPC__SNS_BUFFER_BUCKET_NAME + value: "wf-smpcv2-stage-sns-buffer" + - name: SMPC__ENABLE_S3_IMPORTER value: "true" @@ -85,7 +88,7 @@ env: - name: SMPC__LOAD_CHUNKS_BUFFER_SIZE value: "1024" - + - name: SMPC__CLEAR_DB_BEFORE_INIT value: "true" @@ -100,7 +103,7 @@ env: - name: SMPC__MATCH_DISTANCES_BUFFER_SIZE value: "64" - + - name: SMPC__N_BUCKETS value: "10" @@ -157,7 +160,7 @@ env: - name: SMPC__ENABLE_MODIFICATIONS_REPLAY value: "true" - - name : SMPC__ENABLE_DEBUG_TIMING + - name: SMPC__ENABLE_DEBUG_TIMING value: "true" initContainer: diff --git a/iris-mpc-common/src/config/mod.rs b/iris-mpc-common/src/config/mod.rs index 57adc0482..d586112c6 100644 --- a/iris-mpc-common/src/config/mod.rs +++ b/iris-mpc-common/src/config/mod.rs @@ -70,6 +70,9 @@ pub struct Config { #[serde(default = "default_shares_bucket_name")] pub shares_bucket_name: String, + #[serde(default = "default_sns_buffer_bucket_name")] + pub sns_buffer_bucket_name: String, + #[serde(default)] pub clear_db_before_init: bool, @@ -190,6 +193,9 @@ pub struct Config { #[serde(default)] pub enable_sending_mirror_anonymized_stats_message: bool, + #[serde(default)] + pub enable_sending_anonymized_stats_2d_message: bool, + #[serde(default)] pub enable_reauth: bool, @@ -296,6 +302,10 @@ fn default_shares_bucket_name() -> String { "wf-mpc-prod-smpcv2-sns-requests".to_string() } +fn default_sns_buffer_bucket_name() -> String { + "wf-smpcv2-prod-sns-buffer".to_string() +} + fn default_schema_name() -> String { "SMPC".to_string() } @@ -575,6 +585,7 @@ pub struct CommonConfig { startup_sync_timeout_secs: u64, public_key_base_url: String, shares_bucket_name: String, + sns_buffer_bucket_name: String, clear_db_before_init: bool, init_db_size: usize, max_db_size: usize, @@ -596,6 +607,7 @@ pub struct CommonConfig { n_buckets: usize, enable_sending_anonymized_stats_message: bool, enable_sending_mirror_anonymized_stats_message: bool, + enable_sending_anonymized_stats_2d_message: bool, enable_reauth: bool, enable_reset: bool, hawk_request_parallelism: usize, @@ -640,6 +652,7 @@ impl From for CommonConfig { startup_sync_timeout_secs, public_key_base_url, shares_bucket_name, + sns_buffer_bucket_name, clear_db_before_init, init_db_size, max_db_size, @@ -674,6 +687,7 @@ impl From for CommonConfig { n_buckets, enable_sending_anonymized_stats_message, enable_sending_mirror_anonymized_stats_message, + enable_sending_anonymized_stats_2d_message, enable_reauth, enable_reset, hawk_request_parallelism, @@ -707,6 +721,7 @@ impl From for CommonConfig { startup_sync_timeout_secs, public_key_base_url, shares_bucket_name, + sns_buffer_bucket_name, clear_db_before_init, init_db_size, max_db_size, @@ -728,6 +743,7 @@ impl From for CommonConfig { n_buckets, enable_sending_anonymized_stats_message, enable_sending_mirror_anonymized_stats_message, + enable_sending_anonymized_stats_2d_message, enable_reauth, enable_reset, hawk_request_parallelism, diff --git a/iris-mpc-common/src/helpers/smpc_request.rs b/iris-mpc-common/src/helpers/smpc_request.rs index ab7a270b8..dd8e39bea 100644 --- a/iris-mpc-common/src/helpers/smpc_request.rs +++ b/iris-mpc-common/src/helpers/smpc_request.rs @@ -103,6 +103,7 @@ where pub const IDENTITY_DELETION_MESSAGE_TYPE: &str = "identity_deletion"; pub const ANONYMIZED_STATISTICS_MESSAGE_TYPE: &str = "anonymized_statistics"; +pub const ANONYMIZED_STATISTICS_2D_MESSAGE_TYPE: &str = "anonymized_statistics_2d"; pub const CIRCUIT_BREAKER_MESSAGE_TYPE: &str = "circuit_breaker"; pub const UNIQUENESS_MESSAGE_TYPE: &str = "uniqueness"; pub const REAUTH_MESSAGE_TYPE: &str = "reauth"; diff --git a/iris-mpc-common/src/helpers/sqs_s3_helper.rs b/iris-mpc-common/src/helpers/sqs_s3_helper.rs index 29b091069..e1e9df6f3 100644 --- a/iris-mpc-common/src/helpers/sqs_s3_helper.rs +++ b/iris-mpc-common/src/helpers/sqs_s3_helper.rs @@ -22,10 +22,10 @@ pub async fn upload_file_to_s3( .await { Ok(_) => { - tracing::info!("File uploaded successfully."); + tracing::info!("File {} uploaded to s3 successfully", key); } Err(e) => { - tracing::error!("Error: Failed to upload file: {:?}", e); + tracing::error!("Failed to upload file {} to s3: {:?}", key, e); return Err(SharesDecodingError::UploadS3Error); } } diff --git a/iris-mpc-common/src/helpers/statistics.rs b/iris-mpc-common/src/helpers/statistics.rs index d3034ce85..645be559f 100644 --- a/iris-mpc-common/src/helpers/statistics.rs +++ b/iris-mpc-common/src/helpers/statistics.rs @@ -6,6 +6,7 @@ use chrono::{ use serde::{Deserialize, Serialize}; use std::fmt; +// 1D anonymized statistics types #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BucketResult { pub count: usize, @@ -147,3 +148,127 @@ impl BucketStatistics { self.next_start_time_utc_timestamp = Some(now_timestamp); } } + +// 2D anonymized statistics types +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Bucket2DResult { + pub count: usize, + pub left_hamming_distance_bucket: [f64; 2], + pub right_hamming_distance_bucket: [f64; 2], +} + +impl Eq for Bucket2DResult {} +impl PartialEq for Bucket2DResult { + fn eq(&self, other: &Self) -> bool { + self.count == other.count + && (self.left_hamming_distance_bucket[0] - other.left_hamming_distance_bucket[0]).abs() + <= 1e-9 + && (self.left_hamming_distance_bucket[1] - other.left_hamming_distance_bucket[1]).abs() + <= 1e-9 + && (self.right_hamming_distance_bucket[0] - other.right_hamming_distance_bucket[0]) + .abs() + <= 1e-9 + && (self.right_hamming_distance_bucket[1] - other.right_hamming_distance_bucket[1]) + .abs() + <= 1e-9 + } +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct BucketStatistics2D { + pub buckets: Vec, + pub n_buckets_per_side: usize, + // The number of two-sided matches gathered before sending the statistics + pub match_distances_buffer_size: usize, + pub party_id: usize, + #[serde(with = "ts_seconds")] + pub start_time_utc_timestamp: DateTime, + #[serde(with = "ts_seconds_option")] + pub end_time_utc_timestamp: Option>, + #[serde(skip_serializing)] + #[serde(skip_deserializing)] + #[serde(with = "ts_seconds_option")] + pub next_start_time_utc_timestamp: Option>, +} + +impl BucketStatistics2D { + pub fn is_empty(&self) -> bool { + self.buckets.is_empty() + } +} + +impl fmt::Display for BucketStatistics2D { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, " party_id: {}", self.party_id)?; + writeln!(f, " start_time_utc: {}", self.start_time_utc_timestamp)?; + match &self.end_time_utc_timestamp { + Some(end) => writeln!(f, " end_time_utc: {}", end)?, + None => writeln!(f, " end_time_utc: ")?, + } + for bucket in &self.buckets { + writeln!( + f, + " L({:.3}-{:.3}), R({:.3}-{:.3}): {}", + bucket.left_hamming_distance_bucket[0], + bucket.left_hamming_distance_bucket[1], + bucket.right_hamming_distance_bucket[0], + bucket.right_hamming_distance_bucket[1], + bucket.count + )?; + } + Ok(()) + } +} + +impl BucketStatistics2D { + pub fn new( + match_distances_buffer_size: usize, + n_buckets_per_side: usize, + party_id: usize, + ) -> Self { + Self { + buckets: Vec::with_capacity(n_buckets_per_side * n_buckets_per_side), + n_buckets_per_side, + match_distances_buffer_size, + party_id, + start_time_utc_timestamp: Utc::now(), + end_time_utc_timestamp: None, + next_start_time_utc_timestamp: None, + } + } + + /// Fill bucket counts for the 2D histogram. + /// buckets_2d is expected in row-major order (left index major): + /// buckets_2d[left_idx * n_buckets_per_side + right_idx] + pub fn fill_buckets( + &mut self, + buckets_2d: &[u32], + match_threshold_ratio: f64, + start_timestamp: Option>, + ) { + tracing::info!("Filling 2D buckets: {} entries", buckets_2d.len()); + + let now_timestamp = Utc::now(); + + self.buckets.clear(); + self.end_time_utc_timestamp = Some(now_timestamp); + + let step = match_threshold_ratio / (self.n_buckets_per_side as f64); + for (i, &count) in buckets_2d.iter().enumerate() { + let left_idx = i / self.n_buckets_per_side; + let right_idx = i % self.n_buckets_per_side; + let left_range = [step * (left_idx as f64), step * ((left_idx + 1) as f64)]; + let right_range = [step * (right_idx as f64), step * ((right_idx + 1) as f64)]; + self.buckets.push(Bucket2DResult { + count: count as usize, + left_hamming_distance_bucket: left_range, + right_hamming_distance_bucket: right_range, + }); + } + + if let Some(start_timestamp) = start_timestamp { + self.start_time_utc_timestamp = start_timestamp; + } + self.next_start_time_utc_timestamp = Some(now_timestamp); + } +} diff --git a/iris-mpc-common/src/job.rs b/iris-mpc-common/src/job.rs index 646ac8f38..96a711fd4 100644 --- a/iris-mpc-common/src/job.rs +++ b/iris-mpc-common/src/job.rs @@ -6,7 +6,7 @@ use crate::helpers::batch_sync::{ use crate::{ galois_engine::degree4::{GaloisRingIrisCodeShare, GaloisRingTrimmedMaskCodeShare}, helpers::{ - statistics::BucketStatistics, + statistics::{BucketStatistics, BucketStatistics2D}, sync::{Modification, ModificationKey}, }, ROTATIONS, @@ -407,6 +407,9 @@ pub struct ServerJobResult { // See struct definition for more details pub anonymized_bucket_statistics_left: BucketStatistics, pub anonymized_bucket_statistics_right: BucketStatistics, + // 2D anonymized statistics across both eyes (only for matches on both sides) + // Only for Normal orientation + pub anonymized_bucket_statistics_2d: BucketStatistics2D, // Mirror orientation bucket statistics pub anonymized_bucket_statistics_left_mirror: BucketStatistics, pub anonymized_bucket_statistics_right_mirror: BucketStatistics, diff --git a/iris-mpc-cpu/src/execution/hawk_main.rs b/iris-mpc-cpu/src/execution/hawk_main.rs index 23a01d32b..d1750552b 100644 --- a/iris-mpc-cpu/src/execution/hawk_main.rs +++ b/iris-mpc-cpu/src/execution/hawk_main.rs @@ -28,7 +28,7 @@ use iris_mpc_common::{ config::TlsConfig, helpers::{ smpc_request::{REAUTH_MESSAGE_TYPE, RESET_CHECK_MESSAGE_TYPE, UNIQUENESS_MESSAGE_TYPE}, - statistics::BucketStatistics, + statistics::{BucketStatistics, BucketStatistics2D}, }, vector_id::VectorId, }; @@ -1154,6 +1154,7 @@ impl HawkResult { anonymized_bucket_statistics_right, anonymized_bucket_statistics_left_mirror: BucketStatistics::default(), // TODO. anonymized_bucket_statistics_right_mirror: BucketStatistics::default(), // TODO. + anonymized_bucket_statistics_2d: BucketStatistics2D::default(), // TODO. successful_reauths, reauth_target_indices: batch.reauth_target_indices, diff --git a/iris-mpc-gpu/src/server/actor.rs b/iris-mpc-gpu/src/server/actor.rs index 5dbaa7d3c..459a79d80 100644 --- a/iris-mpc-gpu/src/server/actor.rs +++ b/iris-mpc-gpu/src/server/actor.rs @@ -38,7 +38,7 @@ use iris_mpc_common::{ inmemory_store::InMemoryStore, sha256::sha256_bytes, smpc_request::{REAUTH_MESSAGE_TYPE, RESET_CHECK_MESSAGE_TYPE, UNIQUENESS_MESSAGE_TYPE}, - statistics::BucketStatistics, + statistics::{BucketStatistics, BucketStatistics2D}, }, iris_db::{get_dummy_shares_for_deletion, iris::MATCH_THRESHOLD_RATIO}, job::{Eye, JobSubmissionHandle, ServerJobResult}, @@ -178,6 +178,7 @@ pub struct ServerActor { anonymized_bucket_statistics_right_mirror: BucketStatistics, // 2D anon stats buffer both_side_match_distances_buffer: Vec, + anonymized_bucket_statistics_2d: BucketStatistics2D, full_scan_side: Eye, full_scan_side_switching_enabled: bool, } @@ -523,6 +524,9 @@ impl ServerActor { let both_side_match_distances_buffer = vec![TwoSidedDistanceCache::default(); device_manager.device_count()]; + let anonymized_bucket_statistics_2d = + BucketStatistics2D::new(match_distances_2d_buffer_size, n_buckets, party_id); + Ok(Self { party_id, job_queue, @@ -576,6 +580,7 @@ impl ServerActor { full_scan_side, full_scan_side_switching_enabled, both_side_match_distances_buffer, + anonymized_bucket_statistics_2d, }) } @@ -633,6 +638,7 @@ impl ServerActor { self.anonymized_bucket_statistics_right_mirror .buckets .clear(); + self.anonymized_bucket_statistics_2d.buckets.clear(); tracing::info!( "Full batch duration took: {:?}", @@ -1598,6 +1604,7 @@ impl ServerActor { ); } + // Attempt for 2D anonymized bucket statistics calculation let (one_sided_distance_cache_left, one_sided_distance_cache_right) = if self.full_scan_side == Eye::Left { ( @@ -1676,6 +1683,14 @@ impl ServerActor { ); } tracing::info!("Bucket statistics calculated:\n{}", buckets_2d_string); + + // Fill the 2D anonymized statistics structure for propagation + self.anonymized_bucket_statistics_2d.fill_buckets( + &buckets_2d, + MATCH_THRESHOLD_RATIO, + self.anonymized_bucket_statistics_left + .next_start_time_utc_timestamp, + ); } // Instead of sending to return_channel, we'll return this at the end @@ -1704,6 +1719,7 @@ impl ServerActor { matched_batch_request_ids, anonymized_bucket_statistics_left: self.anonymized_bucket_statistics_left.clone(), anonymized_bucket_statistics_right: self.anonymized_bucket_statistics_right.clone(), + anonymized_bucket_statistics_2d: self.anonymized_bucket_statistics_2d.clone(), anonymized_bucket_statistics_left_mirror: self .anonymized_bucket_statistics_left_mirror .clone(), diff --git a/iris-mpc/bin/server.rs b/iris-mpc/bin/server.rs index 918fb074d..7d5c34cf6 100644 --- a/iris-mpc/bin/server.rs +++ b/iris-mpc/bin/server.rs @@ -5,6 +5,7 @@ use aws_sdk_secretsmanager::Client as SecretsManagerClient; use aws_sdk_sns::{types::MessageAttributeValue, Client as SNSClient}; use aws_sdk_sqs::Client; use axum::{response::IntoResponse, routing::get, Router}; +use chrono::Utc; use clap::Parser; use eyre::{bail, eyre, Context, Report, Result}; use futures::{stream::BoxStream, StreamExt}; @@ -37,9 +38,9 @@ use iris_mpc_common::{ decrypt_iris_share, get_iris_data_by_party_id, validate_iris_share, CircuitBreakerRequest, IdentityDeletionRequest, ReAuthRequest, ReceiveRequestError, ResetCheckRequest, ResetUpdateRequest, SQSMessage, UniquenessRequest, - ANONYMIZED_STATISTICS_MESSAGE_TYPE, CIRCUIT_BREAKER_MESSAGE_TYPE, - IDENTITY_DELETION_MESSAGE_TYPE, REAUTH_MESSAGE_TYPE, RESET_CHECK_MESSAGE_TYPE, - RESET_UPDATE_MESSAGE_TYPE, UNIQUENESS_MESSAGE_TYPE, + ANONYMIZED_STATISTICS_2D_MESSAGE_TYPE, ANONYMIZED_STATISTICS_MESSAGE_TYPE, + CIRCUIT_BREAKER_MESSAGE_TYPE, IDENTITY_DELETION_MESSAGE_TYPE, REAUTH_MESSAGE_TYPE, + RESET_CHECK_MESSAGE_TYPE, RESET_UPDATE_MESSAGE_TYPE, UNIQUENESS_MESSAGE_TYPE, }, smpc_response::{ create_message_type_attribute_map, IdentityDeletionResult, ReAuthResult, @@ -47,6 +48,7 @@ use iris_mpc_common::{ ERROR_FAILED_TO_PROCESS_IRIS_SHARES, ERROR_SKIPPED_REQUEST_PREVIOUS_NODE_BATCH, SMPC_MESSAGE_TYPE_ATTRIBUTE, }, + sqs_s3_helper::upload_file_to_s3, sync::{Modification, ModificationKey, SyncResult, SyncState}, task_monitor::TaskMonitor, }, @@ -63,6 +65,7 @@ use itertools::izip; use metrics_exporter_statsd::StatsdBuilder; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; +use sodiumoxide::hex; use std::process::exit; use std::{ collections::{HashMap, HashSet}, @@ -892,6 +895,8 @@ async fn server_main(config: Config) -> Result<()> { create_message_type_attribute_map(RESET_UPDATE_MESSAGE_TYPE); let anonymized_statistics_attributes = create_message_type_attribute_map(ANONYMIZED_STATISTICS_MESSAGE_TYPE); + let anonymized_statistics_2d_attributes = + create_message_type_attribute_map(ANONYMIZED_STATISTICS_2D_MESSAGE_TYPE); let identity_deletion_result_attributes = create_message_type_attribute_map(IDENTITY_DELETION_MESSAGE_TYPE); @@ -1363,6 +1368,7 @@ async fn server_main(config: Config) -> Result<()> { // Start thread that will be responsible for communicating back the results let (tx, mut rx) = mpsc::channel::(32); // TODO: pick some buffer value let sns_client_bg = aws_clients.sns_client.clone(); + let s3_client_bg = aws_clients.s3_client.clone(); let config_bg = config.clone(); let store_bg = store.clone(); let shutdown_handler_bg = Arc::clone(&shutdown_handler); @@ -1403,6 +1409,7 @@ async fn server_main(config: Config) -> Result<()> { mut modifications, actor_data: _, full_face_mirror_attack_detected, + anonymized_bucket_statistics_2d, }) = rx.recv().await { let dummy_deletion_shares = get_dummy_shares_for_deletion(party_id); @@ -1872,6 +1879,45 @@ async fn server_main(config: Config) -> Result<()> { .await?; } + // Send 2D anonymized statistics if present with their own flag + if config_bg.enable_sending_anonymized_stats_2d_message + && !anonymized_bucket_statistics_2d.buckets.is_empty() + { + tracing::info!("Sending 2D anonymized stats results"); + let serialized = serde_json::to_string(&anonymized_bucket_statistics_2d) + .wrap_err("failed to serialize 2D anonymized statistics result")?; + + // offloading 2D anon stats file to s3 to avoid sending large messages to SNS + // with 2D stats we were exceeding the SNS message size limit + let now_ms = Utc::now().timestamp_millis(); + let sha = iris_mpc_common::helpers::sha256::sha256_bytes(&serialized); + let content_hash = hex::encode(sha); + let s3_key = format!("stats2d/{}_{}.json", now_ms, content_hash); + + upload_file_to_s3( + &config_bg.sns_buffer_bucket_name, + &s3_key, + s3_client_bg.clone(), + serialized.as_bytes(), + ) + .await + .wrap_err("failed to upload 2D anonymized statistics to s3")?; + + // Publish only the S3 key to SNS + let payload = serde_json::to_string(&serde_json::json!({ + "s3_key": s3_key, + }))?; + send_results_to_sns( + vec![payload], + &metadata, + &sns_client_bg, + &config_bg, + &anonymized_statistics_2d_attributes, + ANONYMIZED_STATISTICS_2D_MESSAGE_TYPE, + ) + .await?; + } + shutdown_handler_bg.decrement_batches_pending_completion(); }