Skip to content

Conversation

adamstankiewicz
Copy link
Member

@adamstankiewicz adamstankiewicz commented Jun 11, 2025

Description:

Important

Depends on openedx/edx-enterprise#2415 (for enterprise-specific waffle-based feature flags)

Adds capability to execute API requests to external services concurrently within the BFF API layer with concurrent.futures.ThreadPoolExecutor. Only uses the concurrent approach if the enterprise_learner_bff_concurrent_requests feature flag exposed by the LMS /enterprise/api/v1/enterprise-learner/ API (see PR) is enabled. This feature flag may be configured to a percentage-based rollout for a specific customer.

Handlers add concurrent tasks to specific, named groups within CONCURRENCY_GROUPS

Handlers, responsible for data fetching and processing logic, may define a named group of concurrent tasks, e.g.:

from enum import Enum, auto

class BaseLearnerPortalHandler(BaseHandler):
    class BASE_CONCURRENCY_GROUPS(Enum):
        """
        Group names for concurrent tasks.
        """
        DEFAULT = auto()

    def __init__(self, context):
        # ...

Then, the handler may define a _get_concurrent_tasks method that adds tasks for the named groups, e.g.:

def _get_concurrent_tasks(self):
        """
        Establishes the data structure for tasks and adds base tasks.
        Subclasses may call this method via super() to extend the tasks
        for any specific group.
        """
        # Initialize groups
        tasks = {
            self.BASE_CONCURRENCY_GROUPS.DEFAULT: [],
        }

        # Add tasks to default group
        tasks[self.BASE_CONCURRENCY_GROUPS.DEFAULT].extend([
            self.load_and_process_subsidies,
            self.load_secured_algolia_api_key,
            self.load_and_process_default_enrollment_intentions,
        ])

        return tasks

Then, in the handler's load_and_process method, the configured concurrent tasks may be executed as follows:

# BaseLearnerPortalHandler.load_and_process
all_tasks_to_run = self._get_concurrent_tasks()
with ConcurrentTaskRunner(task_definitions=all_tasks_to_run) as runner:
    task_results = runner.run_group(self.BASE_CONCURRENCY_GROUPS.DEFAULT)
    def handle_task_error(task_name, error_message):
        self.add_error(...)
    runner.handle_failed_tasks(task_results, handle_task_error)

ConcurrentTaskRunner

The ConcurrentTaskRunner exposes a run_group method that executes the tasks in the given group. It uses concurrent.futures.ThreadPoolExecutor to submit tasks to concurrent threads.

It then iterates through the tasks as they complete with as_completed adding the appropriate result metadata to the returned results list.

The handlers can pass the results list into another method ConcurrentTaskRunner.handle_failed_tasks along with a callback method for how to handle the error (e.g., logging, self.add_error).

How do handler subclasses add tasks to an existing group from a parent handler class?

As an example, DashboardHandler makes an API request to fetch enterprise course enrollments for the request user. This call can be made independently of other calls in the parent BaseLearnerPortalHandler as it only depends on the enterprise_customer_uuid.

Given this, DashboardHandler may extend _get_concurrent_tasks to add additional task(s) to an existing group configured in BASE_CONCURRENCY_GROUPS, e.g.:

def _get_concurrent_tasks(self):
        """
        Add additional concurrent tasks for the dashboard.
        """
        tasks = super()._get_concurrent_tasks()
        tasks[self.BASE_CONCURRENCY_GROUPS.DEFAULT].extend([
            self.load_enterprise_course_enrollments,
        ])
        return tasks

Jira:
ENT-10429

Merge checklist:

  • ./manage.py makemigrations has been run
    • Note: This must be run if you modified any models.
      • It may or may not make a migration depending on exactly what you modified, but it should still be run.

Post merge:

  • Ensure that your changes went out to the stage instance
  • Deploy to prod instance

@adamstankiewicz adamstankiewicz changed the title Ags/ent 10429 feat: add support for concurrent API requests to external services in BFF API layer Jun 11, 2025
if not self.enterprise_customer_uuid:
self._enterprise_customer_uuid = self.enterprise_customer.get('uuid')

# Initialize the secured algolia api keys metadata derived from enterprise catalog
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[inform] Moved outside of context.py and into BaseLearnerPortalHandler.

logger = logging.getLogger(__name__)


MOCK_TASK_DELAY = 5
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[inform] Temporary constant to use with time.sleep(MOCK_TASK_DELAY) to simulate longer requests.

@adamstankiewicz adamstankiewicz marked this pull request as draft June 11, 2025 13:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant