
Commit 14f1971 (parent: 8b51f84)

Commit message: additional testing for _process_threadworker_updates

File tree: 2 files changed (+222, −1 lines)


openeo/extra/job_management/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -1016,6 +1016,8 @@ def _update_row(self, job_id: str, updates: dict):
         except Exception as e:
             _log.error(f"Failed to persist row update for job {job_id}: {e}")

+
+


 class CsvJobDatabase(FullDataFrameJobDatabase):
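
For orientation: the hunk above only adds whitespace around `_update_row`, but the new tests below pin down its validation behavior. A minimal sketch of the logic those tests imply, reconstructed from the asserted log messages (the `self._df` attribute and `self.persist()` hook are assumed names, not the actual implementation):

    import logging

    _log = logging.getLogger(__name__)

    def _update_row(self, job_id: str, updates: dict):
        # Locate the row; reject unknown or ambiguous job ids.
        df = self._df  # assumed: in-memory dataframe backing the job database
        matches = df.index[df["id"] == job_id]
        if len(matches) == 0:
            _log.warning(f"Job {job_id} not found in database")
            return
        if len(matches) > 1:
            _log.warning(f"Duplicate job ID {job_id}: skipping update")
            return
        # Drop updates for columns that don't exist in the schema.
        valid = {}
        for column, value in updates.items():
            if column not in df.columns:
                _log.warning(f"Ignoring invalid column {column!r}")
                continue
            valid[column] = value
        if not valid:
            return
        try:
            df.loc[matches[0], list(valid)] = list(valid.values())
            self.persist()  # assumed persistence hook (CSV/Parquet write)
        except Exception as e:
            _log.error(f"Failed to persist row update for job {job_id}: {e}")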

tests/extra/job_management/test_job_management.py

Lines changed: 220 additions & 1 deletion
@@ -40,7 +40,7 @@
     get_job_db,
 )

-from openeo.extra.job_management._thread_worker import _JobStartTask
+from openeo.extra.job_management._thread_worker import _JobManagerWorkerThreadPool
 from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities
 from openeo.util import rfc3339
 from openeo.utils.version import ComparableVersion
@@ -838,6 +838,62 @@ def test_count_by_status(self, tmp_path, db_class):
             "running": 2,
         }

+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_update_existing_row(self, tmp_path, db_class):
+        path = tmp_path / "jobs.db"
+        df = pd.DataFrame({"id": ["job-123"], "status": ["created"], "costs": [0.0]})
+        db = db_class(path).initialize_from_df(df)
+
+        db._update_row("job-123", {"status": "queued", "costs": 42.5})
+        updated = db.read()
+
+        assert updated.loc[0, "status"] == "queued"
+        assert updated.loc[0, "costs"] == 42.5
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_update_unknown_job_id(self, tmp_path, db_class, caplog):
+        path = tmp_path / "jobs.db"
+        df = pd.DataFrame({"id": ["job-123"], "status": ["created"]})
+        db = db_class(path).initialize_from_df(df)
+
+        db._update_row("nonexistent-job", {"status": "queued"})
+
+        assert "not found in database" in caplog.text
+        # Ensure no updates happened
+        assert db.read().loc[0, "status"] == "created"
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_update_duplicate_job_id(self, tmp_path, db_class, caplog):
+        path = tmp_path / "jobs.db"
+        df = pd.DataFrame({"id": ["job-123", "job-123"], "status": ["created", "created"]})
+        db = db_class(path).initialize_from_df(df)
+
+        db._update_row("job-123", {"status": "queued"})
+
+        assert "Duplicate job ID" in caplog.text
+        assert set(db.read()["status"]) == {"created"}
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_update_with_invalid_column(self, tmp_path, db_class, caplog):
+        path = tmp_path / "jobs.db"
+        df = pd.DataFrame({"id": ["job-123"], "status": ["created"]})
+        db = db_class(path).initialize_from_df(df)
+
+        db._update_row("job-123", {"not_a_column": "value", "status": "finished"})
+
+        assert "Ignoring invalid column 'not_a_column'" in caplog.text
+        assert db.read().loc[0, "status"] == "finished"
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_update_with_no_valid_fields(self, tmp_path, db_class):
+        path = tmp_path / "jobs.db"
+        df = pd.DataFrame({"id": ["job-123"], "status": ["created"]})
+        db = db_class(path).initialize_from_df(df)
+
+        db._update_row("job-123", {"invalid_field": "value"})
+
+        assert db.read().loc[0, "status"] == "created"
+

 class TestCsvJobDatabase:
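
Taken together, these cases pin down a simple contract for callers: valid fields are persisted, everything else is logged and ignored. A hedged usage sketch against the same API the tests exercise (file name illustrative):

    import pandas as pd
    from openeo.extra.job_management import CsvJobDatabase

    db = CsvJobDatabase("jobs.csv").initialize_from_df(
        pd.DataFrame({"id": ["job-1"], "status": ["created"]})
    )
    db._update_row("job-1", {"status": "queued"})        # persisted
    db._update_row("job-1", {"not_a_column": "value"})   # warning logged, update ignored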

@@ -1765,5 +1821,168 @@ def test_with_job_manager_parameter_column_map(
         },
     }

+from unittest.mock import MagicMock, call, patch


+class TestProcessThreadWorkerUpdates:
+    """Unit tests for _process_threadworker_updates functionality."""
+
+    @pytest.fixture
+    def job_manager(self):
+        return MultiBackendJobManager()
+
+    @pytest.fixture
+    def mock_worker_pool(self):
+        pool = MagicMock(spec=_JobManagerWorkerThreadPool)
+        return pool
+
+    @pytest.fixture
+    def mock_job_db(self):
+        db = MagicMock()
+        db._update_row = MagicMock()
+        return db
+
+    @pytest.fixture
+    def stats(self):
+        return collections.defaultdict(int)
+
+    @pytest.fixture
+    def mock_log(self, mocker):
+        return mocker.patch("openeo.extra.job_management._log")
+
+    def test_basic(self, job_manager, mock_worker_pool, mock_job_db, stats):
+        """Basic success scenario with mixed DB and stats updates"""
+        # Setup mock worker results
+        result1 = MagicMock(
+            job_id="job-123",
+            db_update={"status": "queued"},
+            stats_update={"jobs_started": 1},
+        )
+        result2 = MagicMock(
+            job_id="job-456",
+            db_update={"status": "running"},
+            stats_update={"jobs_started": 1},
+        )
+        mock_worker_pool.process_futures.return_value = [result1, result2]
+
+        # Execute
+        job_manager._process_threadworker_updates(mock_worker_pool, mock_job_db, stats)
+
+        # Verify DB updates
+        mock_job_db._update_row.assert_has_calls([
+            call(job_id="job-123", updates={"status": "queued"}),
+            call(job_id="job-456", updates={"status": "running"}),
+        ], any_order=True)
+
+        # Verify stats aggregation
+        assert dict(stats) == {"jobs_started": 2}
+
+    def test_error_handling(self, job_manager, mock_worker_pool, mock_job_db, stats, caplog):
+        """Failed DB updates should be logged but not break processing"""
+        # Setup one successful and one failing result
+        result1 = MagicMock(
+            job_id="job-123",
+            db_update={"status": "running"},
+            stats_update={"jobs_started": 1},
+        )
+        result2 = MagicMock(
+            job_id="job-456",
+            db_update={"status": "running"},
+            stats_update={"jobs_started": 1},
+        )
+
+        # Make second DB update fail
+        mock_job_db._update_row.side_effect = [None, Exception("DB connection lost")]
+
+        mock_worker_pool.process_futures.return_value = [result1, result2]
+
+        # Execute
+        job_manager._process_threadworker_updates(mock_worker_pool, mock_job_db, stats)
+
+        # Verify partial updates
+        assert mock_job_db._update_row.call_count == 2
+        assert dict(stats) == {"jobs_started": 1}  # Only first result's stats applied
+        assert "Failed aggregating the updates for update for job" in caplog.text
+
+    def test_only_stats_cases(self, job_manager, mock_worker_pool, mock_job_db, stats):
+        """Handle empty updates and invalid values"""
+        # Setup results with missing/none updates
+        result1 = MagicMock(db_update=None, stats_update=None)
+        result2 = MagicMock(
+            job_id="job-789",
+            db_update={},
+            stats_update={"start_failed": 1},
+        )
+        mock_worker_pool.process_futures.return_value = [result1, result2]
+
+        # Execute
+        job_manager._process_threadworker_updates(mock_worker_pool, mock_job_db, stats)
+
+        # Verify no DB interactions for empty updates
+        mock_job_db._update_row.assert_not_called()
+
+        # Verify stats skipped invalid values
+        assert dict(stats) == {"start_failed": 1}
+
+    def test_concurrency_handling(self, job_manager, mock_worker_pool, mock_job_db, stats):
+        """Verify stats aggregation works with concurrent updates"""
+        # Setup 100 identical results
+        results = [MagicMock(
+            job_id=f"job-{i}",
+            db_update={"status": "queued"},
+            stats_update={"job start": 1},
+        ) for i in range(100)]
+
+        mock_worker_pool.process_futures.return_value = results
+
+        # Execute
+        job_manager._process_threadworker_updates(mock_worker_pool, mock_job_db, stats)
+
+        # Verify all DB updates processed
+        assert mock_job_db._update_row.call_count == 100
+
+        # Verify stats aggregation
+        assert stats["job start"] == 100
+
+    def test_mixed_scenario(self, job_manager, mock_worker_pool, mock_job_db, stats):
+        """Complex scenario mirroring production conditions"""
+        # Setup mixed results
+        results = [
+            # Successful start
+            MagicMock(
+                job_id="job-1",
+                db_update={"status": "queued"},
+                stats_update={"started": 1},
+            ),
+            # Failed start
+            MagicMock(
+                job_id="job-2",
+                db_update={"status": "start failed"},
+                stats_update={"errors": 1},
+            ),
+            # Partial update
+            MagicMock(
+                job_id="job-3",
+                db_update={"status": "running"},
+                stats_update={},
+            ),
+            # No updates
+            MagicMock(db_update=None, stats_update=None),
+        ]
+
+        mock_worker_pool.process_futures.return_value = results
+
+        # Execute
+        job_manager._process_threadworker_updates(mock_worker_pool, mock_job_db, stats)
+
+        # Verify DB attempts
+        assert mock_job_db._update_row.call_count == 3  # job-3 update still attempted
+
+        # Verify stats aggregation
+        assert dict(stats) == {
+            "started": 1,
+            "errors": 1,
+        }
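
For orientation, the behavior this suite locks in implies a loop of roughly the following shape inside MultiBackendJobManager (a sketch reconstructed from the assertions above, not the actual implementation; the result attributes mirror the MagicMock results used in the tests):

    def _process_threadworker_updates(self, worker_pool, job_db, stats):
        # Drain finished worker futures and fold each result's updates
        # into the job database and the shared stats counter.
        for result in worker_pool.process_futures():
            try:
                if result.db_update:
                    job_db._update_row(job_id=result.job_id, updates=result.db_update)
                if result.stats_update:
                    for key, value in result.stats_update.items():
                        stats[key] += value
            except Exception as e:
                # Substring matches what test_error_handling asserts on.
                _log.error(f"Failed aggregating the updates for update for job {result.job_id}: {e}")

Note how this shape reproduces the asserted edge cases: an empty or None db_update skips the DB call entirely, and a failing _update_row drops that result's stats but does not stop iteration over the remaining results.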
