Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/api/src/flux0_api/types_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class SessionStream(RootModel[Union[ChunkEventStream, EmittedEventStream]]):


event_creation_params_example: ExampleJson = {
"kind": "message",
"type": "message",
"source": "user",
"content": user_input_content_example,
}
Expand Down
30 changes: 23 additions & 7 deletions packages/nanodb/src/flux0_nanodb/query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from collections.abc import Mapping as AbcMapping
from dataclasses import dataclass
from typing import Any, List, Literal, Mapping, Union

Expand Down Expand Up @@ -43,27 +44,43 @@ class Or:
# A query filter can be a comparison or a logical combination.
QueryFilter = Union[Comparison, And, Or]

_MISSING = object()


def _get_by_path(obj: Mapping[str, Any], path: str) -> Any:
"""
Resolve a dotted path like 'user.name.first' into nested mappings.
Returns _MISSING if any segment is absent or a non-mapping is encountered.
"""
cur: Any = obj
for part in path.split("."):
if isinstance(cur, AbcMapping) and part in cur:
cur = cur[part]
else:
return _MISSING
return cur


def matches_query(query: QueryFilter, candidate: Mapping[str, Any]) -> bool:
if isinstance(query, Comparison):
path_value = candidate.get(query.path)
path_value = _get_by_path(candidate, query.path)
if path_value is _MISSING:
return False

# Ensure path_value is one of the allowed types.
if not isinstance(path_value, (str, int, float, bool)):
return False # Reject invalid types
return False

if query.op == "$eq":
return path_value == query.value
elif query.op == "$ne":
return path_value != query.value
elif query.op == "$in":
# Ensure that query.value is a list before checking containment
if isinstance(query.value, list):
return path_value in query.value
else:
raise TypeError("$in operator requires a list as the value.")
raise TypeError("$in operator requires a list as the value.")

# Ensure ordered comparisons are done only for int and float.
# Ordered comparisons only for int/float on both sides
if isinstance(path_value, (int, float)) and isinstance(query.value, (int, float)):
if query.op == "$gt":
return path_value > query.value
Expand All @@ -74,7 +91,6 @@ def matches_query(query: QueryFilter, candidate: Mapping[str, Any]) -> bool:
elif query.op == "$lte":
return path_value <= query.value

# If comparison is invalid (e.g., str compared with int), return False.
return False

elif isinstance(query, And):
Expand Down
18 changes: 18 additions & 0 deletions packages/nanodb/tests/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,21 @@ def test_invalid_query_filter_type() -> None:
# Passing a type that is not a valid QueryFilter should raise a TypeError.
with pytest.raises(TypeError):
matches_query(42, {"dummy": "data"}) # type: ignore


def test_nested_path_gt_true() -> None:
query: QueryFilter = Comparison(path="stats.score", op="$gt", value=50)
candidate: Mapping[str, Any] = {"stats": {"score": 60}}
assert matches_query(query, candidate)


def test_nested_path_in_true() -> None:
query: QueryFilter = Comparison(path="profile.age", op="$in", value=[25, 30, 35])
candidate: Mapping[str, Any] = {"profile": {"age": 30}}
assert matches_query(query, candidate)


def test_nested_path_missing_intermediate() -> None:
query: QueryFilter = Comparison(path="user.name", op="$eq", value="Alice")
candidate: Mapping[str, Any] = {"user": None} # non-mapping at intermediate hop
assert not matches_query(query, candidate)