Skip to content

Commit 2e04f3e

Browse files
STACAPIJobDatabase.get_by_status() sorts df on index #707 (#734)
* STACAPIJobDatabase.get_by_status() sorts df on index #707 * STACAPIJobDatabase: item ID is no longer derived from pandas.DataFrame index, but rather from item_id column; which is created if it doesn't exist #707 * created STACAPIJobDatabase._normalize_df() method #707 * update changelog for #707
1 parent 79127f1 commit 2e04f3e

File tree

3 files changed

+38
-16
lines changed

3 files changed

+38
-16
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
### Removed
1515

1616
### Fixed
17-
17+
- `STACAPIJobDatabase.get_by_status()` now always returns a `pandas.DataFrame` with an index compatible with `MultiBackendJobManager`. ([#707](https://github.com/Open-EO/openeo-python-client/issues/707))
1818

1919
## [0.39.1] - 2025-02-26
2020

openeo/extra/job_management/stac_job_db.py

+20-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import concurrent.futures
22
import datetime
33
import logging
4-
from typing import Iterable, List
4+
from typing import Iterable, List, Optional
55

66
import geopandas as gpd
77
import numpy as np
@@ -53,6 +53,17 @@ def __init__(
5353

5454
def exists(self) -> bool:
5555
return any(c.id == self.collection_id for c in self.client.get_collections())
56+
57+
def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
58+
"""
59+
Normalize the given dataframe to be compatible with :py:class:`MultiBackendJobManager`
60+
by adding the default columns and setting the index.
61+
"""
62+
df = MultiBackendJobManager._normalize_df(df)
63+
# If the user doesn't specify the item_id column, we will use the index.
64+
if "item_id" not in df.columns:
65+
df = df.reset_index(names=["item_id"])
66+
return df
5667

5768
def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
5869
"""
@@ -83,15 +94,15 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
8394
raise FileExistsError(f"Job database {self!r} already exists.")
8495
elif on_exists == "append":
8596
existing_df = self.get_by_status([])
86-
df = MultiBackendJobManager._normalize_df(df)
97+
df = self._normalize_df(df)
8798
df = pd.concat([existing_df, df], ignore_index=True).replace({np.nan: None})
8899
self.persist(df)
89100
return self
90101

91102
else:
92103
raise ValueError(f"Invalid on_exists={on_exists!r}")
93104

94-
df = MultiBackendJobManager._normalize_df(df)
105+
df = self._normalize_df(df)
95106
self.persist(df)
96107
# Return self to allow chaining with constructor.
97108
return self
@@ -105,7 +116,6 @@ def series_from(self, item: pystac.Item) -> pd.Series:
105116
"""
106117
item_dict = item.to_dict()
107118
item_id = item_dict["id"]
108-
dt = item_dict["properties"]["datetime"]
109119

110120
return pd.Series(item_dict["properties"], name=item_id)
111121

@@ -118,6 +128,7 @@ def item_from(self, series: pd.Series) -> pystac.Item:
118128
:return: pystac.Item
119129
"""
120130
series_dict = series.to_dict()
131+
item_id = series_dict.pop("item_id")
121132
item_dict = {}
122133
item_dict.setdefault("stac_version", pystac.get_stac_version())
123134
item_dict.setdefault("type", "Feature")
@@ -139,7 +150,7 @@ def item_from(self, series: pd.Series) -> pystac.Item:
139150
item_dict["geometry"] = None
140151

141152
# from_dict handles associating any Links and Assets with the Item
142-
item_dict["id"] = series.name
153+
item_dict["id"] = item_id
143154
item = pystac.Item.from_dict(item_dict)
144155
if self.has_geometry:
145156
item.bbox = shape(series[self.geometry_column]).bounds
@@ -151,13 +162,13 @@ def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
151162
if isinstance(statuses, str):
152163
statuses = {statuses}
153164
statuses = set(statuses)
154-
items = self.get_by_status(statuses, max=200)
165+
items = self.get_by_status(statuses)
155166
if items is None:
156167
return {k: 0 for k in statuses}
157168
else:
158169
return items["status"].value_counts().to_dict()
159170

160-
def get_by_status(self, statuses: Iterable[str], max=None) -> pd.DataFrame:
171+
def get_by_status(self, statuses: Iterable[str], max: Optional[int] = None) -> pd.DataFrame:
161172
if isinstance(statuses, str):
162173
statuses = {statuses}
163174
statuses = set(statuses)
@@ -172,10 +183,10 @@ def get_by_status(self, statuses: Iterable[str], max=None) -> pd.DataFrame:
172183

173184
series = [self.series_from(item) for item in search_results.items()]
174185

175-
df = pd.DataFrame(series)
186+
df = pd.DataFrame(series).reset_index(names=["item_id"])
176187
if len(series) == 0:
177188
# TODO: What if default columns are overwritten by the user?
178-
df = MultiBackendJobManager._normalize_df(
189+
df = self._normalize_df(
179190
df
180191
) # Even for an empty dataframe the default columns are required
181192
return df

tests/extra/job_management/test_stac_job_db.py

+17-6
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def dummy_dataframe() -> pd.DataFrame:
6969
def normalized_dummy_dataframe() -> pd.DataFrame:
7070
return pd.DataFrame(
7171
{
72+
"item_id": [0],
7273
"no": [1],
7374
"geometry": [2],
7475
"here": [3],
@@ -87,13 +88,14 @@ def normalized_dummy_dataframe() -> pd.DataFrame:
8788

8889
@pytest.fixture
8990
def another_dummy_dataframe() -> pd.DataFrame:
90-
return pd.DataFrame({"no": [4], "geometry": [5], "here": [6]})
91+
return pd.DataFrame({"item_id": [1], "no": [4], "geometry": [5], "here": [6]})
9192

9293

9394
@pytest.fixture
9495
def normalized_merged_dummy_dataframe() -> pd.DataFrame:
9596
return pd.DataFrame(
9697
{
98+
"item_id": [0, 1],
9799
"no": [1, 4],
98100
"geometry": [2, 5],
99101
"here": [3, 6],
@@ -126,6 +128,7 @@ def dummy_geodataframe() -> gpd.GeoDataFrame:
126128
def normalized_dummy_geodataframe() -> pd.DataFrame:
127129
return pd.DataFrame(
128130
{
131+
"item_id": [0],
129132
"there": [1],
130133
"is": [2],
131134
"geometry": [{"type": "Point", "coordinates": (1.0, 1.0)}],
@@ -174,13 +177,20 @@ def dummy_stac_item_geometry() -> pystac.Item:
174177

175178
@pytest.fixture
176179
def dummy_series() -> pd.Series:
177-
return pd.Series({"datetime": pystac.utils.datetime_to_str(FAKE_NOW), "some_property": "value"}, name="test")
180+
return pd.Series(
181+
{"item_id": "test", "datetime": pystac.utils.datetime_to_str(FAKE_NOW), "some_property": "value"}, name="test"
182+
)
178183

179184

185+
@pytest.fixture
186+
def dummy_series_no_item_id() -> pd.Series:
187+
return pd.Series({"datetime": pystac.utils.datetime_to_str(FAKE_NOW), "some_property": "value"}, name="test")
188+
180189
@pytest.fixture
181190
def dummy_series_geometry() -> pd.Series:
182191
return pd.Series(
183192
{
193+
"item_id": "test",
184194
"datetime": pystac.utils.datetime_to_str(FAKE_NOW),
185195
"some_property": "value",
186196
"geometry": {
@@ -203,6 +213,7 @@ def patch_datetime_now():
203213
def bulk_dataframe():
204214
return pd.DataFrame(
205215
{
216+
"item_id": [f"test-{i}" for i in range(10)],
206217
"some_property": [f"value-{i}" for i in range(10)],
207218
},
208219
index=[i for i in range(10)],
@@ -259,8 +270,8 @@ def test_initialize_from_df_with_geometry(
259270
assert job_db_not_exists.has_geometry == True
260271
assert job_db_not_exists.geometry_column == "geometry"
261272

262-
def test_series_from(self, job_db_exists, dummy_series, dummy_stac_item):
263-
pdt.assert_series_equal(job_db_exists.series_from(dummy_stac_item), dummy_series)
273+
def test_series_from(self, job_db_exists, dummy_series_no_item_id, dummy_stac_item):
274+
pdt.assert_series_equal(job_db_exists.series_from(dummy_stac_item), dummy_series_no_item_id)
264275

265276
def test_item_from(self, patch_datetime_now, job_db_exists, dummy_series, dummy_stac_item):
266277
item = job_db_exists.item_from(dummy_series)
@@ -298,10 +309,11 @@ def test_get_by_status_result(self, job_db_exists):
298309
df,
299310
pd.DataFrame(
300311
{
312+
"item_id": ["test"],
301313
"datetime": [pystac.utils.datetime_to_str(FAKE_NOW)],
302314
"some_property": ["value"],
303315
},
304-
index=["test"],
316+
index=[0],
305317
),
306318
)
307319

@@ -326,7 +338,6 @@ def handle_row(series):
326338
mock_requests_post.reason = "OK"
327339

328340
job_db_exists.persist(bulk_dataframe)
329-
# job_db_exists._upload_items_bulk(collection_id=job_db_exists.collection_id, items=items)
330341

331342
mock_requests_post.assert_called_once()
332343

0 commit comments

Comments
 (0)