Skip to content

Implement getField CEL function #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions protovalidate/internal/constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def make_timestamp(msg: message.Message) -> celtypes.TimestampType:


def unwrap(msg: message.Message) -> celtypes.Value:
return _field_to_cel(msg, msg.DESCRIPTOR.fields_by_name["value"])
return field_to_cel(msg, msg.DESCRIPTOR.fields_by_name["value"])


_MSG_TYPE_URL_TO_CTOR: dict[str, typing.Callable[..., celtypes.Value]] = {
Expand Down Expand Up @@ -70,7 +70,7 @@ def __init__(self, msg: message.Message):
for field in self.desc.fields:
if field.containing_oneof is not None and not self.msg.HasField(field.name):
continue
self[field.name] = _field_to_cel(self.msg, field)
self[field.name] = field_to_cel(self.msg, field)

def __getitem__(self, name):
field = self.desc.fields_by_name[name]
Expand Down Expand Up @@ -175,7 +175,7 @@ def _map_field_to_cel(msg: message.Message, field: descriptor.FieldDescriptor) -
return _map_field_value_to_cel(_proto_message_get_field(msg, field), field)


def _field_to_cel(msg: message.Message, field: descriptor.FieldDescriptor) -> celtypes.Value:
def field_to_cel(msg: message.Message, field: descriptor.FieldDescriptor) -> celtypes.Value:
if field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
return _repeated_field_to_cel(msg, field)
elif field.message_type is not None and not _proto_message_has_field(msg, field):
Expand Down Expand Up @@ -374,7 +374,7 @@ def add_rule(
rule_cel = None
if rule_field is not None and self._rules is not None:
rule_value = _proto_message_get_field(self._rules, rule_field)
rule_cel = _field_to_cel(self._rules, rule_field)
rule_cel = field_to_cel(self._rules, rule_field)
self._cel.append(
CelRunner(
runner=prog,
Expand Down
15 changes: 15 additions & 0 deletions protovalidate/internal/extra_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,27 @@
from celpy import celtypes

from protovalidate.internal import string_format
from protovalidate.internal.constraints import MessageType, field_to_cel

# See https://html.spec.whatwg.org/multipage/input.html#valid-e-mail-address
_email_regex = re.compile(
r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$"
)


def cel_get_field(message: celtypes.Value, field_name: celtypes.Value) -> celpy.Result:
if not isinstance(message, MessageType):
msg = "invalid argument, expected message"
raise celpy.CELEvalError(msg)
if not isinstance(field_name, celtypes.StringType):
msg = "invalid argument, expected string"
raise celpy.CELEvalError(msg)
if field_name not in message.desc.fields_by_name:
msg = f"no such field: {field_name}"
raise celpy.CELEvalError(msg)
return field_to_cel(message.msg, message.desc.fields_by_name[field_name])


def cel_is_ip(val: celtypes.Value, ver: typing.Optional[celtypes.Value] = None) -> celpy.Result:
"""Return True if the string is an IPv4 or IPv6 address, optionally limited to a specific version.

Expand Down Expand Up @@ -1545,6 +1559,7 @@ def make_extra_funcs(locale: str) -> dict[str, celpy.CELFunction]:
# Missing standard functions
"format": string_fmt.format,
# protovalidate specific functions
"getField": cel_get_field,
"isNan": cel_is_nan,
"isInf": cel_is_inf,
"isIp": cel_is_ip,
Expand Down
Loading