|
1 | 1 | use crate::{Error, Json, Result}; |
| 2 | +use futures_core::Stream; |
| 3 | +use futures_core::stream::BoxStream; |
| 4 | +use futures_util::StreamExt; |
2 | 5 | use geojson::Geometry; |
3 | 6 | use pyo3::prelude::*; |
4 | 7 | use pyo3::{Bound, FromPyObject, PyErr, PyResult, exceptions::PyValueError, types::PyDict}; |
5 | 8 | use pyo3_object_store::AnyObjectStore; |
| 9 | +use serde_json::{Map, Value}; |
6 | 10 | use stac::Bbox; |
7 | | -use stac_api::{Fields, Filter, Items, Search, Sortby}; |
| 11 | +use stac_api::{Client, Fields, Filter, Items, Search, Sortby}; |
8 | 12 | use stac_io::{Format, StacStore}; |
| 13 | +use std::sync::Arc; |
| 14 | +use tokio::{pin, sync::Mutex}; |
| 15 | + |
| 16 | +#[pyclass] |
| 17 | +struct SearchIterator(Arc<Mutex<BoxStream<'static, stac_api::Result<Map<String, Value>>>>>); |
| 18 | + |
| 19 | +#[pymethods] |
| 20 | +impl SearchIterator { |
| 21 | + fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { |
| 22 | + slf |
| 23 | + } |
| 24 | + |
| 25 | + fn __anext__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
| 26 | + let stream = self.0.clone(); |
| 27 | + pyo3_async_runtimes::tokio::future_into_py(py, async move { |
| 28 | + let mut stream = stream.lock().await; |
| 29 | + if let Some(result) = stream.next().await { |
| 30 | + let item = result.map_err(Error::from)?; |
| 31 | + Ok(Some(Json(item))) |
| 32 | + } else { |
| 33 | + Ok(None) |
| 34 | + } |
| 35 | + }) |
| 36 | + } |
| 37 | +} |
| 38 | + |
| 39 | +#[pyfunction] |
| 40 | +#[pyo3(signature = (href, *, intersects=None, ids=None, collections=None, limit=None, bbox=None, datetime=None, include=None, exclude=None, sortby=None, filter=None, query=None, **kwargs))] |
| 41 | +#[allow(clippy::too_many_arguments)] |
| 42 | +pub fn iter_search<'py>( |
| 43 | + py: Python<'py>, |
| 44 | + href: String, |
| 45 | + intersects: Option<StringOrDict>, |
| 46 | + ids: Option<StringOrList>, |
| 47 | + collections: Option<StringOrList>, |
| 48 | + limit: Option<u64>, |
| 49 | + bbox: Option<Vec<f64>>, |
| 50 | + datetime: Option<String>, |
| 51 | + include: Option<StringOrList>, |
| 52 | + exclude: Option<StringOrList>, |
| 53 | + sortby: Option<PySortby<'py>>, |
| 54 | + filter: Option<StringOrDict>, |
| 55 | + query: Option<Bound<'py, PyDict>>, |
| 56 | + kwargs: Option<Bound<'_, PyDict>>, |
| 57 | +) -> PyResult<Bound<'py, PyAny>> { |
| 58 | + let search = build( |
| 59 | + intersects, |
| 60 | + ids, |
| 61 | + collections, |
| 62 | + limit, |
| 63 | + bbox, |
| 64 | + datetime, |
| 65 | + include, |
| 66 | + exclude, |
| 67 | + sortby, |
| 68 | + filter, |
| 69 | + query, |
| 70 | + kwargs, |
| 71 | + )?; |
| 72 | + pyo3_async_runtimes::tokio::future_into_py(py, async move { |
| 73 | + let stream = iter_search_api(href, search).await?; |
| 74 | + Ok(SearchIterator(Arc::new(Mutex::new(Box::pin(stream))))) |
| 75 | + }) |
| 76 | +} |
9 | 77 |
|
10 | 78 | #[pyfunction] |
11 | 79 | #[pyo3(signature = (href, *, intersects=None, ids=None, collections=None, max_items=None, limit=None, bbox=None, datetime=None, include=None, exclude=None, sortby=None, filter=None, query=None, use_duckdb=None, **kwargs))] |
@@ -165,8 +233,32 @@ async fn search_api( |
165 | 233 | search: Search, |
166 | 234 | max_items: Option<usize>, |
167 | 235 | ) -> Result<stac_api::ItemCollection> { |
168 | | - let value = stac_api::client::search(&href, search, max_items).await?; |
169 | | - Ok(value) |
| 236 | + let stream = iter_search_api(href, search).await?; |
| 237 | + pin!(stream); |
| 238 | + let mut items = if let Some(max_items) = max_items { |
| 239 | + Vec::with_capacity(max_items) |
| 240 | + } else { |
| 241 | + Vec::new() |
| 242 | + }; |
| 243 | + while let Some(result) = stream.next().await { |
| 244 | + let item = result?; |
| 245 | + items.push(item); |
| 246 | + if let Some(max_items) = max_items { |
| 247 | + if items.len() >= max_items { |
| 248 | + break; |
| 249 | + } |
| 250 | + } |
| 251 | + } |
| 252 | + Ok(items.into()) |
| 253 | +} |
| 254 | + |
| 255 | +async fn iter_search_api( |
| 256 | + href: String, |
| 257 | + search: Search, |
| 258 | +) -> Result<impl Stream<Item = stac_api::Result<Map<String, Value>>>> { |
| 259 | + let client = Client::new(&href)?; |
| 260 | + let stream = client.search(search).await?; |
| 261 | + Ok(stream) |
170 | 262 | } |
171 | 263 |
|
172 | 264 | /// Creates a [Search] from Python arguments. |
|
0 commit comments