Skip to content

Commit f8021a7

Browse files
Integrating telegram bot alert for Lambda execution Status (#77)
* telegram bot alert has been integrated * code has been corrected
1 parent 4f5521e commit f8021a7

File tree

7 files changed

+108
-30
lines changed

7 files changed

+108
-30
lines changed

aws/app.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
#!/usr/bin/env python3
2-
import boto3
32
import aws_cdk as cdk
4-
from constants import SSM_PARAMETER_PREFIX
5-
from utils import get_parameter_from_ssm
3+
from parameters import (
4+
account_id,
5+
cricsheet_data_downloading_bucket_name,
6+
region,
7+
stack_name,
8+
)
69
from mens_t20i_dataset_stack import MenT20IDatasetStack
710

811

9-
# Fetch values from SSM
10-
ssm = boto3.client('ssm')
11-
account_id = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}account_id")
12-
cricsheet_data_downloading_bucket_name = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}cricsheet_data_downloading_bucket")
13-
region = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}aws_region")
14-
stack_name = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}stack_name")
15-
16-
1712
app = cdk.App()
1813
env = cdk.Environment(account=account_id, region=region)
1914

aws/mens_t20i_dataset_stack.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import boto3
1616
from constructs import Construct
1717
from constants import AWS_SDK_PANDAS_LAYER_ARN, THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING
18+
from parameters import TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID
1819
from utils import get_secret_from_secrets_manager
1920

2021

@@ -90,6 +91,8 @@ def __init__(
9091
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
9192
"DYNAMODB_TABLE_NAME": dynamodb_to_store_file_status_data.table_name,
9293
"THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING": THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING,
94+
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
95+
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
9396
},
9497
function_name="cricsheet-data-downloading-lambda",
9598
layers=[
@@ -140,6 +143,8 @@ def __init__(
140143
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
141144
**__db_secrets,
142145
"DYNAMODB_TABLE_NAME": dynamodb_to_store_file_status_data.table_name,
146+
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
147+
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
143148
},
144149
function_name="cricsheet-deliverywise-data-extraction-lambda",
145150
layers=[
@@ -201,6 +206,8 @@ def __init__(
201206
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
202207
**__db_secrets,
203208
"DYNAMODB_TABLE_NAME": dynamodb_to_store_file_status_data.table_name,
209+
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
210+
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
204211
},
205212
function_name="cricsheet-matchwise-data-extraction-lambda",
206213
layers=[
@@ -261,6 +268,8 @@ def __init__(
261268
environment={
262269
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
263270
**__db_secrets,
271+
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
272+
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
264273
},
265274
function_name="convert-mongo-data-to-csv-lambda",
266275
layers=[
@@ -301,6 +310,8 @@ def __init__(
301310
environment={
302311
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
303312
**__kaggle_secrets,
313+
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
314+
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
304315
},
305316
function_name="upload-dataset-to-kaggle-lambda",
306317
layers=[

aws/parameters.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import boto3
2+
from constants import SSM_PARAMETER_PREFIX
3+
from utils import get_parameter_from_ssm
4+
5+
6+
# Fetch values from SSM
7+
ssm = boto3.client('ssm')
8+
account_id = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}account_id")
9+
cricsheet_data_downloading_bucket_name = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}cricsheet_data_downloading_bucket")
10+
region = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}aws_region")
11+
stack_name = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}stack_name")
12+
TELEGRAM_BOT_TOKEN = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}TELEGRAM_BOT_TOKEN")
13+
TELEGRAM_CHAT_ID = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}TELEGRAM_CHAT_ID")

src/mens_t20i_data_collector/_lambdas/constants.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,17 @@
3939
"fielder_name"
4040
]
4141
MATCHWISE_DATA_CSV_FILE_NAME: str = "matchwise_data.csv"
42+
TELEGRAM_MESSAGE_TEMPLATE: str = """
43+
<b>🏏 T20I Data Extraction Pipeline Status - {}</b>
44+
45+
46+
<b> Timestamp :</b> {}
47+
48+
<b> Lambda Name :</b> {}
49+
50+
<b> Execution Status :</b> {}
51+
52+
<b> Message :</b> {}
53+
54+
-- Automated Message from T20I Data Extraction Pipeline --
55+
"""

