Skip to content

Commit 183d3de

Browse files
wouterdbinmantaci
authored andcommitted
Improve deploy performance for very large models (Issue #7262, PR #7278)
Pull request opened by the merge tool on behalf of #7278
1 parent a5d9a04 commit 183d3de

File tree

12 files changed

+304
-154
lines changed

12 files changed

+304
-154
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
description: Improve deploy performance for very large models
2+
issue-nr: 7262
3+
change-type: minor
4+
destination-branches: [master, iso7]
5+
sections:
6+
minor-improvement: "{{description}}"
7+

src/inmanta/agent/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
is_time,
9292
)
9393

94-
agent_deploy_interval = Option(
94+
agent_deploy_interval: Option[int | str] = Option(
9595
"config",
9696
"agent-deploy-interval",
9797
0,

src/inmanta/config.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -133,19 +133,19 @@ def get(cls, section: Optional[str] = None, name: Optional[str] = None, default_
133133

134134
@classmethod
135135
def get_for_option(cls, option: "Option[T]") -> T:
136-
raw_value: Optional[str | T] = cls._get_value(option.section, option.name, option.get_default_value())
136+
raw_value: str | T = cls._get_value(option.section, option.name, option.get_default_value())
137137
return option.validate(raw_value)
138138

139139
@classmethod
140-
def _get_value(cls, section: str, name: str, default_value: Optional[T] = None) -> Optional[str | T]:
140+
def _get_value(cls, section: str, name: str, default_value: T) -> str | T:
141141
cfg: ConfigParser = cls.get_instance()
142142
val: Optional[str] = _get_from_env(section, name)
143143
if val is not None:
144144
LOGGER.debug(f"Setting {section}:{name} was set using an environment variable")
145-
else:
146-
val = cfg.get(section, name, fallback=default_value)
147-
148-
return val
145+
return val
146+
# Typing of this method in the sdk is not entirely accurate
147+
# It just returns the fallback, whatever its type
148+
return cfg.get(section, name, fallback=default_value)
149149

150150
@classmethod
151151
def is_set(cls, section: str, name: str) -> bool:
@@ -205,12 +205,12 @@ def is_float(value: str) -> float:
205205
return float(value)
206206

207207

208-
def is_time(value: str) -> int:
208+
def is_time(value: str | int) -> int:
209209
"""Time, the number of seconds represented as an integer value"""
210210
return int(value)
211211

212212

213-
def is_time_or_cron(value: str) -> Union[int, str]:
213+
def is_time_or_cron(value: str | int) -> Union[int, str]:
214214
"""Time, the number of seconds represented as an integer value or a cron-like expression"""
215215
try:
216216
return is_time(value)
@@ -232,8 +232,10 @@ def is_bool(value: Union[bool, str]) -> bool:
232232
return boolean_states[value.lower()]
233233

234234

235-
def is_list(value: str) -> list[str]:
235+
def is_list(value: str | list[str]) -> list[str]:
236236
"""List of comma-separated values"""
237+
if isinstance(value, list):
238+
return value
237239
return [] if value == "" else [x.strip() for x in value.split(",")]
238240

239241

@@ -304,9 +306,9 @@ def __init__(
304306
self,
305307
section: str,
306308
name: str,
307-
default: Union[T, None, Callable[[], T]],
309+
default: Union[T, Callable[[], T]],
308310
documentation: str,
309-
validator: Callable[[Optional[str | T]], T] = is_str,
311+
validator: Callable[[str | T], T] = is_str,
310312
predecessor_option: Optional["Option"] = None,
311313
) -> None:
312314
self.section = section
@@ -342,10 +344,10 @@ def get_default_desc(self) -> str:
342344
else:
343345
return f"``{defa}``"
344346

345-
def validate(self, value: Optional[str | T]) -> T:
347+
def validate(self, value: str | T) -> T:
346348
return self.validator(value)
347349

348-
def get_default_value(self) -> Optional[T]:
350+
def get_default_value(self) -> T:
349351
defa = self.default
350352
if callable(defa):
351353
return defa()

src/inmanta/data/__init__.py

Lines changed: 74 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4621,6 +4621,40 @@ def convert_or_ignore(rvid):
46214621
)
46224622
return out
46234623

4624+
@classmethod
4625+
async def set_deployed_multi(
4626+
cls,
4627+
environment: uuid.UUID,
4628+
resource_ids: Sequence[m.ResourceIdStr],
4629+
version: int,
4630+
connection: Optional[asyncpg.connection.Connection] = None,
4631+
) -> None:
4632+
query = "UPDATE resource SET status='deployed' WHERE environment=$1 AND model=$2 AND resource_id =ANY($3) "
4633+
async with cls.get_connection(connection) as connection:
4634+
await connection.execute(query, environment, version, resource_ids)
4635+
4636+
@classmethod
4637+
async def get_resource_ids_with_status(
4638+
cls,
4639+
environment: uuid.UUID,
4640+
resource_version_ids: list[m.ResourceIdStr],
4641+
version: int,
4642+
statuses: Sequence[const.ResourceState],
4643+
lock: Optional[RowLockMode] = None,
4644+
connection: Optional[asyncpg.connection.Connection] = None,
4645+
) -> list[m.ResourceIdStr]:
4646+
query = (
4647+
"SELECT resource_id as resource_id FROM resource WHERE "
4648+
"environment=$1 AND model=$2 AND status = ANY($3) and resource_id =ANY($4) "
4649+
)
4650+
if lock:
4651+
query += lock.value
4652+
async with cls.get_connection(connection) as connection:
4653+
return [
4654+
m.ResourceIdStr(cast(str, r["resource_id"]))
4655+
for r in await connection.fetch(query, environment, version, statuses, resource_version_ids)
4656+
]
4657+
46244658
@classmethod
46254659
async def get_undeployable(cls, environment: uuid.UUID, version: int) -> list["Resource"]:
46264660
"""
@@ -4794,29 +4828,42 @@ async def get_resources_for_version_raw_with_persistent_state(
47944828
cls,
47954829
environment: uuid.UUID,
47964830
version: int,
4797-
projection: Optional[list[str]],
4798-
projection_presistent: Optional[list[str]],
4831+
projection: Optional[list[typing.LiteralString]],
4832+
projection_presistent: Optional[list[typing.LiteralString]],
4833+
project_attributes: Optional[list[typing.LiteralString]] = None,
47994834
*,
48004835
connection: Optional[Connection] = None,
48014836
) -> list[dict[str, object]]:
4802-
"""This method performs none of the mangling required to produce valid resources!"""
4837+
"""This method performs none of the mangling required to produce valid resources!
4838+
4839+
project_attributes performs a projection on the json attributes of the resources table
4840+
4841+
all projections must be disjoint, as they become named fields in the output record
4842+
"""
48034843

48044844
def collect_projection(projection: Optional[list[str]], prefix: str) -> str:
48054845
if not projection:
48064846
return f"{prefix}.*"
48074847
else:
48084848
return ",".join(f"{prefix}.{field}" for field in projection)
48094849

4850+
if project_attributes:
4851+
json_projection = "," + ",".join(f"r.attributes->'{v}' as {v}" for v in project_attributes)
4852+
else:
4853+
json_projection = ""
4854+
48104855
query = f"""
4811-
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')}
4856+
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')} {json_projection}
48124857
FROM {cls.table_name()} r JOIN resource_persistent_state ps ON r.resource_id = ps.resource_id
48134858
WHERE r.environment=$1 AND ps.environment = $1 and r.model = $2;"""
48144859

48154860
resource_records = await cls._fetch_query(query, environment, version, connection=connection)
48164861
resources = [dict(record) for record in resource_records]
48174862
for res in resources:
4818-
if "attributes" in res:
4819-
res["attributes"] = json.loads(res["attributes"])
4863+
if project_attributes:
4864+
for k in project_attributes:
4865+
if res[k]:
4866+
res[k] = json.loads(res[k])
48204867
return resources
48214868

48224869
@classmethod
@@ -5403,6 +5450,7 @@ async def get_list(
54035450
no_obj: Optional[bool] = None,
54045451
lock: Optional[RowLockMode] = None,
54055452
connection: Optional[asyncpg.connection.Connection] = None,
5453+
no_status: bool = False, # don't load the status field
54065454
**query: object,
54075455
) -> list["ConfigurationModel"]:
54085456
# sanitize and validate order parameters
@@ -5446,14 +5494,21 @@ async def get_list(
54465494
{lock_statement}"""
54475495
query_result = await cls._fetch_query(query_string, *values, connection=connection)
54485496
result = []
5449-
for record in query_result:
5450-
record = dict(record)
5497+
for in_record in query_result:
5498+
record = dict(in_record)
54515499
if no_obj:
5452-
record["status"] = await cls._get_status_field(record["environment"], record["status"])
5500+
if no_status:
5501+
record["status"] = {}
5502+
else:
5503+
record["status"] = await cls._get_status_field(record["environment"], record["status"])
54535504
result.append(record)
54545505
else:
54555506
done = record.pop("done")
5456-
status = await cls._get_status_field(record["environment"], record.pop("status"))
5507+
if no_status:
5508+
status = {}
5509+
record.pop("status")
5510+
else:
5511+
status = await cls._get_status_field(record["environment"], record.pop("status"))
54575512
obj = cls(from_postgres=True, **record)
54585513
obj._done = done
54595514
obj._status = status
@@ -5703,23 +5758,23 @@ async def get_increment(
57035758
deployed and different hash -> increment
57045759
"""
57055760
# Depends on deploying
5706-
projection_a_resource = [
5761+
projection_a_resource: list[typing.LiteralString] = [
57075762
"resource_id",
57085763
"attribute_hash",
5709-
"attributes",
57105764
"status",
57115765
]
5712-
projection_a_state = [
5766+
projection_a_state: list[typing.LiteralString] = [
57135767
"last_success",
57145768
"last_produced_events",
57155769
"last_deployed_attribute_hash",
57165770
"last_non_deploying_status",
57175771
]
5718-
projection = ["resource_id", "status", "attribute_hash"]
5772+
projection_a_attributes: list[typing.LiteralString] = ["requires", "send_event"]
5773+
projection: list[typing.LiteralString] = ["resource_id", "status", "attribute_hash"]
57195774

57205775
# get resources for agent
57215776
resources = await Resource.get_resources_for_version_raw_with_persistent_state(
5722-
environment, version, projection_a_resource, projection_a_state, connection=connection
5777+
environment, version, projection_a_resource, projection_a_state, projection_a_attributes, connection=connection
57235778
)
57245779

57255780
# to increment
@@ -5740,20 +5795,11 @@ async def get_increment(
57405795
continue
57415796
# Now outstanding events
57425797
last_success = resource["last_success"] or DATETIME_MIN_UTC
5743-
attributes = resource["attributes"]
5744-
assert isinstance(attributes, dict) # mypy
5745-
for req in attributes["requires"]:
5798+
for req in resource["requires"]:
57465799
req_res = id_to_resource[req]
57475800
assert req_res is not None # todo
5748-
req_res_attributes = req_res["attributes"]
5749-
assert isinstance(req_res_attributes, dict) # mypy
57505801
last_produced_events = req_res["last_produced_events"]
5751-
if (
5752-
last_produced_events is not None
5753-
and last_produced_events > last_success
5754-
and "send_event" in req_res_attributes
5755-
and req_res_attributes["send_event"]
5756-
):
5802+
if last_produced_events is not None and last_produced_events > last_success and req_res["send_event"]:
57575803
in_increment = True
57585804
break
57595805

@@ -5839,9 +5885,9 @@ async def get_increment(
58395885

58405886
# build lookup tables
58415887
for res in resources:
5842-
for req in res["attributes"]["requires"]:
5888+
for req in res["requires"]:
58435889
original_provides[req].append(res["resource_id"])
5844-
if "send_event" in res["attributes"] and res["attributes"]["send_event"]:
5890+
if res["send_event"]:
58455891
send_events.append(res["resource_id"])
58465892

58475893
# recursively include stuff potentially receiving events from nodes in the increment

src/inmanta/server/agentmanager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,7 @@ async def _terminate_agents(self) -> None:
10061006
async def _ensure_agents(
10071007
self,
10081008
env: data.Environment,
1009-
agents: list[str],
1009+
agents: Sequence[str],
10101010
restart: bool = False,
10111011
*,
10121012
connection: Optional[asyncpg.connection.Connection] = None,

src/inmanta/server/config.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,18 +244,18 @@ def validate_fact_renew(value: object) -> int:
244244
"server", "purge-resource-action-logs-interval", 3600, "The number of seconds between resource-action log purging", is_time
245245
)
246246

247-
server_resource_action_log_prefix = Option(
247+
server_resource_action_log_prefix: Option[str] = Option(
248248
"server",
249249
"resource_action_log_prefix",
250250
"resource-actions-",
251251
"File prefix in log-dir, containing the resource-action logs. The after the prefix the environment uuid and .log is added",
252252
is_str,
253253
)
254254

255-
server_enabled_extensions = Option(
255+
server_enabled_extensions: Option[list[str]] = Option(
256256
"server",
257257
"enabled_extensions",
258-
"",
258+
list,
259259
"A list of extensions the server must load. Core is always loaded."
260260
"If an extension listed in this list is not available, the server will refuse to start.",
261261
is_list,
@@ -271,9 +271,9 @@ def validate_fact_renew(value: object) -> int:
271271
)
272272

273273

274-
def default_hangtime() -> str:
274+
def default_hangtime() -> int:
275275
""":inmanta.config:option:`server.agent-timeout` *3/4"""
276-
return str(int(agent_timeout.get() * 3 / 4))
276+
return int(agent_timeout.get() * 3 / 4)
277277

278278

279279
agent_hangtime = Option(

0 commit comments

Comments
 (0)