Conversation

@weslleyspereira (Collaborator) commented Sep 11, 2025

Sorry I missed the release by a few hours; I spotted this bug yesterday. This PR fixes the logic in EvaluationManager.retrieve_results().

  • Fix a bug in EvaluationManager.retrieve_results(), and an inconsistency in submit_tasks() (*args can't come after a keyword argument).
  • Also, add a queue lock to EvaluationManager so that submit_tasks() can be called in parallel with retrieve_results(). Not strictly needed yet, but I can think of workflows that will need it (see the sketch below).
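
A minimal sketch of the locking I have in mind, assuming the internal layout suggested by the tests below (a _queue of (x, future) pairs and a _queue_lock created in __init__); this is illustrative, not the actual implementation:

from concurrent.futures import CancelledError

def retrieve_results(self):
    """Return (X, F) for finished tasks; keep pending ones queued."""
    X, F, pending = [], [], []
    with self._queue_lock:  # guards the queue against a concurrent submit_tasks()
        for x, future in self._queue:
            if not future.done():
                pending.append((x, future))  # still running; keep it queued
                continue
            try:
                f = future.result()
            except CancelledError:
                continue  # cancelled tasks are dropped, as the tests expect
            X.append(x)
            F.append(f)
        self._queue = pending
    return X, F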

Below are the tests I used; they reproduce the issue in the current develop branch. I'd recommend including such tests in future releases:

import logging
from unittest.mock import Mock, patch
from concurrent.futures import ProcessPoolExecutor, Future
from hiopbbpy.utils.evaluation_manager import (
    EvaluationManager,
    is_running_with_mpi,
)
from concurrent.futures import CancelledError


# Helper functions for testing - defined at module level to avoid pickle issues
def square(x):
    """Helper function for testing - squares input."""
    return x**2


def double(x):
    """Helper function for testing - doubles input."""
    return x * 2


def triple(x):
    """Helper function for testing - triples input."""
    return x * 3


class TestEvaluationManager:
    """Test suite for EvaluationManager class."""

    def test_init_default_executors(self):
        """Test initialization with default executors."""
        manager = EvaluationManager()

        assert "cpu" in manager.executors
        assert isinstance(manager.executors["cpu"], ProcessPoolExecutor)
        assert isinstance(manager.logger, logging.Logger)
        assert len(manager._queue) == 0

        manager.__del__()

    def test_init_custom_cpu_executor(self):
        """Test initialization with custom CPU executor."""
        custom_executor = Mock()
        manager = EvaluationManager(cpu_executor=custom_executor)

        assert manager.executors["cpu"] is custom_executor

        manager.__del__()

    def test_init_with_mpi_executor(self):
        """Test initialization with MPI executor when provided."""
        mock_mpi_executor = Mock()
        manager = EvaluationManager(mpi_executor=mock_mpi_executor)

        if is_running_with_mpi():
            assert "mpi" in manager.executors
        else:
            # Should still add MPI executor even if MPI not available
            assert manager.executors.get("mpi") is mock_mpi_executor

        manager.__del__()

    def test_submit_tasks_cpu(self):
        """Test submitting tasks to CPU executor."""
        mock_executor = Mock()
        mock_future = Mock(spec=Future)
        mock_executor.submit.return_value = mock_future

        manager = EvaluationManager(cpu_executor=mock_executor)

        def dummy_fn(x):
            return x * 2

        X = [1, 2, 3]
        manager.submit_tasks(dummy_fn, X, execute_at="cpu")

        assert len(manager._queue) == 3
        assert mock_executor.submit.call_count == 3

        # Check that tasks were submitted with correct arguments
        for i, call in enumerate(mock_executor.submit.call_args_list):
            args, kwargs = call
            assert args[0] is dummy_fn
            assert args[1] == X[i]

        manager.__del__()

    def test_submit_tasks_with_kwargs(self):
        """Test submitting tasks with additional arguments."""
        mock_executor = Mock()
        mock_future = Mock(spec=Future)
        mock_executor.submit.return_value = mock_future

        manager = EvaluationManager(cpu_executor=mock_executor)

        def dummy_fn(x, multiplier=1, offset=0):
            return x * multiplier + offset

        X = [1, 2]
        manager.submit_tasks(dummy_fn, X, execute_at="cpu", multiplier=5, offset=10)

        assert len(manager._queue) == 2

        # Check arguments passed to submit
        for call in mock_executor.submit.call_args_list:
            args, kwargs = call
            assert len(args) == 2  # fn, x
            assert kwargs["multiplier"] == 5
            assert kwargs["offset"] == 10

        manager.__del__()

    def test_retrieve_results_completed_tasks(self):
        """Test retrieving results from completed tasks."""
        mock_executor = Mock()

        # Create mock futures with results
        mock_futures = []
        expected_results = [4, 9, 16]

        for result in expected_results:
            mock_future = Mock(spec=Future)
            mock_future.done.return_value = True
            mock_future.result.return_value = result
            mock_futures.append(mock_future)

        mock_executor.submit.side_effect = mock_futures

        manager = EvaluationManager(cpu_executor=mock_executor)

        # Submit tasks
        X_input = [2, 3, 4]
        manager.submit_tasks(square, X_input, execute_at="cpu")

        # Retrieve results
        X_output, F_output = manager.retrieve_results()

        assert X_output == X_input
        assert F_output == expected_results
        assert len(manager._queue) == 0  # Queue should be empty after retrieval

        manager.__del__()

    def test_retrieve_results_mixed_completion(self):
        """Test retrieving results when some tasks are complete and others are not."""
        mock_executor = Mock()

        # Create mock futures - some completed, some not
        mock_future1 = Mock(spec=Future)
        mock_future1.done.return_value = True
        mock_future1.result.return_value = 4

        mock_future2 = Mock(spec=Future)
        mock_future2.done.return_value = False

        mock_future3 = Mock(spec=Future)
        mock_future3.done.return_value = True
        mock_future3.result.return_value = 16

        mock_executor.submit.side_effect = [mock_future1, mock_future2, mock_future3]

        manager = EvaluationManager(cpu_executor=mock_executor)

        # Submit tasks
        X_input = [2, 3, 4]
        manager.submit_tasks(square, X_input, execute_at="cpu")

        # Retrieve results
        X_output, F_output = manager.retrieve_results()

        # Only completed tasks should be returned
        assert len(X_output) == 2
        assert len(F_output) == 2
        # Check that the completed results are correct
        assert X_output == [2, 4]  # Completed tasks, in submission order
        assert F_output == [4, 16]
        assert len(manager._queue) == 1  # One task still pending

        manager.__del__()

    def test_retrieve_results_cancelled_task(self):
        """Test retrieving results when a task is cancelled."""

        mock_executor = Mock()

        mock_future1 = Mock(spec=Future)
        mock_future1.done.return_value = True
        mock_future1.result.return_value = 4

        mock_future2 = Mock(spec=Future)
        mock_future2.done.return_value = True
        mock_future2.result.side_effect = CancelledError()

        mock_executor.submit.side_effect = [mock_future1, mock_future2]

        manager = EvaluationManager(cpu_executor=mock_executor)

        # Submit tasks
        X_input = [2, 3]
        manager.submit_tasks(square, X_input, execute_at="cpu")

        # Retrieve results
        X_output, F_output = manager.retrieve_results()

        # Only successful task should be returned
        assert len(X_output) == 1
        assert len(F_output) == 1
        assert X_output[0] == 2
        assert F_output[0] == 4

        manager.__del__()

    @patch("ds4mems.evaluation_manager.wait")
    def test_sync(self, mock_wait):
        """Test sync method waits for all futures."""
        mock_executor = Mock()
        mock_futures = [Mock(spec=Future) for _ in range(3)]
        mock_executor.submit.side_effect = mock_futures

        manager = EvaluationManager(cpu_executor=mock_executor)

        # Submit tasks
        manager.submit_tasks(lambda x: x, [1, 2, 3], execute_at="cpu")

        # Call sync
        manager.sync()

        # Verify wait was called with all futures
        mock_wait.assert_called_once()
        called_futures = mock_wait.call_args[0][0]
        assert len(called_futures) == 3
        assert all(f in mock_futures for f in called_futures)

        manager.__del__()

    def test_destructor_shuts_down_executors(self):
        """Test that destructor properly shuts down executors."""
        mock_cpu_executor = Mock()
        mock_mpi_executor = Mock()

        manager = EvaluationManager(
            cpu_executor=mock_cpu_executor, mpi_executor=mock_mpi_executor
        )

        # Call destructor
        manager.__del__()

        # Verify shutdown was called on executors
        mock_cpu_executor.shutdown.assert_called_once_with(wait=False)
        if "mpi" in manager.executors:
            mock_mpi_executor.shutdown.assert_called_once_with(wait=False)

    def test_case_insensitive_executor_selection(self):
        """Test that executor selection is case insensitive."""
        mock_executor = Mock()
        mock_future = Mock(spec=Future)
        mock_executor.submit.return_value = mock_future

        manager = EvaluationManager(cpu_executor=mock_executor)

        # Test different cases
        for execute_at in ["CPU", "Cpu", "cpu"]:
            manager.submit_tasks(lambda x: x, [1], execute_at=execute_at)

        assert mock_executor.submit.call_count == 3

        manager.__del__()


class TestHelperFunctions:
    """Test suite for helper functions."""

    @patch.dict("os.environ", {}, clear=True)
    def test_is_running_with_mpi_false(self):
        """Test MPI detection when no MPI environment variables are set."""
        assert is_running_with_mpi() is False

    @patch.dict("os.environ", {"OMPI_COMM_WORLD_RANK": "0"})
    def test_is_running_with_mpi_ompi(self):
        """Test MPI detection with Open MPI environment variable."""
        assert is_running_with_mpi() is True

    @patch.dict("os.environ", {"PMI_RANK": "0"})
    def test_is_running_with_mpi_pmi(self):
        """Test MPI detection with PMI rank environment variable."""
        assert is_running_with_mpi() is True

    @patch.dict("os.environ", {"MV2_COMM_WORLD_RANK": "0"})
    def test_is_running_with_mpi_mvapich(self):
        """Test MPI detection with MVAPICH environment variable."""
        assert is_running_with_mpi() is True


class TestIntegration:
    """Integration tests using real executors."""

    def test_basic_workflow(self):
        """Test basic workflow with real ProcessPoolExecutor."""
        manager = EvaluationManager()

        # Submit tasks
        X_input = [1, 2, 3, 4]
        manager.submit_tasks(square, X_input, execute_at="cpu")

        # Wait for completion
        manager.sync()

        # Retrieve results
        X_output, F_output = manager.retrieve_results()

        assert len(X_output) == len(X_input)
        assert len(F_output) == len(X_input)
        assert all(x in X_input for x in X_output)
        assert all(f == x**2 for x, f in zip(X_output, F_output))

        manager.__del__()

    def test_multiple_batches(self):
        """Test submitting multiple batches of tasks."""
        manager = EvaluationManager()

        # First batch
        manager.submit_tasks(double, [1, 2], execute_at="cpu")

        # Second batch
        manager.submit_tasks(triple, [3, 4], execute_at="cpu")

        # Wait and retrieve all results
        manager.sync()
        X_output, F_output = manager.retrieve_results()

        assert len(X_output) == 4
        assert len(F_output) == 4

        # Check that results match expected outputs
        expected_pairs = [(1, 2), (2, 4), (3, 9), (4, 12)]
        for x, f in zip(X_output, F_output):
            assert (x, f) in expected_pairs

        manager.__del__()
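
For reference, I ran these with pytest; the file path below is just where I happened to put them:

pytest tests/test_evaluation_manager.py -v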

@Copilot (Copilot AI) left a comment

Pull Request Overview

This PR fixes a critical bug in the EvaluationManager.retrieve_results() method and addresses a Python syntax issue in submit_tasks(). The main issue was incorrect indexing logic that caused wrong results to be returned, and the secondary issue was invalid parameter ordering with *args after keyword arguments.

  • Fixed incorrect indexing logic in retrieve_results() that was causing wrong results to be returned
  • Corrected invalid Python syntax by removing the *args parameter from submit_tasks() (see the sketch below)
  • Added thread safety with a queue lock to support parallel operations
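
A plausible before/after for the signature fix, reconstructed from the tests rather than the actual diff:

# before (removed by this PR): submit_tasks(self, fn, X, execute_at="cpu", *args, **kwargs)
def submit_tasks(self, fn, X, execute_at="cpu", **kwargs):
    executor = self.executors[execute_at.lower()]  # case-insensitive selection
    with self._queue_lock:
        for x in X:
            # each task runs as fn(x, **kwargs)
            self._queue.append((x, executor.submit(fn, x, **kwargs)))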


@thartland (Collaborator) left a comment
Looks great. I tested this on an example problem. Thanks Weslley!