src/mens_t20i_data_collector/_lambdas/convert_mongodb_data_to_csv/convert_mongo_db_data_to_csv_lambda.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,4 @@ def handler(_, __): # noqa: Vulture
8282
"""
8383
dataset_preparation_handler = DatasetPreparationHandler()
8484
dataset_preparation_handler.prepare_dataset()
85-
return "Datasets prepared and uploaded successfully."
85+
return "Datasets prepared and uploaded to S3 successfully."

src/mens_t20i_data_collector/_lambdas/download_from_cricsheet/download_from_cricsheet_lambda_function.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
CRICSHEET_DATA_S3_FOLDER_TO_STORE_PROCESSED_JSON_FILES_ZIP
1111
)
1212
from mens_t20i_data_collector._lambdas.utils import (
13+
exception_handler,
1314
get_environmental_variable_value
1415
)
1516

@@ -72,6 +73,9 @@ def upload_new_json_data_files_for_data_processing(self, downloaded_zip_file_pat
7273
self._upload_new_json_files_to_s3(new_files=new_files)
7374
if new_files:
7475
self._trigger_an_sqs_message_whenever_new_file_is_downloaded(new_files=new_files)
76+
return "Data file has been downloaded and placed successfully for processing"
77+
logger.info("No new files to process")
78+
return "No new files to process"
7579

7680
def _list_all_files_from_dynamo_db(self) -> Set:
7781
response = self._dynamo_db_to_store_file_data_extraction_status.scan(ProjectionExpression="file_name")
@@ -115,21 +119,10 @@ def _upload_new_json_files_to_s3(self, new_files: List):
115119
logger.info(f"File {file} uploaded to {key}")
116120

117121

118-
def handler(_, __): # noqa: Vulture
119-
try:
120-
downloader = DownloadDataFromCricsheetHandler()
121-
zip_file_path = downloader.download_data_from_cricsheet()
122-
downloader.upload_new_json_data_files_for_data_processing(zip_file_path)
123-
124-
logging.shutdown()
125-
return {
126-
"statusCode": 200,
127-
"body": "Data downloaded and placed successfully for processing"
128-
}
129-
except Exception as e: # pylint: disable=broad-exception-caught
130-
logger.error(f"Handler execution failed: {e}")
131-
logging.shutdown()
132-
return {
133-
"statusCode": 500,
134-
"body": f"Internal Server Error: {str(e)}"
135-
}
122+
@exception_handler # noqa: Vulture
123+
def handler(_, __):
124+
downloader = DownloadDataFromCricsheetHandler()
125+
zip_file_path = downloader.download_data_from_cricsheet()
126+
output = downloader.upload_new_json_data_files_for_data_processing(zip_file_path)
127+
logging.shutdown()
128+
return output

src/mens_t20i_data_collector/_lambdas/utils.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
import datetime
12
import functools
23
import logging
34
import os
45
from typing import Any
6+
import requests
57
from botocore.exceptions import ClientError
8+
from mens_t20i_data_collector._lambdas.constants import (
9+
TELEGRAM_MESSAGE_TEMPLATE
10+
)
611

712
# Set up logging
813
logger = logging.getLogger()
@@ -15,14 +20,42 @@ def exception_handler(function):
1520
"""
1621
@functools.wraps(function)
1722
def wrapper(*args, **kwargs):
23+
current_time = datetime.datetime.now()
24+
function_name = "unknown_function"
25+
for arg in args:
26+
if hasattr(arg, "function_name"):
27+
function_name = arg.function_name
28+
break
1829
try:
1930
response_body = function(*args, **kwargs)
31+
send_alert_via_telegram_bot(
32+
chat_id=get_environmental_variable_value("TELEGRAM_CHAT_ID"),
33+
message=TELEGRAM_MESSAGE_TEMPLATE.format(
34+
current_time.strftime("%d-%m-%Y"),
35+
current_time.strftime("%H:%M:%S"),
36+
function_name,
37+
"SUCCESS ✅",
38+
response_body
39+
),
40+
telegram_bot_token=get_environmental_variable_value("TELEGRAM_BOT_TOKEN")
41+
)
2042
return {
2143
"statusCode": 200,
2244
"body": response_body
2345
}
2446
except Exception as e: # pylint: disable=broad-exception-caught
2547
logger.error(f"Error occurred: {str(e)}", exc_info=True)
48+
send_alert_via_telegram_bot(
49+
chat_id=get_environmental_variable_value("TELEGRAM_CHAT_ID"),
50+
message=TELEGRAM_MESSAGE_TEMPLATE.format(
51+
current_time.strftime("%d-%m-%Y"),
52+
current_time.strftime("%H:%M:%S"),
53+
function_name,
54+
"ERROR ❌",
55+
str(e)
56+
),
57+
telegram_bot_token=get_environmental_variable_value("TELEGRAM_BOT_TOKEN")
58+
)
2659
return {
2760
"statusCode": 500,
2861
"body": f"Internal Server Error: {str(e)}"
@@ -71,3 +104,22 @@ def wrapper(event, _):
71104
return function(json_file_key, match_id)
72105

73106
return wrapper
107+
108+
109+
def send_alert_via_telegram_bot(chat_id: str, message: str, telegram_bot_token: str, ) -> None:
110+
"""
111+
Sends the statsu of the function execution through an alert to a Telegram chat.
112+
113+
:param telegram_bot_token: Telegram bot token
114+
:param chat_id: Chat ID
115+
:param message: Message to send in HTML format
116+
"""
117+
url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
118+
payload = {
119+
"chat_id": chat_id,
120+
"text": message,
121+
"parse_mode": "HTML"
122+
}
123+
response = requests.post(url, json=payload, timeout=10)
124+
if response.status_code != 200:
125+
print(f"Failed to send message: {response.text}")

0 commit comments

Comments
 (0)