Commit 4f5521e

Add an SQS trigger for the Lambda that converts stored DB data to CSV files (#76)
1 parent e09db82 · commit 4f5521e

File tree (3 files changed: +46, -5)

  aws/mens_t20i_dataset_stack.py
  src/mens_t20i_data_collector/_lambdas/download_from_cricsheet/download_from_cricsheet_lambda_function.py
  src/mens_t20i_data_collector/_lambdas/upload_dataset_to_kaggle/upload_dataset_to_kaggle_lambda.py

aws/mens_t20i_dataset_stack.py

Lines changed: 20 additions & 0 deletions
@@ -7,7 +7,9 @@
     Stack,
     RemovalPolicy,
     aws_events as events,
+    aws_lambda_event_sources as lambda_event_sources,
     aws_events_targets as events_targets,
+    aws_sqs as sqs,
     aws_s3_notifications as s3_notifications,
 )
 import boto3
@@ -49,6 +51,14 @@ def __init__(
             billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST
         )
 
+        ######################################## SQS Configurations #####################################################
+        sqs_queue_to_send_delayed_message_when_new_file_is_downloaded = sqs.Queue(
+            self,
+            f"{stack_name}-sqs_queue_to_send_delayed_message_when_new_file_is_downloaded",
+            visibility_timeout=Duration.minutes(15),
+            delivery_delay=Duration.minutes(5),
+        )
+
         ######################################## SECRET MANAGER Configurations ##########################################
         __db_secrets = get_secret_from_secrets_manager(self._secret_manager_client, "db_secret")
         __kaggle_secrets = get_secret_from_secrets_manager(self._secret_manager_client, "kaggle_credentials")
@@ -76,6 +86,7 @@ def __init__(
             handler="download_from_cricsheet_lambda_function.handler",
             runtime=_lambda.Runtime.PYTHON_3_11,
             environment={
+                "DELAYED_SQS_QUEUE_URL": sqs_queue_to_send_delayed_message_when_new_file_is_downloaded.queue_url,
                 "DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
                 "DYNAMODB_TABLE_NAME": dynamodb_to_store_file_status_data.table_name,
                 "THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING": THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING,
@@ -101,6 +112,8 @@ def __init__(
                 resources=["*"],
             )
         )
+        # Granting permissions to the Lambda function to send messages to the SQS queue
+        sqs_queue_to_send_delayed_message_when_new_file_is_downloaded.grant_send_messages(cricsheet_data_downloading_lambda)
         # EventBridge Rule to trigger the Lambda every Monday at 12:00 AM UTC
         event_bridge_rule_to_trigger_cricsheet_data_downloading_lambda = events.Rule(
             self,
@@ -270,6 +283,13 @@ def __init__(
                 resources=["*"],
             )
         )
+        # Triggering the convert_mongodb_data_to_csv lambda function with the delayed SQS message
+        convert_mongodb_data_to_csv_lambda.add_event_source(
+            lambda_event_sources.SqsEventSource(
+                sqs_queue_to_send_delayed_message_when_new_file_is_downloaded,
+                batch_size=1,
+            )
+        )
 
         # Lambda function to upload the dataset to KAGGLE and create a new version of dataset
         upload_dataset_to_kaggle_lambda = _lambda.Function(
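
In short, the stack now provisions a delayed SQS queue, lets the download Lambda publish to it, and wires the queue into convert_mongodb_data_to_csv_lambda via an SqsEventSource. The five-minute delivery_delay holds each message invisible so data processing can finish before CSV conversion starts, and the 15-minute visibility_timeout covers the maximum Lambda run time, so a message is not redelivered while an invocation is still running. A minimal sketch of the consumer side, assuming the standard Lambda/SQS event shape (the handler name and parsing are illustrative, not taken from this commit):

import ast  # the producer serializes the payload with str(), not JSON

def handler(event, context):
    # With batch_size=1, "Records" carries exactly one SQS message per invocation.
    for record in event["Records"]:
        # str(dict) yields a Python-literal string (single quotes), so
        # ast.literal_eval, not json.loads, parses it back safely.
        payload = ast.literal_eval(record["body"])
        print(f"New files to convert: {payload['new_files']}")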

src/mens_t20i_data_collector/_lambdas/download_from_cricsheet/download_from_cricsheet_lambda_function.py

Lines changed: 22 additions & 2 deletions
@@ -69,7 +69,9 @@ def upload_new_json_data_files_for_data_processing(self, downloaded_zip_file_pat
             raise
 
         new_files = self._seggregate_new_files_from_downloaded_zip()
-        self._upload_new_json_files_to_s3_and_send_sns_notification(new_files=new_files)
+        self._upload_new_json_files_to_s3(new_files=new_files)
+        if new_files:
+            self._trigger_an_sqs_message_whenever_new_file_is_downloaded(new_files=new_files)
 
     def _list_all_files_from_dynamo_db(self) -> Set:
         response = self._dynamo_db_to_store_file_data_extraction_status.scan(ProjectionExpression="file_name")
@@ -87,7 +89,25 @@ def _seggregate_new_files_from_downloaded_zip(self) -> List:
         logger.info(f"Total newly downloaded files: {len(new_files)}")
         return new_files
 
-    def _upload_new_json_files_to_s3_and_send_sns_notification(self, new_files: List):
+    def _trigger_an_sqs_message_whenever_new_file_is_downloaded(self, new_files: List[str]):
+        """
+        This function will trigger an SQS message whenever a new file is downloaded
+        :param new_files: List of new files downloaded
+        :return: None
+        """
+        sqs_client = boto3.client("sqs")
+        queue_url = get_environmental_variable_value("DELAYED_SQS_QUEUE_URL")
+        message_body = {
+            "message": "New files downloaded from Cricsheet",
+            "new_files": new_files,
+        }
+        response = sqs_client.send_message(
+            QueueUrl=queue_url,
+            MessageBody=str(message_body)
+        )
+        logger.info(f"Message sent to SQS: {response['MessageId']}")
+
+    def _upload_new_json_files_to_s3(self, new_files: List):
         for file in new_files[:self._threshold_for_number_of_files_to_be_sent_for_processing]:
             file_path = f"{self._extraction_directory}/{file}"
             key = f"{self._s3_folder_to_store_cricsheet_data}/{self._s3_folder_to_store_processed_json_files_zip}/{file}"
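
Note that MessageBody=str(message_body) sends a Python-literal string (single quotes), which json.loads on the consumer side would reject; it has to be parsed with ast.literal_eval instead. A sketch of a JSON-emitting alternative, with the same payload; names here are illustrative, and os.environ stands in for the repo's get_environmental_variable_value helper:

import json
import os

import boto3

# Sketch only: new_files stands in for the list built by
# _seggregate_new_files_from_downloaded_zip.
new_files = ["1412345.json"]  # hypothetical file name
sqs_client = boto3.client("sqs")
queue_url = os.environ["DELAYED_SQS_QUEUE_URL"]
message_body = {
    "message": "New files downloaded from Cricsheet",
    "new_files": new_files,
}
response = sqs_client.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps(message_body),  # consumers can then use json.loads
)
print(response["MessageId"])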

src/mens_t20i_data_collector/_lambdas/upload_dataset_to_kaggle/upload_dataset_to_kaggle_lambda.py

Lines changed: 4 additions & 3 deletions
@@ -38,7 +38,6 @@ def upload_dataset_to_kaggle(self):
         Uploads the dataset to Kaggle.
         """
         os.makedirs(self._folder_to_keep_the_files_to_upload, exist_ok=True)
-        self._create_metadata_json_file()
         self._download_dataset_files_from_s3()
         self._authenticate_to_kaggle_and_upload_dataset()
 
@@ -58,10 +57,11 @@ def _authenticate_to_kaggle_and_upload_dataset(self):
             team_1 = last_match_details["team_1"]
             team_2 = last_match_details["team_2"]
             date = last_match_details["date"]
+            self._create_metadata_json_file(date)
             api.dataset_create_version(
                 delete_old_versions=True,
                 folder=self._folder_to_keep_the_files_to_upload,
-                version_notes=f"Dataset updated till the match between {team_1} and {team_2} on {date}",
+                version_notes=f"Updated till the match between {team_1} and {team_2} on {date}",
             )
             logger.info("Dataset uploaded to Kaggle successfully")
         except Exception as e:
@@ -83,13 +83,14 @@ def _create_kaggle_json_file(self):
         os.environ["KAGGLE_CONFIG_DIR"] = self._temporary_directory
         logger.info(f"kaggle.json file created at the temporary path {kaggle_json_file_path}")
 
-    def _create_metadata_json_file(self):
+    def _create_metadata_json_file(self, date):
         """
         Creates a metadata.json file with the dataset metadata for Kaggle API.
         """
         logger.info("Creating metadata.json file...")
         metadata = {
             "id": f"{self._kaggle_username}/{get_environmental_variable_value('KAGGLE_DATASET_SLUG')}",
+            "subtitle": f"Complete T20I data updated till {date} for ML & match analysis"
         }
         metadata_file_path = os.path.join(self._folder_to_keep_the_files_to_upload, "dataset-metadata.json")
         with open(metadata_file_path, "w", encoding="utf-8") as metadata_file:
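
The reason _create_metadata_json_file now takes the date is that the Kaggle API reads dataset-metadata.json from the upload folder when dataset_create_version runs, so the subtitle can only be stamped after the last match details are known. A sketch of the file the method would produce, with a hypothetical username, slug, and date:

import json

# All values hypothetical; the real ones come from Secrets Manager and the
# last processed match.
metadata = {
    "id": "example-user/mens-t20i-dataset",
    "subtitle": "Complete T20I data updated till 2025-06-01 for ML & match analysis",
}
with open("dataset-metadata.json", "w", encoding="utf-8") as metadata_file:
    json.dump(metadata, metadata_file)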
