-
Notifications
You must be signed in to change notification settings - Fork 82
refactor(clp-package): Simplify StrEnum and Path serialization via Annotated serializers. #1417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
17ff4c9
4c01485
3e484e2
d441993
68a3a06
fbe91b0
e829a86
79b8043
354b019
e60b39a
3372a4b
68740f5
420f30f
6a584e0
79c12e8
bfca7d7
8673afd
cc9a1f4
96ac165
04929ea
57b2789
0b6f43c
3a57db7
e487f82
4cac503
da5e3e7
bfa44c6
f6abd55
c80de48
1f9bfc9
ca09544
60da510
bd07d8e
0a7b157
78c015b
7b3e9d7
8e587e3
d1a36d6
a6b6c8d
80a7164
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
Field, | ||
field_validator, | ||
model_validator, | ||
PlainSerializer, | ||
PrivateAttr, | ||
) | ||
from strenum import KebabCaseStrEnum, LowercaseStrEnum | ||
|
@@ -21,6 +22,7 @@ | |
read_yaml_config_file, | ||
validate_path_could_be_dir, | ||
) | ||
from .serialization_utils import serialize_path, serialize_str_enum | ||
|
||
# Constants | ||
# Component names | ||
|
@@ -98,6 +100,8 @@ | |
CLP_QUEUE_PASS_ENV_VAR_NAME = "CLP_QUEUE_PASS" | ||
CLP_REDIS_PASS_ENV_VAR_NAME = "CLP_REDIS_PASS" | ||
|
||
# Serializer | ||
StrEnumSerializer = PlainSerializer(serialize_str_enum) | ||
# Generic types | ||
NonEmptyStr = Annotated[str, Field(min_length=1)] | ||
PositiveFloat = Annotated[float, Field(gt=0)] | ||
|
@@ -106,6 +110,7 @@ | |
# TODO: Replace this with pydantic_extra_types.domain.DomainStr. | ||
DomainStr = NonEmptyStr | ||
Port = Annotated[int, Field(gt=0, lt=2**16)] | ||
SerializablePath = Annotated[pathlib.Path, PlainSerializer(serialize_path)] | ||
ZstdCompressionLevel = Annotated[int, Field(ge=1, le=19)] | ||
|
||
|
||
|
@@ -114,17 +119,26 @@ class StorageEngine(KebabCaseStrEnum): | |
CLP_S = auto() | ||
|
||
|
||
StorageEngineStr = Annotated[StorageEngine, StrEnumSerializer] | ||
|
||
|
||
class DatabaseEngine(KebabCaseStrEnum): | ||
MARIADB = auto() | ||
MYSQL = auto() | ||
|
||
|
||
DatabaseEngineStr = Annotated[DatabaseEngine, StrEnumSerializer] | ||
|
||
|
||
class QueryEngine(KebabCaseStrEnum): | ||
CLP = auto() | ||
CLP_S = auto() | ||
PRESTO = auto() | ||
|
||
|
||
QueryEngineStr = Annotated[QueryEngine, StrEnumSerializer] | ||
|
||
|
||
class StorageType(LowercaseStrEnum): | ||
FS = auto() | ||
S3 = auto() | ||
|
@@ -137,9 +151,12 @@ class AwsAuthType(LowercaseStrEnum): | |
ec2 = auto() | ||
|
||
|
||
AwsAuthTypeStr = Annotated[AwsAuthType, StrEnumSerializer] | ||
|
||
|
||
class Package(BaseModel): | ||
storage_engine: StorageEngine = StorageEngine.CLP | ||
query_engine: QueryEngine = QueryEngine.CLP | ||
storage_engine: StorageEngineStr = StorageEngine.CLP | ||
query_engine: QueryEngineStr = QueryEngine.CLP | ||
|
||
@model_validator(mode="after") | ||
def validate_query_engine_package_compatibility(self): | ||
|
@@ -163,15 +180,9 @@ def validate_query_engine_package_compatibility(self): | |
|
||
return self | ||
|
||
def dump_to_primitive_dict(self): | ||
d = self.model_dump() | ||
d["storage_engine"] = d["storage_engine"].value | ||
d["query_engine"] = d["query_engine"].value | ||
return d | ||
|
||
|
||
class Database(BaseModel): | ||
type: DatabaseEngine = DatabaseEngine.MARIADB | ||
type: DatabaseEngineStr = DatabaseEngine.MARIADB | ||
host: DomainStr = "localhost" | ||
port: Port = 3306 | ||
name: NonEmptyStr = "clp-db" | ||
|
@@ -232,7 +243,6 @@ def get_clp_connection_params_and_type(self, disable_localhost_socket_connection | |
|
||
def dump_to_primitive_dict(self): | ||
d = self.model_dump(exclude={"username", "password"}) | ||
d["type"] = d["type"].value | ||
return d | ||
|
||
def load_credentials_from_file(self, credentials_file_path: pathlib.Path): | ||
|
@@ -360,12 +370,7 @@ class S3Credentials(BaseModel): | |
|
||
|
||
class AwsAuthentication(BaseModel): | ||
type: Literal[ | ||
AwsAuthType.credentials.value, | ||
AwsAuthType.profile.value, | ||
AwsAuthType.env_vars.value, | ||
AwsAuthType.ec2.value, | ||
] | ||
type: AwsAuthTypeStr | ||
profile: Optional[NonEmptyStr] = None | ||
credentials: Optional[S3Credentials] = None | ||
|
||
|
@@ -408,13 +413,10 @@ class S3IngestionConfig(BaseModel): | |
type: Literal[StorageType.S3.value] = StorageType.S3.value | ||
aws_authentication: AwsAuthentication | ||
|
||
def dump_to_primitive_dict(self): | ||
return self.model_dump() | ||
|
||
|
||
class FsStorage(BaseModel): | ||
type: Literal[StorageType.FS.value] = StorageType.FS.value | ||
directory: pathlib.Path | ||
directory: SerializablePath | ||
|
||
@field_validator("directory", mode="before") | ||
@classmethod | ||
|
@@ -425,16 +427,11 @@ def validate_directory(cls, value): | |
def make_config_paths_absolute(self, clp_home: pathlib.Path): | ||
self.directory = make_config_path_absolute(clp_home, self.directory) | ||
|
||
def dump_to_primitive_dict(self): | ||
d = self.model_dump() | ||
d["directory"] = str(d["directory"]) | ||
return d | ||
|
||
|
||
class S3Storage(BaseModel): | ||
type: Literal[StorageType.S3.value] = StorageType.S3.value | ||
s3_config: S3Config | ||
staging_directory: pathlib.Path | ||
staging_directory: SerializablePath | ||
|
||
@field_validator("staging_directory", mode="before") | ||
@classmethod | ||
|
@@ -455,30 +452,25 @@ def validate_key_prefix(cls, value): | |
def make_config_paths_absolute(self, clp_home: pathlib.Path): | ||
self.staging_directory = make_config_path_absolute(clp_home, self.staging_directory) | ||
|
||
def dump_to_primitive_dict(self): | ||
d = self.model_dump() | ||
d["staging_directory"] = str(d["staging_directory"]) | ||
return d | ||
|
||
|
||
class FsIngestionConfig(FsStorage): | ||
directory: pathlib.Path = pathlib.Path("/") | ||
directory: SerializablePath = pathlib.Path("/") | ||
|
||
|
||
class ArchiveFsStorage(FsStorage): | ||
directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives" | ||
directory: SerializablePath = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives" | ||
|
||
|
||
class StreamFsStorage(FsStorage): | ||
directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "streams" | ||
directory: SerializablePath = CLP_DEFAULT_DATA_DIRECTORY_PATH / "streams" | ||
|
||
|
||
class ArchiveS3Storage(S3Storage): | ||
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives" | ||
staging_directory: SerializablePath = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives" | ||
|
||
|
||
class StreamS3Storage(S3Storage): | ||
staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams" | ||
staging_directory: SerializablePath = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams" | ||
|
||
|
||
def _get_directory_from_storage_config( | ||
|
@@ -520,11 +512,6 @@ def set_directory(self, directory: pathlib.Path): | |
def get_directory(self) -> pathlib.Path: | ||
return _get_directory_from_storage_config(self.storage) | ||
|
||
def dump_to_primitive_dict(self): | ||
d = self.model_dump() | ||
d["storage"] = self.storage.dump_to_primitive_dict() | ||
return d | ||
|
||
|
||
class StreamOutput(BaseModel): | ||
storage: Union[StreamFsStorage, StreamS3Storage] = StreamFsStorage() | ||
|
@@ -536,11 +523,6 @@ def set_directory(self, directory: pathlib.Path): | |
def get_directory(self) -> pathlib.Path: | ||
return _get_directory_from_storage_config(self.storage) | ||
|
||
def dump_to_primitive_dict(self): | ||
d = self.model_dump() | ||
d["storage"] = self.storage.dump_to_primitive_dict() | ||
return d | ||
|
||
|
||
class WebUi(BaseModel): | ||
host: DomainStr = "localhost" | ||
|
@@ -590,24 +572,26 @@ class CLPConfig(BaseModel): | |
query_worker: QueryWorker = QueryWorker() | ||
webui: WebUi = WebUi() | ||
garbage_collector: GarbageCollector = GarbageCollector() | ||
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH | ||
credentials_file_path: SerializablePath = CLP_DEFAULT_CREDENTIALS_FILE_PATH | ||
|
||
presto: Optional[Presto] = None | ||
|
||
archive_output: ArchiveOutput = ArchiveOutput() | ||
stream_output: StreamOutput = StreamOutput() | ||
data_directory: pathlib.Path = pathlib.Path("var") / "data" | ||
logs_directory: pathlib.Path = pathlib.Path("var") / "log" | ||
aws_config_directory: Optional[pathlib.Path] = None | ||
data_directory: SerializablePath = pathlib.Path("var") / "data" | ||
logs_directory: SerializablePath = pathlib.Path("var") / "log" | ||
aws_config_directory: Optional[SerializablePath] = None | ||
sitaowang1998 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
_container_image_id_path: pathlib.Path = PrivateAttr( | ||
_container_image_id_path: SerializablePath = PrivateAttr( | ||
default=CLP_PACKAGE_CONTAINER_IMAGE_ID_PATH | ||
) | ||
_version_file_path: pathlib.Path = PrivateAttr(default=CLP_VERSION_FILE_PATH) | ||
_version_file_path: SerializablePath = PrivateAttr(default=CLP_VERSION_FILE_PATH) | ||
|
||
@field_validator("aws_config_directory") | ||
@classmethod | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we changed the above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I checked other references of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then we have to manually serialize them in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. Sorry if i may have confused you, i meant to say
-> the |
||
def expand_profile_user_home(cls, value: Optional[pathlib.Path]): | ||
def expand_profile_user_home( | ||
cls, value: Optional[SerializablePath] | ||
) -> Optional[SerializablePath]: | ||
if value is not None: | ||
value = value.expanduser() | ||
return value | ||
|
@@ -693,7 +677,7 @@ def validate_aws_config_dir(self): | |
auth_configs.append(self.stream_output.storage.s3_config.aws_authentication) | ||
|
||
for auth in auth_configs: | ||
if AwsAuthType.profile.value == auth.type: | ||
if AwsAuthType.profile == auth.type: | ||
profile_auth_used = True | ||
break | ||
|
||
|
@@ -735,27 +719,14 @@ def get_runnable_components(self) -> Set[str]: | |
|
||
def dump_to_primitive_dict(self): | ||
custom_serialized_fields = { | ||
"package", | ||
"database", | ||
"queue", | ||
"redis", | ||
"logs_input", | ||
"archive_output", | ||
"stream_output", | ||
} | ||
d = self.model_dump(exclude=custom_serialized_fields) | ||
for key in custom_serialized_fields: | ||
d[key] = getattr(self, key).dump_to_primitive_dict() | ||
|
||
# Turn paths into primitive strings | ||
d["credentials_file_path"] = str(self.credentials_file_path) | ||
d["data_directory"] = str(self.data_directory) | ||
d["logs_directory"] = str(self.logs_directory) | ||
if self.aws_config_directory is not None: | ||
d["aws_config_directory"] = str(self.aws_config_directory) | ||
else: | ||
d["aws_config_directory"] = None | ||
|
||
return d | ||
|
||
@model_validator(mode="after") | ||
|
@@ -772,22 +743,12 @@ def validate_presto_config(self): | |
class WorkerConfig(BaseModel): | ||
package: Package = Package() | ||
archive_output: ArchiveOutput = ArchiveOutput() | ||
data_directory: pathlib.Path = CLPConfig().data_directory | ||
data_directory: SerializablePath = CLPConfig().data_directory | ||
|
||
# Only needed by query workers. | ||
stream_output: StreamOutput = StreamOutput() | ||
stream_collection_name: str = ResultsCache().stream_collection_name | ||
|
||
def dump_to_primitive_dict(self): | ||
d = self.model_dump() | ||
d["archive_output"] = self.archive_output.dump_to_primitive_dict() | ||
|
||
# Turn paths into primitive strings | ||
d["data_directory"] = str(self.data_directory) | ||
sitaowang1998 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
d["stream_output"] = self.stream_output.dump_to_primitive_dict() | ||
|
||
return d | ||
|
||
|
||
def get_components_for_target(target: str) -> Set[str]: | ||
if target in TARGET_TO_COMPONENTS: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import pathlib | ||
|
||
from strenum import StrEnum | ||
|
||
|
||
def serialize_str_enum(member: StrEnum) -> str: | ||
""" | ||
Serializes a `strenum.StrEnum` member to its underlying value. | ||
:param member: | ||
:return: The underlying string value of the enum member. | ||
""" | ||
return member.value | ||
sitaowang1998 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def serialize_path(path: pathlib.Path) -> str: | ||
""" | ||
Serializes a `pathlib.Path` to its string representation. | ||
:param path: | ||
:return: The string representation of the path. | ||
""" | ||
return str(path) | ||
sitaowang1998 marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in a separate PR, we shall investigate
specifying private attributes like
username
as https://docs.pydantic.dev/latest/api/fields/#pydantic.fields.PrivateAttrand
specifying secrets like
password
as https://docs.pydantic.dev/2.2/usage/types/secrets/so we can completely get rid of such
dump_to_primitive_dict()
methods.@coderabbitai create an issue to track
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Length of output: 42
@junhaoliao I've created issue #1440 to track this investigation: #1440
The issue covers exploring Pydantic's
PrivateAttr
for private attributes likeusername
andSecretStr
for sensitive fields likepassword
, which would allow the removal ofdump_to_primitive_dict()
methods throughout the codebase.🧬 Code Graph Analysis Results
Filename: components/clp-py-utils/clp_py_utils/clp_config.py
Line numbers: N/A (summary)
Summary: Private attributes used for config paths
Summary: dump_to_primitive_dict methods (exclusions to avoid leaking sensitive data)
Summary: Related credential loading methods (context for removal of dump methods)