Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 61 additions & 21 deletions src/inmanta/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4621,6 +4621,36 @@ def convert_or_ignore(rvid):
)
return out

@classmethod
async def set_deployed_multi(
    cls,
    environment: uuid.UUID,
    resource_ids: list[m.ResourceIdStr],
    version: int,
    connection: Optional[asyncpg.connection.Connection] = None,
) -> None:
    """
    Mark the given resources in the given model version as deployed.

    :param environment: Environment the resources live in.
    :param resource_ids: Resource ids (not resource version ids) to update.
    :param version: Configuration model version the update is scoped to.
    :param connection: Optional database connection to reuse; a new one is
        acquired from the pool when omitted.
    """
    # NOTE: the original signature declared `-> list[m.ResourceIdStr]` but no
    # value was ever returned; the annotation is corrected to None here.
    query = "UPDATE resource SET status='deployed' WHERE environment=$1 AND model=$2 AND resource_id =ANY($3) "
    async with cls.get_connection(connection) as connection:
        await connection.execute(query, environment, version, resource_ids)

@classmethod
async def get_resource_ids_with_status(
    cls,
    environment: uuid.UUID,
    resource_version_ids: list[m.ResourceIdStr],
    version: int,
    statuses: list[const.ResourceState],
    lock: Optional[RowLockMode] = None,
    connection: Optional[asyncpg.connection.Connection] = None,
) -> list[m.ResourceIdStr]:
    """
    Return the ids of the given resources, restricted to those whose status in
    the given model version is one of ``statuses``.

    :param environment: Environment the resources live in.
    :param resource_version_ids: Resource ids to filter. NOTE(review): despite the
        name, these are matched against the ``resource_id`` column, and the type
        hint says ``ResourceIdStr`` — presumably plain resource ids; confirm with callers.
    :param version: Configuration model version to inspect.
    :param statuses: Only resources currently in one of these states are returned.
    :param lock: Optional row lock mode appended verbatim to the query.
    :param connection: Optional database connection to reuse.
    """
    query = "SELECT resource_id as resource_id FROM resource WHERE environment=$1 AND model=$2 AND status = ANY($3) and resource_id =ANY($4) "
    if lock:
        query += lock.value
    async with cls.get_connection(connection) as con:
        records = await con.fetch(query, environment, version, statuses, resource_version_ids)
    return [record["resource_id"] for record in records]

@classmethod
async def get_undeployable(cls, environment: uuid.UUID, version: int) -> list["Resource"]:
"""
Expand Down Expand Up @@ -4796,27 +4826,38 @@ async def get_resources_for_version_raw_with_persistent_state(
version: int,
projection: Optional[list[str]],
projection_presistent: Optional[list[str]],
project_attributes: Optional[list[str]] = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this so we don't have to pull in all attributes for the resource when not needed, this can be a LOT of data

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
project_attributes: Optional[list[str]] = None,
project_attributes: Optional[Sequence[LiteralString]] = None,

See typing.LiteralString docs. I think we should use this ideally because we bake it into the query itself, which is not safe with any user input. Ideally we'd do the same for the other lists but I'll leave that decision up to you.

Copy link
Contributor Author

@wouterdb wouterdb Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mypy doesn't support it python/mypy#12554

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, shame

*,
connection: Optional[Connection] = None,
) -> list[dict[str, object]]:
"""This method performs none of the mangling required to produce valid resources!"""
"""This method performs none of the mangling required to produce valid resources!

