From 1641f413cb54648b2bb7eb7107741e88f4d853a8 Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Mon, 9 Jun 2025 08:34:58 -0400 Subject: [PATCH 1/9] chore: move secured algolia api key request to BaseLearnerPortalHandler --- enterprise_access/apps/bffs/api.py | 10 +-- enterprise_access/apps/bffs/context.py | 93 ++++++------------------ enterprise_access/apps/bffs/handlers.py | 13 ++-- enterprise_access/apps/bffs/mixins.py | 95 ++++++++++++++++++++++++- 4 files changed, 129 insertions(+), 82 deletions(-) diff --git a/enterprise_access/apps/bffs/api.py b/enterprise_access/apps/bffs/api.py index fba68de45..f1fc64451 100644 --- a/enterprise_access/apps/bffs/api.py +++ b/enterprise_access/apps/bffs/api.py @@ -54,11 +54,11 @@ def get_and_cache_enterprise_customer_users(request, **kwargs): username = request.user.username cache_key = enterprise_customer_users_cache_key(username) cached_response = request_cache(namespace=REQUEST_CACHE_NAMESPACE).get_cached_response(cache_key) - if cached_response.is_found: - logger.info( - f'enterprise_customer_users cache hit for username {username}' - ) - return cached_response.value + # if cached_response.is_found: + # logger.info( + # f'enterprise_customer_users cache hit for username {username}' + # ) + # return cached_response.value client = LmsUserApiClient(request) response_payload = client.get_enterprise_customers_for_user( diff --git a/enterprise_access/apps/bffs/context.py b/enterprise_access/apps/bffs/context.py index 94c364864..e30aa8d23 100644 --- a/enterprise_access/apps/bffs/context.py +++ b/enterprise_access/apps/bffs/context.py @@ -56,6 +56,8 @@ def __init__(self, request): self._enterprise_customer_slug = None self._lms_user_id = getattr(self.user, 'lms_user_id', None) self._enterprise_features = {} + self._algolia_api_key = None + self._catalog_uuids_to_catalog_query_uuids = {} self.data = {} # Stores processed data for the response # Initialize common context data @@ -208,54 +210,6 @@ def _initialize_common_context_data(self): if not self.enterprise_customer_uuid: self._enterprise_customer_uuid = self.enterprise_customer.get('uuid') - # Initialize the secured algolia api keys metadata derived from enterprise catalog - try: - self._initialize_secured_algolia_api_keys() - except HTTPError as exc: - exception_response = exc.response.json() - exception_response_user_message = exception_response.get('user_message') - exception_response_developer_message = exception_response.get('developer_message') - logger.exception( - 'HTTP Error initializing the secured algolia api keys for request user %s, ' - 'enterprise customer uuid %s', - self.lms_user_id, - enterprise_customer_uuid, - ) - self.add_error( - user_message=exception_response_user_message or 'HTTP Error initializing the secured algolia api keys', - developer_message=exception_response_developer_message or - f'Could not initialize the secured algolia api keys. Error: {exc}', - ) - except Exception as exc: # pylint: disable=broad-except - logger.exception( - 'Error initializing the secured algolia api keys for request user %s, ' - 'enterprise customer uuid %s', - self.lms_user_id, - enterprise_customer_uuid, - ) - self.add_error( - user_message='Error initializing the secured algolia api keys', - developer_message=f'Could not initialize the secured algolia api keys. 
Error: {exc}', - ) - - if not (self.secured_algolia_api_key and self.catalog_uuids_to_catalog_query_uuids): - logger.info( - 'No secured algolia key found for request user %s, enterprise customer uuid %s, ' - 'and/or enterprise slug %s', - self.lms_user_id, - enterprise_customer_uuid, - enterprise_customer_slug, - ) - self.add_error( - user_message='No secured algolia api key or catalog query mapping found', - developer_message=( - f'No secured algolia api key or catalog query mapping found for request ' - f'user {self.lms_user_id} and enterprise uuid ' - f'{enterprise_customer_uuid}, and/or enterprise slug {enterprise_customer_slug}' - ), - ) - return - def _initialize_enterprise_customer_users(self): """ Initializes the enterprise customer users for the request user. @@ -298,33 +252,30 @@ def _initialize_enterprise_customer_users(self): ) }) - def _initialize_secured_algolia_api_keys(self): + def update_algolia_keys(self, api_key, catalog_mapping): """ - Initializes the secured algolia api key for the request user. + Updates the Algolia API keys in the context. + + Args: + api_key: The secured Algolia API key + catalog_mapping: Dictionary mapping catalog UUIDs to query UUIDs """ - secured_algolia_api_key_data = get_and_cache_secured_algolia_search_keys( - self.request, - self._enterprise_customer_uuid, - ) - - secured_algolia_api_key = None - catalog_uuids_to_catalog_query_uuids = {} - try: - secured_algolia_api_key, catalog_uuids_to_catalog_query_uuids = transform_secured_algolia_api_key_response( - secured_algolia_api_key_data - ) - except Exception: # pylint: disable=broad-except - logger.exception( - 'Error transforming secured algolia api key for request user %s,' - 'enterprise customer uuid %s and/or slug %s', - self.lms_user_id, - self.enterprise_customer_uuid, - self.enterprise_customer_slug, - ) + self._algolia_api_key = api_key + self._catalog_uuids_to_catalog_query_uuids = catalog_mapping or {} self.data.update({ - 'secured_algolia_api_key': secured_algolia_api_key, - 'catalog_uuids_to_catalog_query_uuids': catalog_uuids_to_catalog_query_uuids + 'secured_algolia_api_key': api_key, + 'catalog_uuids_to_catalog_query_uuids': catalog_mapping or {} }) + + @property + def secured_algolia_api_key(self): + """Get the secured Algolia API key.""" + return self._algolia_api_key + + @property + def catalog_uuids_to_catalog_query_uuids(self): + """Get the mapping of catalog UUIDs to query UUIDs.""" + return self._catalog_uuids_to_catalog_query_uuids def add_error(self, status_code=None, **kwargs): """ diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index 616af35e7..35570e1e7 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -15,7 +15,11 @@ invalidate_subscription_licenses_cache ) from enterprise_access.apps.bffs.context import HandlerContext -from enterprise_access.apps.bffs.mixins import BaseLearnerDataMixin, LearnerDashboardDataMixin +from enterprise_access.apps.bffs.mixins import ( + AlgoliaDataMixin, + BaseLearnerDataMixin, + LearnerDashboardDataMixin +) from enterprise_access.apps.bffs.serializers import EnterpriseCustomerUserSubsidiesSerializer logger = logging.getLogger(__name__) @@ -64,7 +68,7 @@ def add_warning(self, user_message, developer_message): ) -class BaseLearnerPortalHandler(BaseHandler, BaseLearnerDataMixin): +class BaseLearnerPortalHandler(BaseHandler, AlgoliaDataMixin, BaseLearnerDataMixin): """ A base handler class for learner-focused routes. 
@@ -87,8 +91,6 @@ def __init__(self, context): def load_and_process(self): """ Loads and processes data. This is a basic implementation that can be overridden by subclasses. - - The method in this class simply calls common learner logic to ensure the context is set up. """ try: # Verify enterprise customer attrs have learner portal enabled @@ -97,6 +99,9 @@ def load_and_process(self): # Transform enterprise customer data self.transform_enterprise_customers() + # Initialize Algolia API keys after customer data is available + self._initialize_secured_algolia_api_keys() + # Retrieve and process subscription licenses. Handles activation and auto-apply logic. self.load_and_process_subsidies() diff --git a/enterprise_access/apps/bffs/mixins.py b/enterprise_access/apps/bffs/mixins.py index 1f114584a..0e30a46a7 100644 --- a/enterprise_access/apps/bffs/mixins.py +++ b/enterprise_access/apps/bffs/mixins.py @@ -3,8 +3,13 @@ """ import logging +from urllib.error import HTTPError -from enterprise_access.apps.bffs.api import get_and_cache_enterprise_course_enrollments +from enterprise_access.apps.bffs.api import ( + get_and_cache_enterprise_course_enrollments, + get_and_cache_secured_algolia_search_keys, + transform_secured_algolia_api_key_response +) from enterprise_access.apps.bffs.constants import COURSE_ENROLLMENT_STATUSES, UNENROLLABLE_COURSE_STATUSES logger = logging.getLogger(__name__) @@ -276,7 +281,93 @@ def _can_unenroll_course_enrollment(self, enrollment): ) -class LearnerDashboardDataMixin(EnterpriseCourseEnrollmentsDataMixin, BaseLearnerDataMixin): +class AlgoliaDataMixin(BFFContextDataMixin): + """ + Mixin to handle Algolia search functionality and API key management. + """ + + def _initialize_secured_algolia_api_keys(self): + """ + Fetches and initializes the secured Algolia API keys for the request user. + Updates the context with the fetched keys. 
+ """ + try: + secured_algolia_api_key_data = get_and_cache_secured_algolia_search_keys( + self.context.request, + self.context.enterprise_customer_uuid, + ) + + secured_algolia_api_key = None + catalog_uuids_to_catalog_query_uuids = {} + + try: + secured_algolia_api_key, catalog_uuids_to_catalog_query_uuids = ( + transform_secured_algolia_api_key_response(secured_algolia_api_key_data) + ) + except Exception: # pylint: disable=broad-except + logger.exception( + 'Error transforming secured algolia api key for request user %s,' + 'enterprise customer uuid %s and/or slug %s', + self.context.lms_user_id, + self.context.enterprise_customer_uuid, + self.context.enterprise_customer_slug, + ) + + # Update context with the fetched data + self.context.update_algolia_keys( + secured_algolia_api_key, + catalog_uuids_to_catalog_query_uuids + ) + + # Log if no Algolia key or catalog mapping was found + if not (secured_algolia_api_key and catalog_uuids_to_catalog_query_uuids): + logger.info( + 'No secured algolia key found for request user %s, enterprise customer uuid %s, ' + 'and/or enterprise slug %s', + self.context.lms_user_id, + self.context.enterprise_customer_uuid, + self.context.enterprise_customer_slug, + ) + self.context.add_error( + user_message='No secured algolia api key or catalog query mapping found', + developer_message=( + f'No secured algolia api key or catalog query mapping found for request ' + f'user {self.context.lms_user_id} and enterprise uuid ' + f'{self.context.enterprise_customer_uuid}' + ), + ) + + except HTTPError as exc: + exception_response = exc.response.json() + exception_response_user_message = exception_response.get('user_message') + exception_response_developer_message = exception_response.get('developer_message') + + logger.exception( + 'HTTP Error initializing the secured algolia api keys for request user %s, ' + 'enterprise customer uuid %s', + self.context.lms_user_id, + self.context.enterprise_customer_uuid, + ) + self.context.add_error( + user_message=exception_response_user_message or 'Error initializing search functionality', + developer_message=exception_response_developer_message or str(exc), + status_code=exc.response.status_code + ) + + except Exception as exc: # pylint: disable=broad-except + logger.exception( + 'Error initializing the secured algolia api keys for request user %s, ' + 'enterprise customer uuid %s', + self.context.lms_user_id, + self.context.enterprise_customer_uuid, + ) + self.context.add_error( + user_message='Error initializing search functionality', + developer_message=f'Could not initialize the secured algolia api keys. Error: {exc}' + ) + + +class LearnerDashboardDataMixin(EnterpriseCourseEnrollmentsDataMixin, AlgoliaDataMixin, BaseLearnerDataMixin): """ Mixin to access learner dashboard data from the context. 
""" From 6d9be2d6edb2702b923ae04de27c63911972efed Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Mon, 9 Jun 2025 10:52:43 -0400 Subject: [PATCH 2/9] feat: initial ConcurrentTaskRunner --- enterprise_access/apps/bffs/context.py | 18 +-- enterprise_access/apps/bffs/handlers.py | 166 +++++++++++++++++++++--- enterprise_access/apps/bffs/mixins.py | 12 +- 3 files changed, 164 insertions(+), 32 deletions(-) diff --git a/enterprise_access/apps/bffs/context.py b/enterprise_access/apps/bffs/context.py index e30aa8d23..5df96bfaa 100644 --- a/enterprise_access/apps/bffs/context.py +++ b/enterprise_access/apps/bffs/context.py @@ -61,7 +61,7 @@ def __init__(self, request): self.data = {} # Stores processed data for the response # Initialize common context data - self._initialize_common_context_data() + self.load_common_context_data() @property def request(self): @@ -119,10 +119,6 @@ def all_linked_enterprise_customer_users(self): def should_update_active_enterprise_customer_user(self): return self.data.get('should_update_active_enterprise_customer_user') - @property - def secured_algolia_api_key(self): - return self.data.get('secured_algolia_api_key') - @property def catalog_uuids_to_catalog_query_uuids(self): return self.data.get('catalog_uuids_to_catalog_query_uuids') @@ -149,7 +145,7 @@ def set_status_code(self, status_code): """ self._status_code = status_code - def _initialize_common_context_data(self): + def load_common_context_data(self): """ Initializes common context data, like enterprise customer UUID and user ID. """ @@ -169,7 +165,7 @@ def _initialize_common_context_data(self): # Initialize the enterprise customer users metadata derived from the LMS try: - self._initialize_enterprise_customer_users() + self.load_enterprise_customer_users() except Exception as exc: # pylint: disable=broad-except logger.exception( 'Error initializing enterprise customer users for request user %s, ' @@ -210,7 +206,7 @@ def _initialize_common_context_data(self): if not self.enterprise_customer_uuid: self._enterprise_customer_uuid = self.enterprise_customer.get('uuid') - def _initialize_enterprise_customer_users(self): + def load_enterprise_customer_users(self): """ Initializes the enterprise customer users for the request user. """ @@ -255,7 +251,7 @@ def _initialize_enterprise_customer_users(self): def update_algolia_keys(self, api_key, catalog_mapping): """ Updates the Algolia API keys in the context. 
- + Args: api_key: The secured Algolia API key catalog_mapping: Dictionary mapping catalog UUIDs to query UUIDs @@ -266,12 +262,12 @@ def update_algolia_keys(self, api_key, catalog_mapping): 'secured_algolia_api_key': api_key, 'catalog_uuids_to_catalog_query_uuids': catalog_mapping or {} }) - + @property def secured_algolia_api_key(self): """Get the secured Algolia API key.""" return self._algolia_api_key - + @property def catalog_uuids_to_catalog_query_uuids(self): """Get the mapping of catalog UUIDs to query UUIDs.""" diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index 35570e1e7..7cbbf9a91 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -3,6 +3,8 @@ """ import json import logging +from concurrent.futures import Future, ThreadPoolExecutor +from typing import Any, Callable, Dict, List, NotRequired, Tuple, TypedDict, Union from enterprise_access.apps.api_client.constants import LicenseStatuses from enterprise_access.apps.api_client.license_manager_client import LicenseManagerUserApiClient @@ -15,16 +17,141 @@ invalidate_subscription_licenses_cache ) from enterprise_access.apps.bffs.context import HandlerContext -from enterprise_access.apps.bffs.mixins import ( - AlgoliaDataMixin, - BaseLearnerDataMixin, - LearnerDashboardDataMixin -) +from enterprise_access.apps.bffs.mixins import AlgoliaDataMixin, BaseLearnerDataMixin, LearnerDashboardDataMixin from enterprise_access.apps.bffs.serializers import EnterpriseCustomerUserSubsidiesSerializer logger = logging.getLogger(__name__) +class Task(TypedDict): + """ + Defines the structure for a task. + - func: The function to execute (required). + - args: A tuple of positional arguments for the func (optional). + - kwargs: A dict of keyword arguments for the func (optional). + """ + + func: Callable[..., Any] + args: NotRequired[Tuple] + kwargs: NotRequired[Dict[str, Any]] + + +class ConcurrentTaskRunner: + """ + A context manager for running I/O-bound tasks concurrently. + + This class creates and manages a ThreadPoolExecutor, ensuring that + resources are properly shut down. It's designed to be used with a + `with` statement for guaranteed cleanup. + + Usage: + with ConcurrentTaskRunner(max_workers=5) as runner: + results = runner.execute(tasks) + """ + + DEFAULT_MAX_WORKERS = 5 + DEFAULT_TIMEOUT_SECONDS = 15 + + def __init__(self, max_workers: int = None): + """Initializes the runner and its persistent ThreadPoolExecutor.""" + self._executor = ThreadPoolExecutor(max_workers=max_workers or self.DEFAULT_MAX_WORKERS) + + def __enter__(self): + """Enables the use of the 'with' statement.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Ensures the executor is shut down when the context is exited.""" + self._executor.shutdown(wait=True) + + def execute_methods(self, *methods: Callable[[], Any], **kwargs) -> List[Union[Any, Exception]]: + tasks: List[Task] = [{"func": method} for method in methods] + return self.execute(tasks, **kwargs) + + def execute( + self, + tasks: List[Task], + timeout_seconds: int = None + ) -> List[Union[Any, Exception]]: + """ + Executes a list of tasks concurrently, preserving result order + and propagating exceptions to the caller. + + Args: + tasks: A list of Task dictionaries to be executed. + timeout_seconds: The maximum time to wait for each task. + + Returns: + A list where each element is either the result of the + corresponding task or the Exception it raised. 
The order + matches the input `tasks` list. + """ + if not tasks: + return [] + + futures: List[Future] = [ + self._executor.submit( + task["func"], + *task.get("args", ()), + **task.get("kwargs", {}) + ) + for task in tasks + ] + + results: List[Union[Any, Exception]] = [] + for i, future in enumerate(futures): + func_name = tasks[i]["func"].__name__ + try: + result = future.result(timeout=timeout_seconds or self.DEFAULT_TIMEOUT_SECONDS) + results.append(result) + except Exception as exc: # pylint: disable=broad-except + logging.exception(f"Task '{func_name}' generated an exception") + results.append(exc) + + return results + + @staticmethod + def run_methods_and_raise_on_error( + methods: List[Callable[[], Any]], + max_workers: int = None, + timeout_seconds: int = None + ) -> List[Any]: + """ + Convenience wrapper to run no-argument methods concurrently. + + This handles runner creation and raises the first exception encountered. + """ + tasks: List[Task] = [{"func": method} for method in methods] + return ConcurrentTaskRunner.run_and_raise_on_error( + tasks, + max_workers=max_workers, + timeout_seconds=timeout_seconds, + ) + + @staticmethod + def run_and_raise_on_error( + tasks: List[Task], + max_workers: int = None, + timeout_seconds: int = None + ) -> List[Any]: + """ + Runs a batch of tasks concurrently in a self-contained operation. + + This utility creates a runner, executes tasks, and raises the first + exception encountered, returning a clean list of results on success. + """ + max_workers = max_workers or ConcurrentTaskRunner.DEFAULT_MAX_WORKERS + timeout_seconds = timeout_seconds or ConcurrentTaskRunner.DEFAULT_TIMEOUT_SECONDS + + with ConcurrentTaskRunner(max_workers=max_workers) as runner: + results = runner.execute(tasks, timeout_seconds=timeout_seconds) + + exceptions = [res for res in results if isinstance(res, Exception)] + if exceptions: + raise exceptions[0] + return results + + class BaseHandler: """ A base handler class that provides shared core functionality for different BFF handlers. @@ -93,21 +220,20 @@ def load_and_process(self): Loads and processes data. This is a basic implementation that can be overridden by subclasses. """ try: - # Verify enterprise customer attrs have learner portal enabled + # Verify enterprise customer exists and has learner portal enabled self.ensure_learner_portal_enabled() # Transform enterprise customer data self.transform_enterprise_customers() - # Initialize Algolia API keys after customer data is available - self._initialize_secured_algolia_api_keys() - - # Retrieve and process subscription licenses. Handles activation and auto-apply logic. 
- self.load_and_process_subsidies() - - # Retrieve default enterprise courses and enroll in the redeemable ones - self.load_default_enterprise_enrollment_intentions() - self.enroll_in_redeemable_default_enterprise_enrollment_intentions() + ConcurrentTaskRunner.run_methods_and_raise_on_error( + methods=[ + self.load_secured_algolia_api_key, + self.load_and_process_subsidies, + self.load_and_process_default_enrollment_intentions, + ], + max_workers=3, + ) except Exception as exc: # pylint: disable=broad-exception-caught logger.exception( "Error loading/processing learner portal handler for request user %s and enterprise customer %s", @@ -182,6 +308,8 @@ def load_and_process_subsidies(self): } self.context.data['enterprise_customer_user_subsidies'] =\ EnterpriseCustomerUserSubsidiesSerializer(empty_subsidies).data + + # Retrieve and process subsidies self.load_and_process_subscription_licenses() def transform_enterprise_customer_user(self, enterprise_customer_user): @@ -653,6 +781,14 @@ def enroll_in_redeemable_default_enterprise_enrollment_intentions(self): 'subscription_license_uuid': license_uuids_by_course_run_key.get(course_run_key), }) + def load_and_process_default_enrollment_intentions(self): + """ + Helper method to encapsulate the two-step enrollment process + into a single unit of work for the concurrent runner. + """ + self.load_default_enterprise_enrollment_intentions() + self.enroll_in_redeemable_default_enterprise_enrollment_intentions() + def _request_default_enrollment_realizations(self, license_uuids_by_course_run_key): """ Sends the request to bulk enroll into default enrollment intentions via the LMS diff --git a/enterprise_access/apps/bffs/mixins.py b/enterprise_access/apps/bffs/mixins.py index 0e30a46a7..2591839a3 100644 --- a/enterprise_access/apps/bffs/mixins.py +++ b/enterprise_access/apps/bffs/mixins.py @@ -286,7 +286,7 @@ class AlgoliaDataMixin(BFFContextDataMixin): Mixin to handle Algolia search functionality and API key management. """ - def _initialize_secured_algolia_api_keys(self): + def load_secured_algolia_api_key(self): """ Fetches and initializes the secured Algolia API keys for the request user. Updates the context with the fetched keys. 
@@ -312,13 +312,13 @@ def _initialize_secured_algolia_api_keys(self): self.context.enterprise_customer_uuid, self.context.enterprise_customer_slug, ) - + # Update context with the fetched data self.context.update_algolia_keys( secured_algolia_api_key, catalog_uuids_to_catalog_query_uuids ) - + # Log if no Algolia key or catalog mapping was found if not (secured_algolia_api_key and catalog_uuids_to_catalog_query_uuids): logger.info( @@ -336,12 +336,12 @@ def _initialize_secured_algolia_api_keys(self): f'{self.context.enterprise_customer_uuid}' ), ) - + except HTTPError as exc: exception_response = exc.response.json() exception_response_user_message = exception_response.get('user_message') exception_response_developer_message = exception_response.get('developer_message') - + logger.exception( 'HTTP Error initializing the secured algolia api keys for request user %s, ' 'enterprise customer uuid %s', @@ -353,7 +353,7 @@ def _initialize_secured_algolia_api_keys(self): developer_message=exception_response_developer_message or str(exc), status_code=exc.response.status_code ) - + except Exception as exc: # pylint: disable=broad-except logger.exception( 'Error initializing the secured algolia api keys for request user %s, ' From e4c9ac61e5ad5ce3b8610aa5a88f741806000c20 Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Mon, 9 Jun 2025 11:05:57 -0400 Subject: [PATCH 3/9] chore: clean up --- enterprise_access/apps/bffs/handlers.py | 3 +++ enterprise_access/apps/bffs/mixins.py | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index 7cbbf9a91..4aaa1850e 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -3,6 +3,7 @@ """ import json import logging +import time from concurrent.futures import Future, ThreadPoolExecutor from typing import Any, Callable, Dict, List, NotRequired, Tuple, TypedDict, Union @@ -301,6 +302,7 @@ def load_and_process_subsidies(self): """ Load and process subsidies for learners """ + time.sleep(5) empty_subsidies = { 'subscriptions': { 'customer_agreement': None, @@ -786,6 +788,7 @@ def load_and_process_default_enrollment_intentions(self): Helper method to encapsulate the two-step enrollment process into a single unit of work for the concurrent runner. """ + time.sleep(5) self.load_default_enterprise_enrollment_intentions() self.enroll_in_redeemable_default_enterprise_enrollment_intentions() diff --git a/enterprise_access/apps/bffs/mixins.py b/enterprise_access/apps/bffs/mixins.py index 2591839a3..7615873b0 100644 --- a/enterprise_access/apps/bffs/mixins.py +++ b/enterprise_access/apps/bffs/mixins.py @@ -2,6 +2,8 @@ Mixins for accessing `HandlerContext` data for bffs app """ +import time + import logging from urllib.error import HTTPError @@ -291,6 +293,8 @@ def load_secured_algolia_api_key(self): Fetches and initializes the secured Algolia API keys for the request user. Updates the context with the fetched keys. 
""" + time.sleep(5) + try: secured_algolia_api_key_data = get_and_cache_secured_algolia_search_keys( self.context.request, From 5aa82f50e26e8f0bf176074f294bcc0337f5e1ad Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Wed, 11 Jun 2025 09:11:59 -0400 Subject: [PATCH 4/9] chore: updates --- enterprise_access/apps/bffs/api.py | 10 +- enterprise_access/apps/bffs/context.py | 24 +- enterprise_access/apps/bffs/handlers.py | 278 +++++++-------------- enterprise_access/apps/bffs/mixins.py | 4 - enterprise_access/apps/bffs/task_runner.py | 92 +++++++ enterprise_access/settings/base.py | 3 + 6 files changed, 202 insertions(+), 209 deletions(-) create mode 100644 enterprise_access/apps/bffs/task_runner.py diff --git a/enterprise_access/apps/bffs/api.py b/enterprise_access/apps/bffs/api.py index f1fc64451..fba68de45 100644 --- a/enterprise_access/apps/bffs/api.py +++ b/enterprise_access/apps/bffs/api.py @@ -54,11 +54,11 @@ def get_and_cache_enterprise_customer_users(request, **kwargs): username = request.user.username cache_key = enterprise_customer_users_cache_key(username) cached_response = request_cache(namespace=REQUEST_CACHE_NAMESPACE).get_cached_response(cache_key) - # if cached_response.is_found: - # logger.info( - # f'enterprise_customer_users cache hit for username {username}' - # ) - # return cached_response.value + if cached_response.is_found: + logger.info( + f'enterprise_customer_users cache hit for username {username}' + ) + return cached_response.value client = LmsUserApiClient(request) response_payload = client.get_enterprise_customers_for_user( diff --git a/enterprise_access/apps/bffs/context.py b/enterprise_access/apps/bffs/context.py index 5df96bfaa..c754adf99 100644 --- a/enterprise_access/apps/bffs/context.py +++ b/enterprise_access/apps/bffs/context.py @@ -2,16 +2,13 @@ HandlerContext for bffs app. 
""" import logging -from urllib.error import HTTPError from rest_framework import status from enterprise_access.apps.bffs import serializers from enterprise_access.apps.bffs.api import ( get_and_cache_enterprise_customer_users, - get_and_cache_secured_algolia_search_keys, - transform_enterprise_customer_users_data, - transform_secured_algolia_api_key_response + transform_enterprise_customer_users_data ) logger = logging.getLogger(__name__) @@ -56,8 +53,6 @@ def __init__(self, request): self._enterprise_customer_slug = None self._lms_user_id = getattr(self.user, 'lms_user_id', None) self._enterprise_features = {} - self._algolia_api_key = None - self._catalog_uuids_to_catalog_query_uuids = {} self.data = {} # Stores processed data for the response # Initialize common context data @@ -123,6 +118,11 @@ def should_update_active_enterprise_customer_user(self): def catalog_uuids_to_catalog_query_uuids(self): return self.data.get('catalog_uuids_to_catalog_query_uuids') + @property + def secured_algolia_api_key(self): + """Get the secured Algolia API key.""" + return self.data.get('secured_algolia_api_key') + @property def is_request_user_linked_to_enterprise_customer(self): """ @@ -256,23 +256,11 @@ def update_algolia_keys(self, api_key, catalog_mapping): api_key: The secured Algolia API key catalog_mapping: Dictionary mapping catalog UUIDs to query UUIDs """ - self._algolia_api_key = api_key - self._catalog_uuids_to_catalog_query_uuids = catalog_mapping or {} self.data.update({ 'secured_algolia_api_key': api_key, 'catalog_uuids_to_catalog_query_uuids': catalog_mapping or {} }) - @property - def secured_algolia_api_key(self): - """Get the secured Algolia API key.""" - return self._algolia_api_key - - @property - def catalog_uuids_to_catalog_query_uuids(self): - """Get the mapping of catalog UUIDs to query UUIDs.""" - return self._catalog_uuids_to_catalog_query_uuids - def add_error(self, status_code=None, **kwargs): """ Adds an error to the context. diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index 4aaa1850e..1e91f4b63 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -4,8 +4,7 @@ import json import logging import time -from concurrent.futures import Future, ThreadPoolExecutor -from typing import Any, Callable, Dict, List, NotRequired, Tuple, TypedDict, Union +from enum import Enum, auto from enterprise_access.apps.api_client.constants import LicenseStatuses from enterprise_access.apps.api_client.license_manager_client import LicenseManagerUserApiClient @@ -20,137 +19,12 @@ from enterprise_access.apps.bffs.context import HandlerContext from enterprise_access.apps.bffs.mixins import AlgoliaDataMixin, BaseLearnerDataMixin, LearnerDashboardDataMixin from enterprise_access.apps.bffs.serializers import EnterpriseCustomerUserSubsidiesSerializer +from enterprise_access.apps.bffs.task_runner import ConcurrentTaskRunner logger = logging.getLogger(__name__) -class Task(TypedDict): - """ - Defines the structure for a task. - - func: The function to execute (required). - - args: A tuple of positional arguments for the func (optional). - - kwargs: A dict of keyword arguments for the func (optional). - """ - - func: Callable[..., Any] - args: NotRequired[Tuple] - kwargs: NotRequired[Dict[str, Any]] - - -class ConcurrentTaskRunner: - """ - A context manager for running I/O-bound tasks concurrently. - - This class creates and manages a ThreadPoolExecutor, ensuring that - resources are properly shut down. 
It's designed to be used with a - `with` statement for guaranteed cleanup. - - Usage: - with ConcurrentTaskRunner(max_workers=5) as runner: - results = runner.execute(tasks) - """ - - DEFAULT_MAX_WORKERS = 5 - DEFAULT_TIMEOUT_SECONDS = 15 - - def __init__(self, max_workers: int = None): - """Initializes the runner and its persistent ThreadPoolExecutor.""" - self._executor = ThreadPoolExecutor(max_workers=max_workers or self.DEFAULT_MAX_WORKERS) - - def __enter__(self): - """Enables the use of the 'with' statement.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Ensures the executor is shut down when the context is exited.""" - self._executor.shutdown(wait=True) - - def execute_methods(self, *methods: Callable[[], Any], **kwargs) -> List[Union[Any, Exception]]: - tasks: List[Task] = [{"func": method} for method in methods] - return self.execute(tasks, **kwargs) - - def execute( - self, - tasks: List[Task], - timeout_seconds: int = None - ) -> List[Union[Any, Exception]]: - """ - Executes a list of tasks concurrently, preserving result order - and propagating exceptions to the caller. - - Args: - tasks: A list of Task dictionaries to be executed. - timeout_seconds: The maximum time to wait for each task. - - Returns: - A list where each element is either the result of the - corresponding task or the Exception it raised. The order - matches the input `tasks` list. - """ - if not tasks: - return [] - - futures: List[Future] = [ - self._executor.submit( - task["func"], - *task.get("args", ()), - **task.get("kwargs", {}) - ) - for task in tasks - ] - - results: List[Union[Any, Exception]] = [] - for i, future in enumerate(futures): - func_name = tasks[i]["func"].__name__ - try: - result = future.result(timeout=timeout_seconds or self.DEFAULT_TIMEOUT_SECONDS) - results.append(result) - except Exception as exc: # pylint: disable=broad-except - logging.exception(f"Task '{func_name}' generated an exception") - results.append(exc) - - return results - - @staticmethod - def run_methods_and_raise_on_error( - methods: List[Callable[[], Any]], - max_workers: int = None, - timeout_seconds: int = None - ) -> List[Any]: - """ - Convenience wrapper to run no-argument methods concurrently. - - This handles runner creation and raises the first exception encountered. - """ - tasks: List[Task] = [{"func": method} for method in methods] - return ConcurrentTaskRunner.run_and_raise_on_error( - tasks, - max_workers=max_workers, - timeout_seconds=timeout_seconds, - ) - - @staticmethod - def run_and_raise_on_error( - tasks: List[Task], - max_workers: int = None, - timeout_seconds: int = None - ) -> List[Any]: - """ - Runs a batch of tasks concurrently in a self-contained operation. - - This utility creates a runner, executes tasks, and raises the first - exception encountered, returning a clean list of results on success. - """ - max_workers = max_workers or ConcurrentTaskRunner.DEFAULT_MAX_WORKERS - timeout_seconds = timeout_seconds or ConcurrentTaskRunner.DEFAULT_TIMEOUT_SECONDS - - with ConcurrentTaskRunner(max_workers=max_workers) as runner: - results = runner.execute(tasks, timeout_seconds=timeout_seconds) - - exceptions = [res for res in results if isinstance(res, Exception)] - if exceptions: - raise exceptions[0] - return results +MOCK_TASK_DELAY = 5 class BaseHandler: @@ -204,6 +78,12 @@ class BaseLearnerPortalHandler(BaseHandler, AlgoliaDataMixin, BaseLearnerDataMix across all learner-focused page routes, such as the learner dashboard, search, and course routes. 
""" + class CONCURRENCY_GROUPS(Enum): + """ + Group names for concurrent tasks. + """ + DEFAULT = auto() + def __init__(self, context): """ Initializes the BaseLearnerPortalHandler with a HandlerContext and API clients. @@ -216,6 +96,58 @@ def __init__(self, context): self.license_manager_user_api_client = LicenseManagerUserApiClient(self.context.request) self.lms_api_client = LmsApiClient() + def _get_concurrent_tasks(self): + """ + Establishes the data structure for tasks and adds base tasks. + Subclasses may call this method via super() to extend the tasks + for any specific group. + """ + # Initialize groups + tasks = { + self.CONCURRENCY_GROUPS.DEFAULT: [], + } + + # Add tasks to default group + tasks[self.CONCURRENCY_GROUPS.DEFAULT].extend([ + self.load_and_process_subsidies, + self.load_secured_algolia_api_key, + self.load_and_process_default_enrollment_intentions, + ]) + + return tasks + + def load_secured_algolia_api_key(self): + """ + Temporary override to add delay. + """ + time.sleep(MOCK_TASK_DELAY) + super().load_secured_algolia_api_key() + + def load_and_process_subsidies(self): + """ + Load and process subsidies for learners + """ + time.sleep(MOCK_TASK_DELAY) + empty_subsidies = { + 'subscriptions': { + 'customer_agreement': None, + }, + } + self.context.data['enterprise_customer_user_subsidies'] =\ + EnterpriseCustomerUserSubsidiesSerializer(empty_subsidies).data + + # Retrieve and process subsidies + self.load_and_process_subscription_licenses() + + def load_and_process_default_enrollment_intentions(self): + """ + Helper method to encapsulate the two-step enrollment process + into a single unit of work for the concurrent runner. + """ + time.sleep(MOCK_TASK_DELAY) + self.load_default_enterprise_enrollment_intentions() + self.enroll_in_redeemable_default_enterprise_enrollment_intentions() + def load_and_process(self): """ Loads and processes data. This is a basic implementation that can be overridden by subclasses. 
@@ -226,16 +158,7 @@ def load_and_process(self): # Transform enterprise customer data self.transform_enterprise_customers() - - ConcurrentTaskRunner.run_methods_and_raise_on_error( - methods=[ - self.load_secured_algolia_api_key, - self.load_and_process_subsidies, - self.load_and_process_default_enrollment_intentions, - ], - max_workers=3, - ) - except Exception as exc: # pylint: disable=broad-exception-caught + except Exception as exc: # pylint: disable=broad-except logger.exception( "Error loading/processing learner portal handler for request user %s and enterprise customer %s", self.context.lms_user_id, @@ -245,6 +168,26 @@ def load_and_process(self): user_message="Could not load and/or process common data", developer_message=f"Unable to load and/or process common learner portal data: {exc}", ) + return + + all_tasks_to_run = self._get_concurrent_tasks() + with ConcurrentTaskRunner(task_definitions=all_tasks_to_run) as runner: + task_results = runner.run_group(self.CONCURRENCY_GROUPS.DEFAULT) + def handle_task_error(task_name, error_message): + logger.error( + "Error running concurrent task '%s' for request user %s and enterprise customer %s: %s", + task_name, + self.context.lms_user_id, + self.context.enterprise_customer_uuid, + error_message, + ) + self.add_error( + user_message="Could not load and/or process a concurrent task", + developer_message=( + f"Unable to load and/or process concurrent task '{task_name}': {error_message}" + ), + ) + runner.handle_failed_tasks(task_results, handle_task_error) def ensure_learner_portal_enabled(self): """ @@ -298,22 +241,6 @@ def transform_enterprise_customers(self): f"No linked enterprise customer users found in the context for request user {self.context.lms_user_id}" ) - def load_and_process_subsidies(self): - """ - Load and process subsidies for learners - """ - time.sleep(5) - empty_subsidies = { - 'subscriptions': { - 'customer_agreement': None, - }, - } - self.context.data['enterprise_customer_user_subsidies'] =\ - EnterpriseCustomerUserSubsidiesSerializer(empty_subsidies).data - - # Retrieve and process subsidies - self.load_and_process_subscription_licenses() - def transform_enterprise_customer_user(self, enterprise_customer_user): """ Transform the enterprise customer user data. @@ -783,15 +710,6 @@ def enroll_in_redeemable_default_enterprise_enrollment_intentions(self): 'subscription_license_uuid': license_uuids_by_course_run_key.get(course_run_key), }) - def load_and_process_default_enrollment_intentions(self): - """ - Helper method to encapsulate the two-step enrollment process - into a single unit of work for the concurrent runner. - """ - time.sleep(5) - self.load_default_enterprise_enrollment_intentions() - self.enroll_in_redeemable_default_enterprise_enrollment_intentions() - def _request_default_enrollment_realizations(self, license_uuids_by_course_run_key): """ Sends the request to bulk enroll into default enrollment intentions via the LMS @@ -842,27 +760,23 @@ class DashboardHandler(LearnerDashboardDataMixin, BaseLearnerPortalHandler): of data specific to the learner dashboard. """ - def load_and_process(self): + def _get_concurrent_tasks(self): """ - Loads and processes data for the learner dashboard route. - - This method overrides the `load_and_process` method in `BaseLearnerPortalHandler`. + This is the key method. It extends the tasks from its parent. 
""" - super().load_and_process() + tasks = super()._get_concurrent_tasks() + tasks[self.CONCURRENCY_GROUPS.DEFAULT].extend([ + self.load_enterprise_course_enrollments, + ]) + return tasks - try: - # Load data specific to the dashboard route - self.load_enterprise_course_enrollments() - except Exception as e: # pylint: disable=broad-exception-caught - logger.exception( - "Error loading and/or processing dashboard data for user %s and enterprise customer %s", - self.context.lms_user_id, - self.context.enterprise_customer_uuid, - ) - self.add_error( - user_message="Could not load and/or processing the learner dashboard.", - developer_message=f"Failed to load and/or processing the learner dashboard data: {e}", - ) + def load_enterprise_course_enrollments(self): + """ + Temporary override to add delay. + """ + time.sleep(MOCK_TASK_DELAY) + # raise Exception('Failed to load enterprise course enrollments?!') + return super().load_enterprise_course_enrollments() class SearchHandler(BaseLearnerPortalHandler): diff --git a/enterprise_access/apps/bffs/mixins.py b/enterprise_access/apps/bffs/mixins.py index 7615873b0..2591839a3 100644 --- a/enterprise_access/apps/bffs/mixins.py +++ b/enterprise_access/apps/bffs/mixins.py @@ -2,8 +2,6 @@ Mixins for accessing `HandlerContext` data for bffs app """ -import time - import logging from urllib.error import HTTPError @@ -293,8 +291,6 @@ def load_secured_algolia_api_key(self): Fetches and initializes the secured Algolia API keys for the request user. Updates the context with the fetched keys. """ - time.sleep(5) - try: secured_algolia_api_key_data = get_and_cache_secured_algolia_search_keys( self.context.request, diff --git a/enterprise_access/apps/bffs/task_runner.py b/enterprise_access/apps/bffs/task_runner.py new file mode 100644 index 000000000..3c88367c0 --- /dev/null +++ b/enterprise_access/apps/bffs/task_runner.py @@ -0,0 +1,92 @@ +""" +Task runner for executing concurrent tasks. +""" + +import logging +import os +from concurrent.futures import ThreadPoolExecutor, as_completed + +from django.conf import settings + +logger = logging.getLogger(__name__) + + +class ConcurrentTaskRunner: + """ + Accepts a dictionary of task definitions and runs them concurrently. + """ + + def __init__(self, task_definitions): + """ + Initializes the runner with a pre-built dictionary of tasks. + + Args: + task_definitions (dict): A dictionary where keys are group names + and values are lists of callable tasks. + """ + self.task_registry = task_definitions or {} + + def run_group(self, group, max_workers=None): + """ + Runs all tasks for a specific group using a ThreadPoolExecutor. 
+ """ + tasks_to_run = self.task_registry.get(group, []) + if not tasks_to_run: + logger.warning(f"No tasks found for group '{group.name}'.") + return [] + + num_tasks_to_run = len(tasks_to_run) + logger.info( + f"Running task group: '{group.name}' with {num_tasks_to_run} tasks" + ) + if not max_workers: + default_max_workers = (os.cpu_count() or 1) + 4 + max_workers = min(num_tasks_to_run, default_max_workers) + if settings.MAX_CONCURRENT_TASK_WORKERS is not None: + max_workers = min(max_workers, settings.MAX_CONCURRENT_TASK_WORKERS) + results = [] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_task = { + executor.submit(task): task for task in tasks_to_run + } + for future in as_completed(future_to_task): + task_name = future_to_task[future].__name__ + try: + result = future.result() + logger.info(f"Task {task_name} completed successfully") + results.append({ + 'task_name': task_name, + 'result': result, + 'error': None + }) + except Exception as exc: # pylint: disable=broad-except + logger.exception(f"Task {task_name} failed") + results.append({ + 'task_name': task_name, + 'result': None, + 'error': str(exc) + }) + return results + + def __enter__(self): + """Entering the 'with' block returns the runner instance.""" + return self + + def handle_failed_tasks(self, task_results, error_callback): + """ + Process any failed tasks from the results. + + Args: + task_results (list): List of task result dictionaries. + error_callback (callable): A function that will be called for each failed task. + Signature: error_callback(task_name, error_message) + """ + if not task_results: + return + + failed_tasks = [result for result in task_results if result['error'] is not None] + for failed_task in failed_tasks: + error_callback(failed_task['task_name'], str(failed_task['error'])) + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exiting the 'with' block. No cleanup needed.""" diff --git a/enterprise_access/settings/base.py b/enterprise_access/settings/base.py index 027e3d987..99c908821 100644 --- a/enterprise_access/settings/base.py +++ b/enterprise_access/settings/base.py @@ -534,6 +534,9 @@ def root(*path_fragments): DEFAULT_ENTERPRISE_ENROLLMENT_INTENTIONS_CACHE_TIMEOUT = DEFAULT_CACHE_TIMEOUT ALL_ENTERPRISE_GROUP_MEMBERS_CACHE_TIMEOUT = DEFAULT_CACHE_TIMEOUT +# Maximum number of concurrent tasks to run (optional) +MAX_CONCURRENT_TASK_WORKERS = None + BRAZE_GROUP_EMAIL_FORCE_REMIND_ALL_PENDING_LEARNERS = False BRAZE_GROUPS_EMAIL_AUTO_REMINDER_DAY_5_CAMPAIGN = '' BRAZE_GROUPS_EMAIL_AUTO_REMINDER_DAY_25_CAMPAIGN = '' From 025e059e89b90feb9bf2a1f3abe91b47c55cd656 Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Wed, 11 Jun 2025 09:20:29 -0400 Subject: [PATCH 5/9] chore: re-order methods --- enterprise_access/apps/bffs/task_runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/enterprise_access/apps/bffs/task_runner.py b/enterprise_access/apps/bffs/task_runner.py index 3c88367c0..12e96fe9e 100644 --- a/enterprise_access/apps/bffs/task_runner.py +++ b/enterprise_access/apps/bffs/task_runner.py @@ -68,10 +68,6 @@ def run_group(self, group, max_workers=None): }) return results - def __enter__(self): - """Entering the 'with' block returns the runner instance.""" - return self - def handle_failed_tasks(self, task_results, error_callback): """ Process any failed tasks from the results. 
@@ -88,5 +84,9 @@ def handle_failed_tasks(self, task_results, error_callback): for failed_task in failed_tasks: error_callback(failed_task['task_name'], str(failed_task['error'])) + def __enter__(self): + """Entering the 'with' block returns the runner instance.""" + return self + def __exit__(self, exc_type, exc_val, exc_tb): """Exiting the 'with' block. No cleanup needed.""" From 9b85132b0a6afd6971570be3d1d0c36b6e59fa61 Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Wed, 11 Jun 2025 09:39:05 -0400 Subject: [PATCH 6/9] chore: updates --- enterprise_access/apps/bffs/handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index 1e91f4b63..3e14362e3 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -762,7 +762,7 @@ class DashboardHandler(LearnerDashboardDataMixin, BaseLearnerPortalHandler): def _get_concurrent_tasks(self): """ - This is the key method. It extends the tasks from its parent. + Add additional concurrent tasks for the dashboard. """ tasks = super()._get_concurrent_tasks() tasks[self.CONCURRENCY_GROUPS.DEFAULT].extend([ From 3eeb579cb6ccd767b75d84ceaa0bcbb84444cfdb Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Wed, 11 Jun 2025 09:52:43 -0400 Subject: [PATCH 7/9] chore: updates --- enterprise_access/apps/bffs/handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index 3e14362e3..a56930e6b 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -23,7 +23,6 @@ logger = logging.getLogger(__name__) - MOCK_TASK_DELAY = 5 @@ -170,6 +169,7 @@ def load_and_process(self): ) return + # Run concurrent tasks all_tasks_to_run = self._get_concurrent_tasks() with ConcurrentTaskRunner(task_definitions=all_tasks_to_run) as runner: task_results = runner.run_group(self.CONCURRENCY_GROUPS.DEFAULT) From 0c8039a8fa7942c0ab8c7b7dcf02b3cc9d355128 Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Wed, 11 Jun 2025 10:00:05 -0400 Subject: [PATCH 8/9] chore: updates --- enterprise_access/apps/bffs/handlers.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index a56930e6b..5765275a7 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) -MOCK_TASK_DELAY = 5 +MOCK_TASK_DELAY = 0 class BaseHandler: @@ -77,7 +77,7 @@ class BaseLearnerPortalHandler(BaseHandler, AlgoliaDataMixin, BaseLearnerDataMix across all learner-focused page routes, such as the learner dashboard, search, and course routes. """ - class CONCURRENCY_GROUPS(Enum): + class BASE_CONCURRENCY_GROUPS(Enum): """ Group names for concurrent tasks. 
""" @@ -103,11 +103,11 @@ def _get_concurrent_tasks(self): """ # Initialize groups tasks = { - self.CONCURRENCY_GROUPS.DEFAULT: [], + self.BASE_CONCURRENCY_GROUPS.DEFAULT: [], } # Add tasks to default group - tasks[self.CONCURRENCY_GROUPS.DEFAULT].extend([ + tasks[self.BASE_CONCURRENCY_GROUPS.DEFAULT].extend([ self.load_and_process_subsidies, self.load_secured_algolia_api_key, self.load_and_process_default_enrollment_intentions, @@ -172,7 +172,7 @@ def load_and_process(self): # Run concurrent tasks all_tasks_to_run = self._get_concurrent_tasks() with ConcurrentTaskRunner(task_definitions=all_tasks_to_run) as runner: - task_results = runner.run_group(self.CONCURRENCY_GROUPS.DEFAULT) + task_results = runner.run_group(self.BASE_CONCURRENCY_GROUPS.DEFAULT) def handle_task_error(task_name, error_message): logger.error( "Error running concurrent task '%s' for request user %s and enterprise customer %s: %s", @@ -760,12 +760,18 @@ class DashboardHandler(LearnerDashboardDataMixin, BaseLearnerPortalHandler): of data specific to the learner dashboard. """ + class DASHBOARD_CONCURRENCY_GROUPS(Enum): + """ + Group names for concurrent tasks. + """ + DEFAULT = auto() + def _get_concurrent_tasks(self): """ Add additional concurrent tasks for the dashboard. """ tasks = super()._get_concurrent_tasks() - tasks[self.CONCURRENCY_GROUPS.DEFAULT].extend([ + tasks[self.BASE_CONCURRENCY_GROUPS.DEFAULT].extend([ self.load_enterprise_course_enrollments, ]) return tasks From 95f93b982d5d394168cef6463a8da4ccc6c36969 Mon Sep 17 00:00:00 2001 From: Adam Stankiewicz Date: Thu, 12 Jun 2025 10:44:04 -0400 Subject: [PATCH 9/9] feat: wrap concurrent requests behind feature flag --- .../apps/api/v1/views/bffs/common.py | 2 + enterprise_access/apps/bffs/context.py | 25 +++++++++++ enterprise_access/apps/bffs/handlers.py | 45 ++++++++++++++++++- .../apps/bffs/response_builder.py | 1 + enterprise_access/apps/bffs/serializers.py | 1 + enterprise_access/apps/bffs/task_runner.py | 2 +- 6 files changed, 73 insertions(+), 3 deletions(-) diff --git a/enterprise_access/apps/api/v1/views/bffs/common.py b/enterprise_access/apps/api/v1/views/bffs/common.py index fe7e632a1..730c3f76f 100644 --- a/enterprise_access/apps/api/v1/views/bffs/common.py +++ b/enterprise_access/apps/api/v1/views/bffs/common.py @@ -103,8 +103,10 @@ def load_route_data_and_build_response(self, request, handler_class, response_bu errors = ordered_representation.pop('errors', []) warnings = ordered_representation.pop('warnings', []) enterprise_features = ordered_representation.pop('enterprise_features', {}) + enterprise_features_by_customer = ordered_representation.pop('enterprise_features_by_customer', {}) ordered_representation['errors'] = errors ordered_representation['warnings'] = warnings ordered_representation['enterprise_features'] = enterprise_features + ordered_representation['enterprise_features_by_customer'] = enterprise_features_by_customer return dict(ordered_representation), status_code diff --git a/enterprise_access/apps/bffs/context.py b/enterprise_access/apps/bffs/context.py index c754adf99..859b6de4e 100644 --- a/enterprise_access/apps/bffs/context.py +++ b/enterprise_access/apps/bffs/context.py @@ -53,6 +53,7 @@ def __init__(self, request): self._enterprise_customer_slug = None self._lms_user_id = getattr(self.user, 'lms_user_id', None) self._enterprise_features = {} + self._enterprise_features_by_customer = {} self.data = {} # Stores processed data for the response # Initialize common context data @@ -94,6 +95,10 @@ def lms_user_id(self): def 
enterprise_features(self): return self._enterprise_features + @property + def enterprise_features_by_customer(self): + return self._enterprise_features_by_customer + @property def enterprise_customer(self): return self.data.get('enterprise_customer') @@ -217,6 +222,10 @@ def load_enterprise_customer_users(self): # Set enterprise features from the response self._enterprise_features = enterprise_customer_users_data.get('enterprise_features', {}) + self._enterprise_features_by_customer = enterprise_customer_users_data.get( + 'enterprise_features_by_customer', + {}, + ) # Parse/transform the enterprise customer users data and update the context data transformed_data = {} @@ -287,3 +296,19 @@ def add_warning(self, **kwargs): serializer = serializers.WarningSerializer(data=kwargs) serializer.is_valid(raise_exception=True) self.warnings.append(serializer.data) + + def feature_enabled_for_enterprise_customer(self, feature_name): + """ + Returns the feature for the enterprise customer. + + Args: + feature_name (str): The name of the feature to retrieve. + """ + if not self.enterprise_customer_uuid: + return False + + enterprise_features_for_customer = self.enterprise_features_by_customer.get( + self.enterprise_customer_uuid, + {}, + ) + return enterprise_features_for_customer.get(feature_name, False) diff --git a/enterprise_access/apps/bffs/handlers.py b/enterprise_access/apps/bffs/handlers.py index 5765275a7..76af7dc52 100644 --- a/enterprise_access/apps/bffs/handlers.py +++ b/enterprise_access/apps/bffs/handlers.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) -MOCK_TASK_DELAY = 0 +MOCK_TASK_DELAY = 5 class BaseHandler: @@ -169,10 +169,24 @@ def load_and_process(self): ) return - # Run concurrent tasks + # Check if concurrent requests are enabled; if not, run tasks serially. + if not self.context.feature_enabled_for_enterprise_customer('enterprise_learner_bff_concurrent_requests'): + # Retrieve and process algolia api key + self.load_secured_algolia_api_key() + + # Retrieve and process subscription licenses. Handles activation and auto-apply logic. + self.load_and_process_subsidies() + + # Retrieve default enterprise courses and enroll in the redeemable ones + self.load_default_enterprise_enrollment_intentions() + self.enroll_in_redeemable_default_enterprise_enrollment_intentions() + return + + # Otherwise, run concurrent tasks all_tasks_to_run = self._get_concurrent_tasks() with ConcurrentTaskRunner(task_definitions=all_tasks_to_run) as runner: task_results = runner.run_group(self.BASE_CONCURRENCY_GROUPS.DEFAULT) + def handle_task_error(task_name, error_message): logger.error( "Error running concurrent task '%s' for request user %s and enterprise customer %s: %s", @@ -784,6 +798,33 @@ def load_enterprise_course_enrollments(self): # raise Exception('Failed to load enterprise course enrollments?!') return super().load_enterprise_course_enrollments() + def load_and_process(self): + """ + Loads and processes data for the learner dashboard route. + + This method overrides the `load_and_process` method in `BaseLearnerPortalHandler`. + """ + super().load_and_process() + + # If concurrent requests are enabled, do not load enterprise course enrollments as they're requested + # within the concurrent task group returned by the _get_concurrent_tasks method. + if self.context.feature_enabled_for_enterprise_customer('enterprise_learner_bff_concurrent_requests'): + return + + # Otherwise, load enterprise course enrollments serially. 
+ try: + self.load_enterprise_course_enrollments() + except Exception as e: # pylint: disable=broad-except + logger.exception( + "Error loading and/or processing dashboard data for user %s and enterprise customer %s", + self.context.lms_user_id, + self.context.enterprise_customer_uuid, + ) + self.add_error( + user_message="Could not load and/or processing the learner dashboard.", + developer_message=f"Failed to load and/or processing the learner dashboard data: {e}", + ) + class SearchHandler(BaseLearnerPortalHandler): """ diff --git a/enterprise_access/apps/bffs/response_builder.py b/enterprise_access/apps/bffs/response_builder.py index 1d5d8c670..1400f1a77 100644 --- a/enterprise_access/apps/bffs/response_builder.py +++ b/enterprise_access/apps/bffs/response_builder.py @@ -62,6 +62,7 @@ def build(self): self.context.should_update_active_enterprise_customer_user ) self.response_data['enterprise_features'] = self.context.enterprise_features + self.response_data['enterprise_features_by_customer'] = self.context.enterprise_features_by_customer self.response_data['secured_algolia_api_key'] = self.context.secured_algolia_api_key self.response_data['catalog_uuids_to_catalog_query_uuids'] = self.context.catalog_uuids_to_catalog_query_uuids diff --git a/enterprise_access/apps/bffs/serializers.py b/enterprise_access/apps/bffs/serializers.py index a5cffd8db..ee8baf21a 100644 --- a/enterprise_access/apps/bffs/serializers.py +++ b/enterprise_access/apps/bffs/serializers.py @@ -180,6 +180,7 @@ class BaseResponseSerializer(BaseBffSerializer): errors = ErrorSerializer(many=True, required=False, default=list) warnings = WarningSerializer(many=True, required=False, default=list) enterprise_features = serializers.DictField(required=False, default=dict) + enterprise_features_by_customer = serializers.DictField(required=False, default=dict) class CustomerAgreementSerializer(BaseBffSerializer): diff --git a/enterprise_access/apps/bffs/task_runner.py b/enterprise_access/apps/bffs/task_runner.py index 12e96fe9e..10fd8d207 100644 --- a/enterprise_access/apps/bffs/task_runner.py +++ b/enterprise_access/apps/bffs/task_runner.py @@ -79,7 +79,7 @@ def handle_failed_tasks(self, task_results, error_callback): """ if not task_results: return - + failed_tasks = [result for result in task_results if result['error'] is not None] for failed_task in failed_tasks: error_callback(failed_task['task_name'], str(failed_task['error']))
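For reference, here is a minimal usage sketch (not part of any commit above) of the task-group pattern this series introduces: a handler builds a mapping of groups to zero-argument callables in _get_concurrent_tasks(), and the ConcurrentTaskRunner added in enterprise_access/apps/bffs/task_runner.py runs one group at a time. It assumes a configured Django environment (run_group reads settings.MAX_CONCURRENT_TASK_WORKERS, which base.py defaults to None); the fetch_* functions are hypothetical stand-ins for the handlers' real I/O-bound loaders.

# Illustrative sketch only; the fetch_* loaders are invented placeholders.
from enum import Enum, auto

from enterprise_access.apps.bffs.task_runner import ConcurrentTaskRunner


class CONCURRENCY_GROUPS(Enum):
    """Names for groups of tasks that may run concurrently."""
    DEFAULT = auto()


def fetch_subsidies():
    """Hypothetical I/O-bound loader (e.g., subscription licenses)."""
    return {'subscriptions': {'customer_agreement': None}}


def fetch_algolia_api_key():
    """Hypothetical I/O-bound loader (e.g., the secured Algolia API key)."""
    return 'example-secured-api-key'


# Handlers assemble this mapping in _get_concurrent_tasks(); subclasses such as
# DashboardHandler extend the list for a group before the base handler runs it.
task_definitions = {
    CONCURRENCY_GROUPS.DEFAULT: [fetch_subsidies, fetch_algolia_api_key],
}

with ConcurrentTaskRunner(task_definitions=task_definitions) as runner:
    results = runner.run_group(CONCURRENCY_GROUPS.DEFAULT)
    # Failures are captured per task rather than raised; surface them through a
    # callback, as the handlers do via add_error() on the context.
    runner.handle_failed_tasks(
        results,
        lambda task_name, error: print(f'{task_name} failed: {error}'),
    )

# Each entry in `results` is a dict: {'task_name': ..., 'result': ..., 'error': ...}.

Because run_group collects futures with as_completed, result order reflects completion order rather than submission order, and exceptions are stringified into each result's 'error' field instead of propagating; that is why BaseLearnerPortalHandler.load_and_process passes handle_failed_tasks a callback that records the failure via add_error rather than wrapping the group in a try/except.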
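The per-customer feature gating added in the final commit can likewise be illustrated in isolation. The sketch below mirrors HandlerContext.feature_enabled_for_enterprise_customer; the UUID and flag values are invented for the example, and only the enterprise_features_by_customer shape shown in the diff is assumed.

# Illustrative sketch only; the UUID and flag values are made up.
ENTERPRISE_FEATURES_BY_CUSTOMER = {
    '11111111-2222-3333-4444-555555555555': {
        'enterprise_learner_bff_concurrent_requests': True,
    },
}


def feature_enabled_for_enterprise_customer(enterprise_customer_uuid, feature_name):
    """Return a per-customer feature flag, defaulting to False."""
    if not enterprise_customer_uuid:
        return False
    features_for_customer = ENTERPRISE_FEATURES_BY_CUSTOMER.get(enterprise_customer_uuid, {})
    return features_for_customer.get(feature_name, False)


assert feature_enabled_for_enterprise_customer(
    '11111111-2222-3333-4444-555555555555',
    'enterprise_learner_bff_concurrent_requests',
)
assert not feature_enabled_for_enterprise_customer(
    None,
    'enterprise_learner_bff_concurrent_requests',
)

BaseLearnerPortalHandler.load_and_process checks this flag before deciding whether to hand the loaders to the ConcurrentTaskRunner or to call them serially, so the concurrent path can be rolled out per enterprise customer.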