Skip to content

Commit 2757f40

Browse files
committed
refactor stage 1, pulling notifier and utils out
1 parent 70027f6 commit 2757f40

File tree

2 files changed

+886
-0
lines changed

2 files changed

+886
-0
lines changed

outage_notifier.py

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
import pandas as pd
2+
from datetime import datetime
3+
import argparse
4+
import json
5+
import math
6+
import requests
7+
import os
8+
import outage_utils
9+
import glob
10+
11+
def send_telegram_message(token, chat_id, message, thread_id=None):
12+
"""Sends a message to a specified Telegram chat."""
13+
url = f"https://api.telegram.org/bot{token}/sendMessage"
14+
data = {
15+
"chat_id": chat_id,
16+
"text": message,
17+
"link_preview_options": {
18+
"is_disabled": True
19+
},
20+
"parse_mode": "Markdown"
21+
}
22+
23+
if thread_id:
24+
data["message_thread_id"] = thread_id
25+
try:
26+
response = requests.post(url, json=data)
27+
response.raise_for_status() # Raise an exception for HTTP errors
28+
print("Telegram message sent successfully!")
29+
return True
30+
except requests.exceptions.RequestException as e:
31+
print(f"Error sending Telegram message: {e}")
32+
return False
33+
34+
def send_notification(notification_data, file_input, thresholds, bot_token=None, chat_id=None, thread_id=None, geocode_api_key=None, notification_output_dir="."):
35+
"""
36+
Send Telegram notification about outages that meet the threshold criteria.
37+
notification_data contains: new_outages, resolved_outages, new_customers, resolved_customers
38+
"""
39+
try:
40+
new_outages = notification_data['new_outages']
41+
resolved_outages = notification_data['resolved_outages']
42+
messages_to_send = []
43+
44+
# Create individual notification for each new outage
45+
if not new_outages.empty:
46+
for _, outage in new_outages.iterrows():
47+
48+
elapsed_hours = outage['elapsed_time_minutes'] / 60
49+
50+
new_message = f"🚨 NEW OUTAGE ALERT 🚨\n\n"
51+
new_message += f"Utility: {outage['utility'].upper()}\n"
52+
new_message += f"ID: {outage['outage_id']}\n"
53+
new_message += f"Customers: {outage['customers_impacted']:,.0f} \n"
54+
new_message += f"Current Duration: {elapsed_hours:.1f}h \n"
55+
56+
# Add estimated duration if available
57+
if pd.notna(outage['expected_length_minutes']) and outage['expected_length_minutes'] is not None:
58+
expected_hours = outage['expected_length_minutes'] / 60
59+
new_message += f"Expected Duration: {expected_hours:.1f}h \n"
60+
61+
new_message += f"Status: {outage['status']}\n"
62+
new_message += f"Cause: {outage['cause']}\n"
63+
64+
# Add location if available
65+
if pd.notna(outage['center_lat']) and pd.notna(outage['center_lon']):
66+
location_info = reverse_geocode(outage['center_lat'], outage['center_lon'], geocode_api_key)
67+
new_message += f"Location: {location_info}\n"
68+
69+
messages_to_send.append(('new', new_message, outage['outage_id']))
70+
71+
# Create individual notification for each resolved outage
72+
if not resolved_outages.empty:
73+
for _, outage in resolved_outages.iterrows():
74+
resolved_message = f"😌 RESOLVED OUTAGE ALERT 😌\n\n"
75+
resolved_message += f"Utility: {outage['utility'].upper()}\n"
76+
resolved_message += f"ID: {outage['outage_id']}\n"
77+
resolved_message += f"Customers: {outage['customers_impacted']:,.0f}\n"
78+
79+
# Add actual duration if available
80+
if pd.notna(outage['elapsed_time_minutes']) and outage['elapsed_time_minutes'] is not None:
81+
actual_hours = outage['elapsed_time_minutes'] / 60
82+
resolved_message += f"Actual Duration: {actual_hours:.1f}h\n"
83+
84+
# Add location if available
85+
if pd.notna(outage['center_lat']) and pd.notna(outage['center_lon']):
86+
location_info = reverse_geocode(outage['center_lat'], outage['center_lon'], geocode_api_key)
87+
resolved_message += f"Location: {location_info}\n"
88+
89+
messages_to_send.append(('resolved', resolved_message, outage['outage_id']))
90+
91+
# Send each message separately
92+
for msg_type, message, outage_id in messages_to_send:
93+
# Send Telegram notification if credentials provided
94+
if bot_token and chat_id:
95+
success = send_telegram_message(bot_token, chat_id, message, thread_id)
96+
if success:
97+
print(f"Telegram {msg_type} outage notification sent for {outage_id} to chat {chat_id}" + (f" (thread {thread_id})" if thread_id else ""))
98+
else:
99+
print(f"Failed to send Telegram {msg_type} outage notification for {outage_id}")
100+
else:
101+
print(f"Telegram credentials not provided, skipping {msg_type} notification for {outage_id}")
102+
103+
# Save notification to timestamped file with outage ID
104+
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
105+
notification_filename = f"notification_{msg_type}_{outage_id}_{timestamp}.txt"
106+
notification_path = os.path.join(notification_output_dir, notification_filename)
107+
with open(notification_path, 'w', encoding="utf-8") as f:
108+
f.write(message)
109+
print(f"{msg_type.capitalize()} outage notification for {outage_id} saved to: {notification_path}")
110+
111+
if not messages_to_send:
112+
print("No outages meeting criteria - no notification sent.")
113+
114+
except Exception as e:
115+
print(f"Error sending notification: {e}")
116+
117+
def main():
118+
parser = argparse.ArgumentParser(description="Send notifications about outages based on a set of file updates.")
119+
120+
parser.add_argument('-d', '--directory', type=str, default='.', help='Directory containing files that match the pattern')
121+
parser.add_argument('-r', '--remaining_expected_length_threshold', type=float, default=0.0,
122+
help='Minimum remaining expected length threshold in hours (default: 0.0)')
123+
parser.add_argument('-c', '--customer_threshold', type=int, default=0,
124+
help='Customers affected threshold (default: 0)')
125+
parser.add_argument('-e', '--elapsed_time_threshold', type=float, default=0.0,
126+
help='Minimum elapsed time threshold in hours (default: 0.0)')
127+
parser.add_argument('-u', '--utility', type=str, required=True,
128+
help='utility, for now only used in output file naming')
129+
parser.add_argument('--telegram-token', type=str,
130+
help='Telegram bot token for notifications')
131+
parser.add_argument('--telegram-chat-id', type=str,
132+
help='Telegram chat ID for notifications')
133+
parser.add_argument('--telegram-thread-id', type=str,
134+
help='Telegram thread ID for notifications (optional)')
135+
parser.add_argument('--geocode-api-key', type=str,
136+
help='API key for geocoding service (optional)')
137+
parser.add_argument('--notification-output-dir', type=str, default=".",
138+
help='Directory to save notification files for testing (default: current directory)')
139+
args = parser.parse_args()
140+
141+
print(f"====== analyze_current_outages.py starting for utility {args.utility} =======")
142+
143+
144+
# Convert thresholds from hours to minutes
145+
expected_length_threshold_minutes = args.remaining_expected_length_threshold * 60
146+
elapsed_time_threshold_minutes = args.elapsed_time_threshold * 60
147+
148+
print(f"Expected length threshold: {args.remaining_expected_length_threshold} hours ({expected_length_threshold_minutes} minutes)")
149+
print(f"Customer threshold: {args.customer_threshold}")
150+
print(f"Elapsed time threshold: {args.elapsed_time_threshold} hours ({elapsed_time_threshold_minutes} minutes)")
151+
152+
153+
filename_suffix = outage_utils.get_filename_suffix_for_utility(args.utility)
154+
file_pattern = os.path.join(args.directory, "*"+filename_suffix)
155+
print(f"file pattern {file_pattern}")
156+
all_files = sorted(glob.glob(file_pattern), reverse=False)
157+
158+
is_first_file = True
159+
previous_file_df = pd.DataFrame()
160+
current_file_df = pd.DataFrame()
161+
for file_index, file in enumerate(all_files):
162+
# Extract date and time from filename
163+
print(f"====== starting processing for file =======")
164+
print(f"file: {file}")
165+
basename = os.path.basename(file)
166+
date_time_part = basename.split(filename_suffix)[0]
167+
# TODO: for some reason, the first file created by expand.py has a slightly different format. Figure out why and address there?
168+
# filenames are in GMT, stick with that throughout
169+
gmt_file_datetime = datetime.strptime(date_time_part, "%Y-%m-%dT%H%M%S%f")
170+
171+
previous_file_df = current_file_df
172+
current_file_rows = []
173+
174+
with open(file, "r", encoding="utf-8") as f:
175+
if(args.utility == "pse"):
176+
outage_utils.parse_pse_file(f, current_file_rows, gmt_file_datetime, False)
177+
elif (args.utility == "scl"):
178+
outage_utils.parse_scl_file(f, current_file_rows, gmt_file_datetime, False)
179+
elif (args.utility == "snopud"):
180+
outage_utils.parse_snopud_file(f, current_file_rows, gmt_file_datetime, False)
181+
elif (args.utility == "pge"):
182+
outage_utils.parse_pge_file(f, current_file_rows, gmt_file_datetime, False)
183+
else:
184+
print("no utility specified, will not parse")
185+
186+
current_file_df = pd.DataFrame(current_file_rows)
187+
188+
# add some calculated columns to the current file df
189+
if not current_file_df.empty:
190+
# Calculate expected length in minutes for latest outages
191+
current_file_df['expected_length_minutes'] = current_file_df.apply(
192+
lambda row: outage_utils.calculate_expected_length_minutes(gmt_file_datetime, row['est_restoration_time']),
193+
axis=1
194+
)
195+
196+
# Calculate active duration in minutes for latest outages
197+
current_file_df['elapsed_time_minutes'] = current_file_df.apply(
198+
lambda row: outage_utils.calculate_active_duration_minutes(row['start_time'], gmt_file_datetime),
199+
axis=1
200+
)
201+
202+
if is_first_file:
203+
is_first_file = False
204+
previous_file_df = current_file_df
205+
continue
206+
207+
208+
# compare the previous and current file rowsets to find notification-worthy changes
209+
# we need to look at:
210+
# outages in current that weren't in previous: we want to send a notification for these if they meet our thresholds
211+
# outages in previous that aren't in current: we want to send a notification for these if they meet our thresholds
212+
# outages in both: we want to send a notification for these if they didn't meet our thresholds in previous
213+
# but meet our thresholds in current
214+
215+
new_outages = current_file_df[~current_file_df['outage_id'].isin(previous_file_df['outage_id'])]
216+
resolved_outages = previous_file_df[~previous_file_df['outage_id'].isin(current_file_df['outage_id'])]
217+
active_outages = current_file_df[current_file_df['outage_id'].isin(previous_file_df['outage_id'])]
218+
219+
220+
notifiable_new_outages = new_outages[
221+
(new_outages['expected_length_minutes'] > expected_length_threshold_minutes) &
222+
(new_outages['customers_impacted'] > args.customer_threshold) &
223+
(new_outages['elapsed_time_minutes'] > elapsed_time_threshold_minutes)
224+
]
225+
226+
227+
notifiable_resolved_outages = resolved_outages[
228+
(resolved_outages['customers_impacted'] > args.customer_threshold) &
229+
(resolved_outages['elapsed_time_minutes'] > elapsed_time_threshold_minutes)
230+
]
231+
232+
# Check active outages (outages that exist in both previous and current files)
233+
notifiable_active_outages = pd.DataFrame()
234+
if not active_outages.empty:
235+
# Get the corresponding previous data for active outages
236+
active_previous = previous_file_df[previous_file_df['outage_id'].isin(active_outages['outage_id'])]
237+
238+
# Merge current and previous data for active outages
239+
active_merged = active_outages.merge(
240+
active_previous[['outage_id', 'expected_length_minutes', 'elapsed_time_minutes', 'customers_impacted']],
241+
on='outage_id',
242+
suffixes=('_current', '_previous')
243+
)
244+
245+
# Check which active outages meet thresholds now but didn't before
246+
notifiable_active_outages = active_merged[
247+
# Current data meets all thresholds
248+
(active_merged['expected_length_minutes_current'] > expected_length_threshold_minutes) &
249+
(active_merged['customers_impacted_current'] > args.customer_threshold) &
250+
(active_merged['elapsed_time_minutes_current'] > elapsed_time_threshold_minutes) &
251+
# Previous data missed at least one threshold
252+
((pd.isna(active_merged['expected_length_minutes_previous']) | (active_merged['expected_length_minutes_previous'] <= expected_length_threshold_minutes)) |
253+
(pd.isna(active_merged['customers_impacted_previous']) | (active_merged['customers_impacted_previous'] <= args.customer_threshold)) |
254+
(pd.isna(active_merged['elapsed_time_minutes_previous']) | (active_merged['elapsed_time_minutes_previous'] <= elapsed_time_threshold_minutes)))
255+
]
256+
257+
# Rename columns to match expected format
258+
if not notifiable_active_outages.empty:
259+
notifiable_active_outages = notifiable_active_outages.rename(columns={
260+
'customers_impacted_current': 'customers_impacted',
261+
'expected_length_minutes_current': 'expected_length_minutes',
262+
'elapsed_time_minutes_current': 'elapsed_time_minutes'
263+
})
264+
265+
print(f"notifiable_new_outages: {notifiable_new_outages}")
266+
print(f"notifiable_resolved_outages: {notifiable_resolved_outages}")
267+
print(f"notifiable_active_outages: {notifiable_active_outages}")
268+
269+
print(f"====== completed processing for file =======")
270+
271+
# # Send notification if there are new outages or resolved outages
272+
# if len(new_outages) > 0 or len(resolved_outages) > 0:
273+
# thresholds = {
274+
# 'length': args.length_threshold,
275+
# 'customers': args.customer_threshold,
276+
# 'elapsed': args.elapsed_time_threshold
277+
# }
278+
# # Pass both new and resolved outages to notification logic
279+
# notification_data = {
280+
# 'new_outages': new_outages,
281+
# 'resolved_outages': resolved_outages
282+
# }
283+
# send_notification(notification_data, args.file_input, thresholds, args.telegram_token, args.telegram_chat_id, args.telegram_thread_id, args.geocode_api_key, args.notification_output_dir)
284+
285+
print(f"====== outage_notifier.py completed =======")
286+
287+
if __name__ == "__main__":
288+
main()
289+
290+

0 commit comments

Comments
 (0)