26
26
import requests
27
27
import shapely .geometry
28
28
import collections
29
+ import time
30
+
29
31
30
32
import openeo
31
33
import openeo .extra .job_management
40
42
get_job_db ,
41
43
)
42
44
43
- from openeo .extra .job_management ._thread_worker import _JobManagerWorkerThreadPool
44
45
from openeo .rest ._testing import OPENEO_BACKEND , DummyBackend , build_capabilities
45
46
from openeo .util import rfc3339
46
47
from openeo .utils .version import ComparableVersion
47
48
49
+ from openeo .extra .job_management ._thread_worker import (
50
+ Task ,
51
+ _TaskResult ,
52
+ _JobManagerWorkerThreadPool ,
53
+ )
54
+
48
55
49
56
@pytest .fixture
50
57
def con (requests_mock ) -> openeo .Connection :
@@ -83,6 +90,26 @@ def sleep_mock():
83
90
with mock .patch ("time.sleep" ) as sleep :
84
91
yield sleep
85
92
93
class DummyTask(Task):
    """
    Test double for ``Task``: sleeps for an optional delay, then returns a
    predetermined ``_TaskResult`` (canned db/stats updates for a given job id).
    """

    def __init__(self, job_id, db_update=None, stats_update=None, delay=0.0):
        """
        :param job_id: job identifier to embed in the returned result.
        :param db_update: optional dict of job-db field updates (default: empty).
        :param stats_update: optional dict of stats increments (default: empty).
        :param delay: seconds to sleep in :py:meth:`execute` (0 = no sleep).
        """
        self.job_id = job_id
        # `or {}` normalizes both None and explicitly-empty inputs to a dict.
        self._db_update = db_update or {}
        self._stats_update = stats_update or {}
        self._delay = delay

    def execute(self) -> _TaskResult:
        """Sleep for the configured delay, then return the canned result."""
        if self._delay:
            time.sleep(self._delay)
        return _TaskResult(
            job_id=self.job_id,
            db_update=self._db_update,
            stats_update=self._stats_update,
        )
