Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 50 additions & 24 deletions lib/python/picongpu/picmi/copy_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
import inspect
from typing import Callable

from pydantic import BaseModel, ValidationError


def has_attribute(instance, name):
if isinstance(instance, type) and issubclass(instance, BaseModel):
return name in instance.model_fields or name in map(lambda x: x.alias, instance.model_fields.values())

# It should be this:
#
# return hasattr(instance, name)
Expand Down Expand Up @@ -58,8 +63,8 @@ def copy_attributes(
"""
Copy attributes from one object to another.

This function copies attributes from the `from_instance` to `to` if `to` is an class instance.
If `to` is a class, an instance will be created via `to()`, filled and returned.
This function copies attributes from the `from_instance` to `to` if `to` is a class instance.
If `to` is a class, it will try to construct and return an instance filled with values from `from_instance`.

This function only copies attributes under the following circumstances:
- The attribute exists in `to`.
Expand All @@ -73,26 +78,15 @@ def copy_attributes(
- Otherwise, `value` must be a Callable that takes `from_instance` as first and only argument
and returns the value that `key` is supposed to have in `to`, i.e.
`to.key = value(from_instance)`.
"""
if isinstance(to, type):
try:
to_instance = to()
except TypeError as e:
message = (
"Instantiation failed. The receiving class must be default constructible. "
f"You gave {to} which expects {len(inspect.signature(to.__init__).parameters) - 1} argument in its constructor. "
"You can work with an instance instead of a class in this case."
)
raise ValueError(message) from e
return copy_attributes(
from_instance,
to_instance,
conversions=conversions,
remove_prefix=remove_prefix,
ignore=ignore,
default_converter=default_converter,
)

Further useful features:
- `remove_prefix` allows to remove a prefix from `from_instance` member names
before looking them up and inserting them into `to`.
- `ignore` allows to ignore some attributes in `from_instance` and not copy them.
A custom conversion takes precedence and overrides this behaviour.
- `default_converter` is applied to all values retrieved from `from_instance`
before they are put into `to`.
"""
assignments = {
to_name: _value_generator(from_name)
for from_name, _ in inspect.getmembers(from_instance)
Expand All @@ -101,9 +95,41 @@ def copy_attributes(
and has_attribute(to, to_name := from_name.removeprefix(remove_prefix))
} | _sanitize_conversions(conversions, from_instance, to)

for key, value_generator in assignments.items():
setattr(to, key, default_converter(value_generator(from_instance)))
return to
# This is a two-pass process because after generating the defaults
# we had to apply the custom conversions on top.
assignments = {
key: default_converter(value_generator(from_instance)) for key, value_generator in assignments.items()
}

if isinstance(to, type):
try:
# First case: `to` is a class and can be constructed with a fully-qualified constructor call (pydantic.BaseModel).
return to(**assignments)
except TypeError:
try:
# Second case: `to` is a default-constructible class to which we can copy attributes afterwards.
to_instance = to()
except (ValidationError, TypeError) as e:
message = (
"Instantiation failed. The receiving class must be default constructible. "
f"You gave {to} which expects {len(inspect.signature(to.__init__).parameters) - 1} argument in its constructor. "
"You can work with an instance instead of a class in this case."
)
raise ValueError(message) from e
# We've got an instance now, proceed via the path for instances.
return copy_attributes(
from_instance,
to_instance,
conversions=conversions,
remove_prefix=remove_prefix,
ignore=ignore,
default_converter=default_converter,
)
else:
# Third case: We've been given an instance directly. Copy over attributes.
for key, value in assignments.items():
setattr(to, key, value)
return to


def converts_to(
Expand Down
70 changes: 39 additions & 31 deletions lib/python/picongpu/pypicongpu/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
License: GPLv3+
"""

from . import util
import typeguard
import typing
import enum
from typing import Annotated

from pydantic import BaseModel, Field, PlainSerializer, model_validator
from typing_extensions import Self

from .rendering import RenderedObject


@typeguard.typechecked
class BoundaryCondition(enum.Enum):
"""
Boundary Condition of PIConGPU
Expand All @@ -38,8 +39,27 @@ def get_cfg_str(self) -> str:
return literal_by_boundarycondition[self]


@typeguard.typechecked
class Grid3D(RenderedObject):
def serialise_vec(value) -> dict:
return dict(zip("xyz", value))


Vec3_float = Annotated[tuple[float, float, float], PlainSerializer(serialise_vec)]
Vec3_int = Annotated[tuple[int, int, int], PlainSerializer(serialise_vec)]


def serialise_grid_dist(value):
return (
value
if value is None
else {
"x": [{"device_cells": x} for x in value[0]],
"y": [{"device_cells": x} for x in value[1]],
"z": [{"device_cells": x} for x in value[2]],
}
)


class Grid3D(BaseModel, RenderedObject):
"""
PIConGPU 3 dimensional (cartesian) grid

Expand All @@ -48,47 +68,35 @@ class Grid3D(RenderedObject):
The bounding box is implicitly given as TODO.
"""

cell_size_si = util.build_typesafe_property(tuple[float, float, float])
cell_size: Vec3_float = Field(alias="cell_size_si")
"""Width of individual cell in each direction"""

cell_cnt = util.build_typesafe_property(tuple[int, int, int])
cell_cnt: Vec3_int
"""total number of cells in each direction"""

boundary_condition = util.build_typesafe_property(tuple[BoundaryCondition, BoundaryCondition, BoundaryCondition])
boundary_condition: Annotated[
tuple[BoundaryCondition, BoundaryCondition, BoundaryCondition],
PlainSerializer(lambda x: serialise_vec(map(BoundaryCondition.get_cfg_str, x)), return_type=dict),
]
"""behavior towards particles crossing each boundary"""

n_gpus = util.build_typesafe_property(typing.Tuple[int, int, int])
gpu_cnt: Vec3_int = Field((1, 1, 1), alias="n_gpus")
"""number of GPUs in x y and z direction as 3-integer tuple"""

grid_dist = util.build_typesafe_property(typing.Tuple[list[int], list[int], list[int]] | None)
grid_dist: Annotated[tuple[list[int], list[int], list[int]] | None, PlainSerializer(serialise_grid_dist)] = None
"""distribution of grid cells to GPUs for each axis"""

super_cell_size = util.build_typesafe_property(typing.Tuple[int, int, int])
super_cell_size: Vec3_int
"""size of super cell in x y and z direction as 3-integer tuple in cells"""

def _get_serialized(self) -> dict:
@model_validator(mode="after")
def check(self) -> Self:
"""serialized representation provided for RenderedObject"""
assert all(x > 0 for x in self.cell_cnt), "cell_cnt must be greater than 0"
assert all(x > 0 for x in self.n_gpus), "all n_gpus entries must be greater than 0"
assert all(x > 0 for x in self.gpu_cnt), "all n_gpus entries must be greater than 0"
if self.grid_dist is not None:
assert sum(self.grid_dist[0]) == self.cell_cnt[0], "sum of grid_dists in x must be equal to number_of_cells"
assert sum(self.grid_dist[1]) == self.cell_cnt[1], "sum of grid_dists in y must be equal to number_of_cells"
assert sum(self.grid_dist[2]) == self.cell_cnt[2], "sum of grid_dists in z must be equal to number_of_cells"

result_dict = {
"cell_size": dict(zip("xyz", self.cell_size_si)),
"cell_cnt": dict(zip("xyz", self.cell_cnt)),
"boundary_condition": dict(zip("xyz", map(BoundaryCondition.get_cfg_str, self.boundary_condition))),
"gpu_cnt": dict(zip("xyz", self.n_gpus)),
"super_cell_size": dict(zip("xyz", self.super_cell_size)),
}
if self.grid_dist is not None:
result_dict["grid_dist"] = {
"x": [{"device_cells": x} for x in self.grid_dist[0]],
"y": [{"device_cells": x} for x in self.grid_dist[1]],
"z": [{"device_cells": x} for x in self.grid_dist[2]],
}
else:
result_dict["grid_dist"] = None

return result_dict
return self
39 changes: 11 additions & 28 deletions lib/python/picongpu/pypicongpu/output/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
License: GPLv3+
"""

from pydantic import BaseModel, PrivateAttr, computed_field
from .timestepspec import TimeStepSpec
from .. import util
from .plugin import Plugin

import typeguard


@typeguard.typechecked
class Auto(Plugin):
class Auto(Plugin, BaseModel):
"""
Class to provide output **without further configuration**.

Expand All @@ -26,27 +23,13 @@ class Auto(Plugin):
create a separate class.
"""

period = util.build_typesafe_property(TimeStepSpec)
period: TimeStepSpec
"""period to print data at"""

_name = "auto"

def __init__(self):
pass

def check(self) -> None:
"""
validate attributes
"""
pass

def _get_serialized(self) -> dict:
self.check()
return {
"period": self.period.get_rendering_context(),
# helper to avoid repeating code
"png_axis": [
{"axis": "yx"},
{"axis": "yz"},
],
}
_name: str = PrivateAttr("auto")

@computed_field
def png_axis(self) -> list[dict[str, str]]:
return [
{"axis": "yx"},
{"axis": "yz"},
]
39 changes: 22 additions & 17 deletions lib/python/picongpu/pypicongpu/output/timestepspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,33 @@
License: GPLv3+
"""

from typing import Annotated
from pydantic import BaseModel, PlainSerializer, field_validator
from ..rendering.renderedobject import RenderedObject
from ..util import build_typesafe_property

import typeguard


def _serialize(spec):
if isinstance(spec, slice):
return {
"start": spec.start if spec.start is not None else 0,
"stop": spec.stop if spec.stop is not None else -1,
"step": spec.step if spec.step is not None else 1,
}
raise ValueError(f"Unknown serialization for {spec=} as a time step specifier (--period argument).")
class Spec(BaseModel):
start: Annotated[int | None, PlainSerializer(lambda x: x if x is not None else 0)]
stop: Annotated[int | None, PlainSerializer(lambda x: x if x is not None else -1)]
step: Annotated[int | None, PlainSerializer(lambda x: x if x is not None else 1)]


@typeguard.typechecked
class TimeStepSpec(RenderedObject):
specs = build_typesafe_property(list[slice])

def __init__(self, specs: list[slice]):
self.specs = specs

def _get_serialized(self):
return {"specs": list(map(_serialize, self.specs))}
class TimeStepSpec(RenderedObject, BaseModel):
specs: list[Spec]

def __init__(self, *args, **kwargs):
# allow to give specs as positional argument
if len(args) > 0 and "specs" not in kwargs:
kwargs |= {"specs": args[0]}
super(TimeStepSpec, self).__init__(*args[1:], **kwargs)

@field_validator("specs", mode="before")
@classmethod
def validate_specs(cls, value) -> list[Spec]:
try:
return [Spec(start=s.start, stop=s.stop, step=s.step) for s in value]
except AttributeError:
return value
Loading