project_attributes performs a projection on the json attributes of the resources table
"""

def collect_projection(projection: Optional[list[str]], prefix: str) -> str:
if not projection:
return f"{prefix}.*"
else:
return ",".join(f"{prefix}.{field}" for field in projection)

if project_attributes:
json_projection = "," + ",".join(f"r.attributes->'{v}' as {v}" for v in project_attributes)
else:
json_projection = ""

query = f"""
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')}
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')} {json_projection}
FROM {cls.table_name()} r JOIN resource_persistent_state ps ON r.resource_id = ps.resource_id
WHERE r.environment=$1 AND ps.environment = $1 and r.model = $2;"""

resource_records = await cls._fetch_query(query, environment, version, connection=connection)
resources = [dict(record) for record in resource_records]
for res in resources:
if "attributes" in res:
res["attributes"] = json.loads(res["attributes"])
if project_attributes:
for k in project_attributes:
if res[k]:
res[k] = json.loads(res[k])
return resources

@classmethod
Expand Down Expand Up @@ -5403,6 +5444,7 @@ async def get_list(
no_obj: Optional[bool] = None,
lock: Optional[RowLockMode] = None,
connection: Optional[asyncpg.connection.Connection] = None,
no_status: bool = False, # don't load the status field
**query: object,
) -> list["ConfigurationModel"]:
# sanitize and validate order parameters
Expand Down Expand Up @@ -5449,11 +5491,18 @@ async def get_list(
for record in query_result:
record = dict(record)
if no_obj:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
if no_status:
record["status"] = {}
else:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
result.append(record)
else:
done = record.pop("done")
status = await cls._get_status_field(record["environment"], record.pop("status"))
if no_status:
status = {}
record.pop("status")
else:
status = await cls._get_status_field(record["environment"], record.pop("status"))
obj = cls(from_postgres=True, **record)
obj._done = done
obj._status = status
Expand Down Expand Up @@ -5706,7 +5755,6 @@ async def get_increment(
projection_a_resource = [
"resource_id",
"attribute_hash",
"attributes",
"status",
]
projection_a_state = [
Expand All @@ -5715,11 +5763,12 @@ async def get_increment(
"last_deployed_attribute_hash",
"last_non_deploying_status",
]
projection_a_attributes = ["requires", "send_event"]
projection = ["resource_id", "status", "attribute_hash"]

# get resources for agent
resources = await Resource.get_resources_for_version_raw_with_persistent_state(
environment, version, projection_a_resource, projection_a_state, connection=connection
environment, version, projection_a_resource, projection_a_state, projection_a_attributes, connection=connection
)

# to increment
Expand All @@ -5740,20 +5789,11 @@ async def get_increment(
continue
# Now outstanding events
last_success = resource["last_success"] or DATETIME_MIN_UTC
attributes = resource["attributes"]
assert isinstance(attributes, dict) # mypy
for req in attributes["requires"]:
for req in resource["requires"]:
req_res = id_to_resource[req]
assert req_res is not None # todo
req_res_attributes = req_res["attributes"]
assert isinstance(req_res_attributes, dict) # mypy
last_produced_events = req_res["last_produced_events"]
if (
last_produced_events is not None
and last_produced_events > last_success
and "send_event" in req_res_attributes
and req_res_attributes["send_event"]
):
if last_produced_events is not None and last_produced_events > last_success and req_res["send_event"]:
in_increment = True
break

Expand Down Expand Up @@ -5839,9 +5879,9 @@ async def get_increment(

# build lookup tables
for res in resources:
for req in res["attributes"]["requires"]:
for req in res["requires"]:
original_provides[req].append(res["resource_id"])
if "send_event" in res["attributes"] and res["attributes"]["send_event"]:
if res["send_event"]:
send_events.append(res["resource_id"])

# recursively include stuff potentially receiving events from nodes in the increment
Expand Down
48 changes: 41 additions & 7 deletions src/inmanta/server/services/orchestrationservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Contact: code@inmanta.com
"""

import asyncio
import datetime
import logging
import uuid
Expand Down Expand Up @@ -69,6 +70,8 @@
from inmanta.types import Apireturn, JsonType, PrimitiveTypes, ReturnTupple

LOGGER = logging.getLogger(__name__)
PLOGGER = logging.getLogger("performance")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: remove

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo alert