111
+
112
+
86
113
87
114
class TestMultiBackendJobManager :
88
115
@@ -96,6 +123,7 @@ def job_manager(self, job_manager_root_dir, dummy_backend_foo, dummy_backend_bar
96
123
manager .add_backend ("foo" , connection = dummy_backend_foo .connection )
97
124
manager .add_backend ("bar" , connection = dummy_backend_bar .connection )
98
125
return manager
126
+
99
127
100
128
@staticmethod
101
129
def _create_year_job (row , connection , ** kwargs ):
@@ -724,6 +752,50 @@ def get_status(job_id, current_status):
724
752
filled_running_start_time = final_df .iloc [0 ]["running_start_time" ]
725
753
assert isinstance (rfc3339 .parse_datetime (filled_running_start_time ), datetime .datetime )
726
754
755
+
756
+
757
+
758
+ def test_process_threadworker_updates (self , job_manager , tmp_path ):
759
+
760
+ csv_path = tmp_path / "jobs.csv"
761
+ df = pd .DataFrame ([
762
+ {"id" : "job-1" , "status" : "created" },
763
+ {"id" : "job-2" , "status" : "created" },
764
+ ])
765
+ job_db = CsvJobDatabase (csv_path ).initialize_from_df (df )
766
+
767
+ pool = _JobManagerWorkerThreadPool (max_workers = 2 )
768
+
769
+ # Submit two dummy tasks with different delays and updates
770
+ t1 = DummyTask ("job-1" , {"status" : "done" }, {"a" : 1 }, delay = 0.05 )
771
+ t2 = DummyTask ("job-2" , {"status" : "failed" }, {"b" : 2 }, delay = 0.1 )
772
+ pool .submit_task (t1 )
773
+ pool .submit_task (t2 )
774
+
775
+ # Wait for all futures to be done
776
+ # We access the internal list of (future, task) pairs to check .done()
777
+ start = time .time ()
778
+ timeout = 2.0
779
+ while time .time () - start < timeout :
780
+ pairs = list (pool ._future_task_pairs )
781
+ if all (future .done () for future , _ in pairs ):
782
+ break
783
+ time .sleep (0.01 )
784
+ else :
785
+ pytest .skip ("Tasks did not complete within timeout" )
786
+
787
+ # Now invoke the real update loop
788
+ stats = collections .defaultdict (int )
789
+ job_manager ._process_threadworker_updates (pool , job_db , stats )
790
+
791
+ # Check that the in-memory database was updated
792
+ df = job_db .df
793
+ assert df .loc [df .id == "job-1" , "status" ].iloc [0 ] == "done"
794
+ assert df .loc [df .id == "job-2" , "status" ].iloc [0 ] == "failed"
795
+
796
+ # And that our stats were aggregated
797
+ assert stats == {"a" : 1 , "b" : 2 }
798
+
727
799
728
800
JOB_DB_DF_BASICS = pd .DataFrame (
729
801
{
@@ -1821,168 +1893,6 @@ def test_with_job_manager_parameter_column_map(
1821
1893
},
1822
1894
}
1823
1895
1824
- from unittest .mock import MagicMock , call , patch
1825
-
1826
-
1827
- class TestProcessThreadWorkerUpdates :
1828
- """Unit tests for _process_threadworker_updates functionality."""
1829
-
1830
- @pytest .fixture
1831
- def job_manager (self ):
1832
- return MultiBackendJobManager ()
1833
-
1834
- @pytest .fixture
1835
- def mock_worker_pool (self ):
1836
- pool = MagicMock (spec = _JobManagerWorkerThreadPool )
1837
- return pool
1838
1896
1839
- @pytest .fixture
1840
- def mock_job_db (self ):
1841
- db = MagicMock ()
1842
- db ._update_row = MagicMock ()
1843
- return db
1844
1897
1845
- @pytest .fixture
1846
- def stats (self ):
1847
- return collections .defaultdict (int )
1848
1898
1849
- @pytest .fixture
1850
- def mock_log (self , mocker ):
1851
- return mocker .patch ("openeo.extra.job_management._log" )
1852
-
1853
- def test_basic (self , job_manager , mock_worker_pool , mock_job_db , stats ):
1854
- """Basic success scenario with mixed DB and stats updates"""
1855
- # Setup mock worker results
1856
- result1 = MagicMock (
1857
- job_id = "job-123" ,
1858
- db_update = {"status" : "queued" },
1859
- stats_update = {"jobs_started" : 1 }
1860
- )
1861
- result2 = MagicMock (
1862
- job_id = "job-456" ,
1863
- db_update = {"status" : "running" },
1864
- stats_update = {"jobs_started" : 1 }
1865
- )
1866
- mock_worker_pool .process_futures .return_value = [result1 , result2 ]
1867
-
1868
- # Execute
1869
- job_manager ._process_threadworker_updates (mock_worker_pool , mock_job_db , stats )
1870
-
1871
- # Verify DB updates
1872
- mock_job_db ._update_row .assert_has_calls ([
1873
- call (job_id = "job-123" , updates = {"status" : "queued" }),
1874
- call (job_id = "job-456" , updates = {"status" : "running" }),
1875
- ], any_order = True )
1876
-
1877
- # Verify stats aggregation
1878
- assert dict (stats ) == {"jobs_started" : 2 }
1879
-
1880
- def test_error_handling (self , job_manager , mock_worker_pool , mock_job_db , stats , caplog ):
1881
- """Failed DB updates should be logged but not break processing"""
1882
- # Setup one successful and one failing result
1883
- result1 = MagicMock (
1884
- job_id = "job-123" ,
1885
- db_update = {"status" : "running" },
1886
- stats_update = {"jobs_started" : 1 }
1887
- )
1888
- result2 = MagicMock (
1889
- job_id = "job-456" ,
1890
- db_update = {"status" : "running" },
1891
- stats_update = {"jobs_started" : 1 }
1892
- )
1893
-
1894
- # Make second DB update fail
1895
- mock_job_db ._update_row .side_effect = [None , Exception ("DB connection lost" )]
1896
-
1897
- mock_worker_pool .process_futures .return_value = [result1 , result2 ]
1898
-
1899
- # Execute
1900
- job_manager ._process_threadworker_updates (mock_worker_pool , mock_job_db , stats )
1901
-
1902
- # Verify partial updates
1903
- assert mock_job_db ._update_row .call_count == 2
1904
- assert dict (stats ) == {"jobs_started" : 1 } # Only first result's stats applied
1905
- assert "Failed aggregating the updates for update for job" in caplog .text
1906
-
1907
-
1908
-
1909
- def test_only_stats_cases (self , job_manager , mock_worker_pool , mock_job_db , stats ):
1910
- """Handle empty updates and invalid values"""
1911
- # Setup results with missing/none updates
1912
- result1 = MagicMock (db_update = None , stats_update = None )
1913
- result2 = MagicMock (
1914
- job_id = "job-789" ,
1915
- db_update = {},
1916
- stats_update = {"start_failed" : 1 }
1917
- )
1918
- mock_worker_pool .process_futures .return_value = [result1 , result2 ]
1919
-
1920
- # Execute
1921
- job_manager ._process_threadworker_updates (mock_worker_pool , mock_job_db , stats )
1922
-
1923
- # Verify no DB interactions for empty updates
1924
- mock_job_db ._update_row .assert_not_called ()
1925
-
1926
- # Verify stats skipped invalid values
1927
- assert dict (stats ) == {"start_failed" : 1 }
1928
-
1929
- def test_concurrency_handling (self , job_manager , mock_worker_pool , mock_job_db , stats ):
1930
- """Verify stats aggregation works with concurrent updates"""
1931
- # Setup 100 identical results
1932
- results = [MagicMock (
1933
- job_id = f"job-{ i } " ,
1934
- db_update = {"status" : "queued" },
1935
- stats_update = {"job start" : 1 }
1936
- ) for i in range (100 )]
1937
-
1938
- mock_worker_pool .process_futures .return_value = results
1939
-
1940
- # Execute
1941
- job_manager ._process_threadworker_updates (mock_worker_pool , mock_job_db , stats )
1942
-
1943
- # Verify all DB updates processed
1944
- assert mock_job_db ._update_row .call_count == 100
1945
-
1946
- # Verify stats aggregation
1947
- assert stats ["job start" ] == 100
1948
-
1949
- def test_mixed_scenario (self , job_manager , mock_worker_pool , mock_job_db , stats ):
1950
- """Complex scenario mirroring production conditions"""
1951
- # Setup mixed results
1952
- results = [
1953
- # Successful start
1954
- MagicMock (
1955
- job_id = "job-1" ,
1956
- db_update = {"status" : "queued" },
1957
- stats_update = {"started" : 1 }
1958
- ),
1959
- # Failed start
1960
- MagicMock (
1961
- job_id = "job-2" ,
1962
- db_update = {"status" : "start failed" },
1963
- stats_update = {"errors" : 1 }
1964
- ),
1965
- # Partial update
1966
- MagicMock (
1967
- job_id = "job-3" ,
1968
- db_update = {"status" : "running" },
1969
- stats_update = {}
1970
- ),
1971
- # No updates
1972
- MagicMock (db_update = None , stats_update = None )
1973
- ]
1974
-
1975
-
1976
- mock_worker_pool .process_futures .return_value = results
1977
-
1978
- # Execute
1979
- job_manager ._process_threadworker_updates (mock_worker_pool , mock_job_db , stats )
1980
-
1981
- # Verify DB attempts
1982
- assert mock_job_db ._update_row .call_count == 3 # job-3 update still attempted
1983
-
1984
- # Verify stats aggregation
1985
- assert dict (stats ) == {
1986
- "started" : 1 ,
1987
- "errors" : 1 ,
1988
- }
0 commit comments