  import logging
  from collections import namedtuple
  from typing import Any, Dict, List, Union
+ from zoneinfo import ZoneInfo

  import hjson
  import pandas as pd
- import pytz
  import pangres

  from django.conf import settings
  from django.db import connections as conns, models
  from django.db.models import QuerySet
  from django_cron import CronJobBase, Schedule
  from google.cloud import bigquery
- from sqlalchemy import types
+ from sqlalchemy import types, text
  from sqlalchemy.engine import ResultProxy
+ from sqlalchemy.orm import sessionmaker

- from dashboard.common import db_util, utils
+ from dashboard.common import db_util
  from dashboard.models import Course, Resource, AcademicTerms, User
@@ -67,17 +68,17 @@ def util_function(sql_string, mysql_table, param_object=None, table_identifier=N
  # execute database query
- def execute_db_query(query: str, params: List = None) -> ResultProxy:
-     with engine.connect() as connection:
+ def execute_db_query(query: str, params: Dict = None) -> ResultProxy:
+     with engine.begin() as connection:
          connection.detach()
          if params:
-             return connection.execute(query, params)
+             return connection.execute(text(query), params)
          else:
-             return connection.execute(query)
+             return connection.execute(text(query))


  # remove all records inside the specified table
- def delete_all_records_in_table(table_name: str, where_clause: str = "", where_params: List = None):
+ def delete_all_records_in_table(table_name: str, where_clause: str = "", where_params: Dict = None):
      # delete all records in the table first, can have an optional where clause
      result_proxy = execute_db_query(f"delete from {table_name} {where_clause}", where_params)
      return (f"\n{result_proxy.rowcount} rows deleted from {table_name}\n")
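Under SQLAlchemy 1.4+, plain string SQL must be wrapped in text(), named placeholders (:name) are bound from a dict rather than a positional list, and engine.begin() opens a transaction that commits on success and rolls back on error. A minimal sketch of the pattern; the URL, table, and parameter name are illustrative:

    from sqlalchemy import create_engine, text

    engine = create_engine("mysql+mysqldb://user:pass@localhost/student_dashboard")  # hypothetical URL

    # begin() yields a connection inside a transaction; it commits when the
    # block exits normally and rolls back if an exception escapes
    with engine.begin() as connection:
        result = connection.execute(
            text("delete from resource_access where access_time > :cutoff"),
            {"cutoff": "2023-01-01"},
        )
        print(f"{result.rowcount} rows deleted")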
@@ -99,7 +100,7 @@ def soft_update_datetime_field(
              f'Skipped update of {field_name} for {model_name} instance ({model_inst.id}); existing value was found')
      else:
          if warehouse_field_value:
-             warehouse_field_value = warehouse_field_value.replace(tzinfo=pytz.UTC)
+             warehouse_field_value = warehouse_field_value.replace(tzinfo=ZoneInfo('UTC'))
              setattr(model_inst, field_name, warehouse_field_value)
              logger.info(f'Updated {field_name} for {model_name} instance ({model_inst.id})')
              return [field_name]
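Calling replace(tzinfo=...) attaches a zone label without shifting the wall-clock value, which is the right move for timestamps the warehouse already stores in UTC; ZoneInfo ships in the standard library from Python 3.9, so the pytz dependency can be dropped. A quick equivalence check:

    from datetime import datetime
    from zoneinfo import ZoneInfo

    naive = datetime(2023, 5, 1, 12, 0, 0)         # naive timestamp from the warehouse
    aware = naive.replace(tzinfo=ZoneInfo('UTC'))  # labeled as UTC, wall clock unchanged
    print(aware.isoformat())                       # 2023-05-01T12:00:00+00:00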
@@ -124,7 +125,7 @@ def verify_course_ids(self):
          logger.debug("in checking course")
          supported_courses = Course.objects.get_supported_courses()
          course_ids = [str(x) for x in supported_courses.values_list('id', flat=True)]
-         courses_data = pd.read_sql(queries['course'], data_warehouse_engine, params={'course_ids': tuple(course_ids)})
+         courses_data = pd.read_sql(queries['course'], data_warehouse_engine, params={'course_ids': course_ids})
          # error out when course id is invalid, otherwise add DataFrame to list
          for course_id, data_last_updated in supported_courses:
              if course_id not in list(courses_data['id']):
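Passing the ids as a list matters because the warehouse queries here appear to run against BigQuery (note the google.cloud.bigquery import and the CRON_BQ_IN_LIMIT setting mentioned later): BigQuery binds a Python list as an ARRAY parameter, which the SQL consumes with UNNEST, whereas the old tuple form relied on a MySQL driver expanding an IN (...) clause. A sketch of the same binding with the google-cloud-bigquery client directly; the table and ids are illustrative, and this assumes the course query filters with UNNEST:

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes default credentials
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter('course_ids', 'STRING', ['17700000000000001']),
        ]
    )
    rows = client.query(
        'select id, name from course_dim where id in unnest(@course_ids)',
        job_config=job_config,
    ).result()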
@@ -151,7 +152,7 @@ def update_user(self):
          # cron status
          status = ""

-         logger.debug("in update with data warehouse user")
+         logger.info("in update with data warehouse user")

          # delete all records in the table first
          status += delete_all_records_in_table("user")
@@ -160,7 +161,7 @@ def update_user(self):
          status += util_function(
              queries['user'],
              'user',
-             {'course_ids': tuple(self.valid_locked_course_ids),
+             {'course_ids': self.valid_locked_course_ids,
               'canvas_data_id_increment': settings.CANVAS_DATA_ID_INCREMENT
              })
@@ -193,13 +194,13 @@ def update_canvas_resource(self):
          # cron status
          status = ""

-         logger.debug("in update canvas resource")
+         logger.info("in update canvas resource")

          # Select all the files for these courses
          # convert int array to str array
          df_attach = pd.read_sql(queries['resource'],
                                  data_warehouse_engine,
-                                 params={'course_ids': tuple(self.valid_locked_course_ids)})
+                                 params={'course_ids': self.valid_locked_course_ids})
          logger.debug(df_attach)
          # Update these back again based on the dataframe
          # Remove any rows where file_state is not available!
@@ -217,6 +218,8 @@ def update_resource_access(self):
          # cron status
          status = ""

+         logger.info("in update resource access")
+
          # return string with concatenated SQL insert result
          return_string = ""
@@ -231,7 +234,7 @@ def update_resource_access(self):
          logger.info(f"Deleting all records in resource_access after {data_last_updated}")

-         status += delete_all_records_in_table("resource_access", f"WHERE access_time > %s", [data_last_updated, ])
+         status += delete_all_records_in_table("resource_access", f"WHERE access_time > :data_last_updated", {'data_last_updated': data_last_updated})

          # loop through multiple course ids, 20 at a time
          # (This is set by the CRON_BQ_IN_LIMIT from settings)
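Since execute_db_query now wraps its SQL in text(), the driver-level %s placeholder is no longer interpreted; SQLAlchemy parses :name markers out of the string and binds them from matching dict keys. Usage of the helper above, with an illustrative cutoff value:

    from datetime import datetime
    from zoneinfo import ZoneInfo

    status = ""
    data_last_updated = datetime(2023, 4, 30, tzinfo=ZoneInfo("UTC"))  # illustrative cutoff
    status += delete_all_records_in_table(
        "resource_access",
        "WHERE access_time > :data_last_updated",
        {"data_last_updated": data_last_updated},
    )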
@@ -393,7 +396,7 @@ def update_resource_access(self):
          student_enrollment_type = User.EnrollmentType.STUDENT
          student_enrollment_df = pd.read_sql(
              'select user_id, course_id from user where enrollment_type=%s',
-             engine, params={student_enrollment_type})
+             engine, params=[(str(student_enrollment_type),)])
          resource_access_df = pd.merge(
              resource_access_df, student_enrollment_df,
              on=['user_id', 'course_id'],
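The old call passed a set literal, which is unordered and hands the raw enum object to the driver; the replacement is a proper sequence holding one parameter tuple, with the enum cast to str so MySQL receives a plain value. A named-style equivalent, as a sketch only (the %(name)s form is the MySQL driver's paramstyle, not what the code above uses):

    import pandas as pd

    student_enrollment_df = pd.read_sql(
        'select user_id, course_id from user where enrollment_type=%(enrollment_type)s',
        engine,
        params={'enrollment_type': str(User.EnrollmentType.STUDENT)},
    )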
@@ -437,6 +440,8 @@ def update_groups(self):
          # cron status
          status = ""

+         logger.info("update_groups(): ")
+
          # delete all records in assignment_group table
          status += delete_all_records_in_table("assignment_groups")
@@ -447,7 +452,7 @@ def update_groups(self):
          # loop through multiple course ids
          status += util_function(queries['assignment_groups'],
                                  'assignment_groups',
-                                 {'course_ids': tuple(self.valid_locked_course_ids)})
+                                 {'course_ids': self.valid_locked_course_ids})

          return status
@@ -463,7 +468,7 @@ def update_assignment(self):
          # loop through multiple course ids
          status += util_function(queries['assignment'],
                                  'assignment',
-                                 {'course_ids': tuple(self.valid_locked_course_ids),
+                                 {'course_ids': self.valid_locked_course_ids,
                                   'time_zone': settings.TIME_ZONE})

          return status
@@ -480,14 +485,30 @@ def submission(self):
          # loop through multiple course ids
          # filter out not released grades (submission_dim.posted_at date is not null) and partial grades (submission_dim.workflow_state != 'graded')
-         status += util_function(queries['submission'],
-                                 'submission',
-                                 {
-                                     'course_ids': tuple(self.valid_locked_course_ids),
-                                     'canvas_data_id_increment': settings.CANVAS_DATA_ID_INCREMENT,
-                                     'time_zone': settings.TIME_ZONE
-                                 })
+         query_params = {
+             'course_ids': self.valid_locked_course_ids,
+             'time_zone': settings.TIME_ZONE,
+             'canvas_data_id_increment': settings.CANVAS_DATA_ID_INCREMENT,
+         }
+         Session = sessionmaker(bind=data_warehouse_engine)
+         try:
+             # Create a session
+             with Session() as session:
+                 # Execute the first query to create the temporary table
+                 session.execute(text(queries['submission']).bindparams(**query_params))
+
+                 # Execute the second query using the temporary table
+                 result = session.execute(text(queries['submission_with_avg_score']))
+                 df = pd.DataFrame(result.fetchall(), columns=result.keys())
+                 df = df.drop_duplicates(keep='first')
+                 df.to_sql(con=engine, name='submission', if_exists='append', index=False)
+
+         except Exception as e:
+             logger.exception(f'Error running sql on table submission: {str(e)}')
+             raise
+         status += f"{str(df.shape[0])} submission: {query_params}\n"

+         # returns the row size of dataframe
          return status

      def weight_consideration(self):
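The two-step pattern works because temporary tables are scoped to the connection (or database session) that creates them: a sessionmaker Session holds one pooled connection for the duration of its transaction, so the second statement is guaranteed to see the table the first one built, which two independent engine.connect() calls would not guarantee. A minimal sketch of the idea, with a hypothetical URL and MySQL-flavored illustrative SQL:

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker

    data_warehouse_engine = create_engine('mysql+mysqldb://user:pass@warehouse/udw')  # hypothetical

    Session = sessionmaker(bind=data_warehouse_engine)
    with Session() as session:
        # statement 1: the temp table lives on this session's connection
        session.execute(text(
            'create temporary table recent_submissions as '
            'select * from submission_dim where course_id = :course_id'
        ).bindparams(course_id=17700000000000001))

        # statement 2: same connection, so the temp table is still visible
        result = session.execute(text('select count(*) from recent_submissions'))
        print(result.scalar())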
@@ -503,7 +524,7 @@ def weight_consideration(self):
          # loop through multiple course ids
          status += util_function(queries['assignment_weight'],
                                  'assignment_weight_consideration',
-                                 {'course_ids': tuple(self.valid_locked_course_ids)},
+                                 {'course_ids': self.valid_locked_course_ids},
                                  'weight')

          logger.debug(status + "\n\n")
@@ -543,7 +564,7 @@ def update_course(self, warehouse_courses_data: pd.DataFrame) -> str:
          Updates course records with data returned from verify_course_ids, only making changes when necessary.
          """
          status: str = ''
-         logger.debug('update_course()')
+         logger.info('update_course()')

          logger.debug(warehouse_courses_data.to_json(orient='records'))
          courses: QuerySet = Course.objects.filter(id__in=self.valid_locked_course_ids)
@@ -588,7 +609,7 @@ def do(self) -> str:
          status = ""

-         run_start = datetime.now(pytz.UTC)
+         run_start = datetime.now(ZoneInfo('UTC'))
          status += f"Start cron: {str(run_start)} UTC\n"
          course_verification = self.verify_course_ids()
          invalid_course_id_list = course_verification.invalid_course_ids