PERFORM_CLEANUP: bool = True
# Kill switch for cleanup, for use when working with historical data
Expand Down Expand Up @@ -411,7 +414,9 @@ async def _purge_versions(self) -> None:
# get available versions
n_versions = await env_item.get(AVAILABLE_VERSIONS_TO_KEEP, connection=connection)
assert isinstance(n_versions, int)
versions = await data.ConfigurationModel.get_list(environment=env_item.id, connection=connection)
versions = await data.ConfigurationModel.get_list(
environment=env_item.id, connection=connection, no_status=True
)
if len(versions) > n_versions:
LOGGER.info("Removing %s available versions from environment %s", len(versions) - n_versions, env_item.id)
version_dict = {x.version: x for x in versions}
Expand Down Expand Up @@ -652,7 +657,7 @@ async def _put_version(
pip_config: Optional[PipConfig] = None,
*,
connection: asyncpg.connection.Connection,
) -> None:
) -> abc.Sequence[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to make this a sequence? In return types I find that 90% of the time the concrete type is the appropriate pick.

"""
:param rid_to_resource: This parameter should contain all the resources when a full compile is done.
When a partial compile is done, it should contain all the resources that belong to the
Expand All @@ -666,6 +671,8 @@ async def _put_version(
sets that are removed by the partial compile. When no resource sets are removed by
a partial compile or when a full compile is done, this parameter can be set to None.

:return: all agents affected

Pre-conditions:
* The requires and provides relationships of the resources in rid_to_resource must be set correctly. For a
partial compile, this means it is assumed to be valid with respect to all absolute constraints that apply to
Expand Down Expand Up @@ -818,13 +825,15 @@ async def _put_version(
await ra.insert(connection=connection)

LOGGER.debug("Successfully stored version %d", version)
return list(all_agents)

async def _trigger_auto_deploy(
self,
env: data.Environment,
version: int,
*,
connection: Optional[Connection],
agents: Optional[abc.Sequence[str]] = None,
) -> None:
"""
Triggers auto-deploy for stored resources. Must be called only after transaction that stores resources has been allowed
Expand All @@ -838,7 +847,7 @@ async def _trigger_auto_deploy(
agent_trigger_method_on_autodeploy = cast(str, await env.get(data.AGENT_TRIGGER_METHOD_ON_AUTO_DEPLOY))
agent_trigger_method_on_autodeploy = const.AgentTriggerMethod[agent_trigger_method_on_autodeploy]
await self.release_version(
Copy link
Contributor Author

@wouterdb wouterdb Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only push agents that are/might be affected

env, version, push_on_auto_deploy, agent_trigger_method_on_autodeploy, connection=connection
env, version, push_on_auto_deploy, agent_trigger_method_on_autodeploy, connection=connection, agents=agents
)

def _create_unknown_parameter_daos_from_api_unknowns(
Expand Down Expand Up @@ -946,6 +955,8 @@ async def put_partial(
"source": str
}
"""
PLOGGER.warning("STARTING PUT PARTIAL")
previous = asyncio.get_running_loop().time()
if resource_state is None:
resource_state = {}
if unknowns is None:
Expand Down Expand Up @@ -978,6 +989,9 @@ async def put_partial(
"Following resource sets are present in the removed resource sets and in the resources that are exported: "
f"{intersection}"
)
t_now = asyncio.get_running_loop().time()
PLOGGER.warning("INPUT VALIDATION: %s", t_now - previous)
previous = t_now

async with data.Resource.get_connection() as con:
async with con.transaction():
Expand Down Expand Up @@ -1020,6 +1034,10 @@ async def put_partial(
set_version=version,
)

t_now = asyncio.get_running_loop().time()
PLOGGER.warning("LOAD STAGE: %s", t_now - previous)
previous = t_now

updated_resource_sets: abc.Set[str] = {sr_name for sr_name in resource_sets.values() if sr_name is not None}
partial_update_merger = await PartialUpdateMerger.create(
env_id=env.id,
Expand All @@ -1033,14 +1051,17 @@ async def put_partial(

# add shared resources
merged_resources = partial_update_merger.merge_updated_and_shared_resources(list(rid_to_resource.values()))
t_now = asyncio.get_running_loop().time()
PLOGGER.warning("MERGE STAGE: %s", t_now - previous)
previous = t_now

await data.Code.copy_versions(env.id, base_version, version, connection=con)

merged_unknowns = await partial_update_merger.merge_unknowns(
unknowns_in_partial_compile=self._create_unknown_parameter_daos_from_api_unknowns(env.id, version, unknowns)
)

await self._put_version(
all_agents = await self._put_version(
env,
version,
merged_resources,
Expand All @@ -1052,17 +1073,24 @@ async def put_partial(
pip_config=pip_config,
connection=con,
)
t_now = asyncio.get_running_loop().time()
PLOGGER.warning("PUT STAGE: %s", t_now - previous)
previous = t_now

returnvalue: ReturnValue[int] = ReturnValue[int](200, response=version)
try:
await self._trigger_auto_deploy(env, version, connection=con)
await self._trigger_auto_deploy(env, version, agents=all_agents, connection=con)
except Conflict as e:
# It is unclear if this condition can ever happen
LOGGER.warning(
"Could not perform auto deploy on version %d in environment %s, because %s", version, env.id, e.log_message
)
returnvalue.add_warnings([f"Could not perform auto deploy: {e.log_message} {e.details}"])

t_now = asyncio.get_running_loop().time()
PLOGGER.warning("AUTO DEPLOY STAGE: %s", t_now - previous)
previous = t_now

return returnvalue

@handle(methods.release_version, version_id="id", env="tid")
Expand All @@ -1074,7 +1102,11 @@ async def release_version(
agent_trigger_method: Optional[const.AgentTriggerMethod] = None,
*,
connection: Optional[asyncpg.connection.Connection] = None,
agents: Optional[abc.Sequence[str]] = None,
) -> ReturnTupple:
"""
:param agents: agents that have to be notified by the push
"""
async with data.ConfigurationModel.get_connection(connection) as connection:
async with connection.transaction():
with ConnectionInTransaction(connection) as connection_holder:
Expand Down Expand Up @@ -1170,8 +1202,10 @@ async def release_version(
# We can't be in a transaction here, or the agent will not see the data that as committed
# This assert prevents anyone from wrapping this method in a transaction by accident
assert not connection.is_in_transaction()
# fetch all resource in this cm and create a list of distinct agents
agents = await data.ConfigurationModel.get_agents(env.id, version_id, connection=connection)

if agents is None:
# fetch all resource in this cm and create a list of distinct agents
agents = await data.ConfigurationModel.get_agents(env.id, version_id, connection=connection)
await self.autostarted_agent_manager._ensure_agents(env, agents, connection=connection)

for agent in agents:
Expand Down
Loading