From a81282ddfa9ea66885f77821c06f02b5482b65f8 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 26 Oct 2023 11:49:25 +0200 Subject: [PATCH 01/15] add logs --- cads_broker/dispatcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index f47fa0bd..3e118774 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -243,11 +243,13 @@ def submit_requests(self, session: sa.orm.Session, number_of_requests: int) -> N reverse=True, ) requests_counter = 0 + start_can_run = time.time() for request in queue: if self.qos.can_run(request, session=session): self.submit_request(request, session=session) requests_counter += 1 if requests_counter == int(number_of_requests * WORKERS_MULTIPLIER): + logger.info(f"---------> can_run loop: {time.time() - start_can_run}") break def submit_request( From 96f64eb5b10cf5c65b0870b08e6d616c7e29561f Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 26 Oct 2023 12:13:28 +0200 Subject: [PATCH 02/15] add logs --- cads_broker/qos/QoS.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index b955d7b0..84c6ab31 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -101,6 +101,7 @@ def can_run(self, request, session): if limit.full(request): # performance. avoid interacting with db if limit is already there if limit.get_uid(request) not in request.qos_status.get(limit.name, []): + print(f"--------> write limit for {request.request_uid}") database.set_request_qos_rule(request, limit, session) limits.append(limit) session.commit() From bb8fe56bc015e923e9e8154e88688c9ed98e60bc Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:09:35 +0200 Subject: [PATCH 03/15] improve qos status performance --- .../67957f85d934_create_qos_rules_table.py | 44 ++++++++++ cads_broker/Environment.py | 4 +- cads_broker/database.py | 84 +++++++++++++------ cads_broker/dispatcher.py | 7 +- cads_broker/expressions/functions.py | 5 +- cads_broker/factory.py | 4 + cads_broker/qos/QoS.py | 23 +++-- cads_broker/qos/Rule.py | 4 +- 8 files changed, 133 insertions(+), 42 deletions(-) create mode 100644 alembic/versions/67957f85d934_create_qos_rules_table.py diff --git a/alembic/versions/67957f85d934_create_qos_rules_table.py b/alembic/versions/67957f85d934_create_qos_rules_table.py new file mode 100644 index 00000000..07723266 --- /dev/null +++ b/alembic/versions/67957f85d934_create_qos_rules_table.py @@ -0,0 +1,44 @@ +"""create qos_rules table + +Revision ID: 67957f85d934 +Revises: 01374a5b3c41 +Create Date: 2023-10-27 12:04:01.741917 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + + +# revision identifiers, used by Alembic. +revision = '67957f85d934' +down_revision = '01374a5b3c41' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "qos_rules", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("rule", sa.Text), + sa.Column("condition", sa.Text), + sa.Column("conclusion", sa.Text), + sa.Column("info", sa.Text), + sa.Column("timestamp", sa.TIMESTAMP, default=sa.func.now()), + ) + op.drop_column("system_requests", "qos_status") + op.add_column( + "system_requests", + sa.Column( + "qos_status_ids", sa.dialects.postgresql.ARRAY(sa.Text), default=[] + ), + ) + op.execute("update system_requests set qos_status_ids=[]") + + +def downgrade() -> None: + op.drop_column("system_requests", "qos_status_ids") + op.add_column("system_requests", sa.Column("qos_status", JSONB, default={})) + op.execute("UPDATE system_requests SET qos_status='{}'") + op.drop_table("qos_rules") diff --git a/cads_broker/Environment.py b/cads_broker/Environment.py index ea7075eb..60a07ac7 100644 --- a/cads_broker/Environment.py +++ b/cads_broker/Environment.py @@ -23,8 +23,8 @@ def wrapped(self, *args, **kwargs): class Environment: - def __init__(self): - self.number_of_workers = None + def __init__(self, number_of_workers=None): + self.number_of_workers = number_of_workers self.lock = threading.RLock() self._enabled = {} self._values = {} diff --git a/cads_broker/database.py b/cads_broker/database.py index 39c6a9dd..51cc5e1e 100644 --- a/cads_broker/database.py +++ b/cads_broker/database.py @@ -41,6 +41,19 @@ class AdaptorProperties(BaseModel): form = sa.Column(JSONB) +class QosRule(BaseModel): + """QoS Rule ORM model.""" + + __tablename__ = "qos_rules" + + id = sa.Column(sa.Text, primary_key=True) + rule = sa.Column(sa.Text) + condition = sa.Column(sa.Text) + conclusion = sa.Column(sa.Text) + info = sa.Column(sa.Text) + timestamp = sa.Column(sa.TIMESTAMP, default=sa.func.now()) + + class SystemRequest(BaseModel): """System Request ORM model.""" @@ -72,7 +85,7 @@ class SystemRequest(BaseModel): sa.Text, sa.ForeignKey("adaptor_properties.hash"), nullable=False ) entry_point = sa.Column(sa.Text) - qos_status = sa.Column(JSONB, default=dict) + qos_status_ids = sa.Column(sa.dialects.postgresql.ARRAY(sa.Text), default=[]) __table_args__: tuple[sa.ForeignKeyConstraint, dict[None, None]] = ( sa.ForeignKeyConstraint( @@ -81,7 +94,6 @@ class SystemRequest(BaseModel): {}, ) - # joined is temporary cache_entry = sa.orm.relationship(cacholote.database.CacheEntry, lazy="joined") adaptor_properties = sa.orm.relationship(AdaptorProperties, lazy="select") @@ -307,40 +319,58 @@ def count_users(status: str, entry_point: str, session: sa.orm.Session) -> int: ) +def drop_qos_rules_table(session: sa.orm.Session): + session.query(QosRule).delete() + session.commit() + + +def add_qos_rule( + rule, + session: sa.orm.Session, +) -> None: + session.add(QosRule( + id=rule.get_uid(), + # this works only if the "request" is not needed in the computation of "conclusion" + conclusion=str(rule.evaluate(request=None)), + info=str(rule.info).replace('"', ""), + condition=str(rule.condition), + )) + session.commit() + + +def get_qos_rule_from_id(qos_rule_id: str, session: sa.orm.Session) -> QosRule | None: + statement = sa.select(QosRule).where( + QosRule.id == qos_rule_id + ) + try: + return session.scalars(statement).one() + except sa.exc.NoResultFound: + return None + + def get_qos_status_from_request( request: SystemRequest, + session: sa.orm.Session, ) -> dict[str, list[tuple[str, str]]]: - ret_value: dict[str, list[str]] = {} - for rule_name, rules in request.qos_status.items(): - ret_value[rule_name] = [] - for rule in rules.values(): - ret_value[rule_name].append( - (rule.get("info", ""), rule.get("conclusion", "")) - ) + ret_value: dict[str, list[tuple[str, str]]] = {} + for qos_rule_id in request.qos_status_ids: + qos_rule = get_qos_rule_from_id(qos_rule_id=qos_rule_id, session=session) + if qos_rule is not None: + ret_value.setdefault(qos_rule.rule, []) + ret_value[qos_rule.rule].append((qos_rule.info, qos_rule.conclusion)) return ret_value -def set_request_qos_rule( +def add_qos_rule_to_request( request: SystemRequest, rule, session: sa.orm.Session, ): - qos_status = request.qos_status - old_rules = qos_status.get(rule.name, {}) - rule_uid = rule.get_uid(request) - if rule_uid in old_rules: - return - old_rules[rule_uid] = { - "conclusion": str(rule.evaluate(request)), - "info": str(rule.info).replace('"', ""), - "condition": str(rule.condition), - } - qos_status[rule.name] = old_rules - session.execute( - sa.update(SystemRequest) - .filter_by(request_uid=request.request_uid) - .values(qos_status=qos_status) - ) + print(request.qos_status_ids) + request.qos_status_ids = request.qos_status_ids + [rule.get_uid(None)] + session.add(request) + print(request.qos_status_ids) + session.commit() def requeue_request( @@ -395,7 +425,7 @@ def set_request_status( request.response_error = {"message": error_message, "reason": error_reason} elif status == "running": request.started_at = sa.func.now() - request.qos_status = {} + request.qos_status_ids = [] # FIXME: logs can't be live updated request.response_log = json.dumps(log) request.response_user_visible_log = json.dumps(user_visible_log) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index 3e118774..f57746e1 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -89,8 +89,8 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, s class QoSRules: - def __init__(self) -> None: - self.environment = Environment.Environment() + def __init__(self, number_of_workers) -> None: + self.environment = Environment.Environment(number_of_workers=number_of_workers) self.rules_path = os.getenv("RULES_PATH", "/src/rules.qos") if os.path.exists(self.rules_path): self.rules = self.rules_path @@ -125,7 +125,7 @@ def from_address( session_maker: sa.orm.sessionmaker = None, ): client = distributed.Client(address) - qos_config = QoSRules() + qos_config = QoSRules(get_number_of_workers(client=client)) factory.register_functions() session_maker = db.ensure_session_obj(session_maker) rules_hash = get_rules_hash(qos_config.rules_path) @@ -137,6 +137,7 @@ def from_address( qos_config.rules, qos_config.environment, rules_hash=rules_hash, + session_maker=session_maker, ), address=address, ) diff --git a/cads_broker/expressions/functions.py b/cads_broker/expressions/functions.py index 84b8d892..92278e2a 100644 --- a/cads_broker/expressions/functions.py +++ b/cads_broker/expressions/functions.py @@ -10,6 +10,9 @@ import operator import re +import structlog + +logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__) class FunctionExpression: def __init__(self, name, args): @@ -26,7 +29,7 @@ def evaluate(self, context): return self.execute(context, *args) except Exception as e: args = ",".join(repr(a) for a in args) - print(f"{self.name}({args}): {e}") + logger.warning(f"{self.name}({args}): {e}") raise diff --git a/cads_broker/factory.py b/cads_broker/factory.py index 330e7c86..7a264e20 100644 --- a/cads_broker/factory.py +++ b/cads_broker/factory.py @@ -33,6 +33,10 @@ def register_functions(): "adaptor", lambda context, *args: context.request.entry_point, ) + expressions.FunctionFactory.FunctionFactory.register_function( + "number_of_workers", + lambda context, *args: context.environment.number_of_workers, + ) expressions.FunctionFactory.FunctionFactory.register_function( "user_request_count", lambda context, seconds: database.count_finished_requests_per_user( diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index 84c6ab31..c9bd2fd7 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -26,7 +26,7 @@ def wrapped(self, *args, **kwargs): class QoS: - def __init__(self, rules, environment, rules_hash): + def __init__(self, rules, environment, rules_hash, session_maker): self.lock = threading.RLock() self.rules_hash = rules_hash @@ -47,10 +47,11 @@ def __init__(self, rules, environment, rules_hash): self.path = rules self.rules = None # Read the files from the rules file - self.read_rules() + with session_maker() as session: + self.read_rules(session=session) @locked - def read_rules(self): + def read_rules(self, session): """Read the rule files and populate the rule_set.""" # Create a parser to parse the rules file parser = RulesParser(self.path) @@ -60,10 +61,19 @@ def read_rules(self): # Parse the rules parser.parse_rules(self.rules, self.environment) + self.fill_qos_rules_db(session=session) # Print the rules self.rules.dump() + @locked + def fill_qos_rules_db(self, session): + database.drop_qos_rules_table(session=session) + for limit in self.rules.global_limits: + database.add_qos_rule(limit, session=session) + for limit in self.rules.user_limits: + database.add_qos_rule(limit, session=session) + @locked def reload_rules(self, session): """Allow a 'hot' reloading of the rules. @@ -71,7 +81,7 @@ def reload_rules(self, session): For example, a thread could be monitoring the time stamp of the rules file and call this method. """ - self.read_rules() + self.read_rules(session=session) self.reconfigure(session=session) @locked @@ -100,9 +110,8 @@ def can_run(self, request, session): for i, limit in enumerate(properties.limits): if limit.full(request): # performance. avoid interacting with db if limit is already there - if limit.get_uid(request) not in request.qos_status.get(limit.name, []): - print(f"--------> write limit for {request.request_uid}") - database.set_request_qos_rule(request, limit, session) + if limit.get_uid(request) not in request.qos_status_ids: + database.add_qos_rule_to_request(request, limit, session) limits.append(limit) session.commit() permissions = [] diff --git a/cads_broker/qos/Rule.py b/cads_broker/qos/Rule.py index c0dc5150..6661cf76 100644 --- a/cads_broker/qos/Rule.py +++ b/cads_broker/qos/Rule.py @@ -45,9 +45,9 @@ def match(self, request): def dump(self, out): out(self) - def get_uid(self, request): + def get_uid(self, request=None): return str( - hash(f"{self.name} {self.info} {self.condition} : {self.evaluate(request)}") + hash(f"{self.name} {self.info} {self.condition} : {self.evaluate(request=request)}") ) def __repr__(self): From c478352031e21e84094d0fd2537678d48aa4c3f4 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:14:48 +0200 Subject: [PATCH 04/15] fix alembic config --- alembic/versions/67957f85d934_create_qos_rules_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alembic/versions/67957f85d934_create_qos_rules_table.py b/alembic/versions/67957f85d934_create_qos_rules_table.py index 07723266..a0b809aa 100644 --- a/alembic/versions/67957f85d934_create_qos_rules_table.py +++ b/alembic/versions/67957f85d934_create_qos_rules_table.py @@ -31,7 +31,7 @@ def upgrade() -> None: op.add_column( "system_requests", sa.Column( - "qos_status_ids", sa.dialects.postgresql.ARRAY(sa.Text), default=[] + "qos_status_ids", sa.dialects.postgresql.ARRAY(sa.Text), default="{}" ), ) op.execute("update system_requests set qos_status_ids=[]") From b6bcb9d7302c9472c9f81d11587a30bb617df2e8 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:16:29 +0200 Subject: [PATCH 05/15] fix database definition --- cads_broker/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_broker/database.py b/cads_broker/database.py index 51cc5e1e..6f82d47e 100644 --- a/cads_broker/database.py +++ b/cads_broker/database.py @@ -85,7 +85,7 @@ class SystemRequest(BaseModel): sa.Text, sa.ForeignKey("adaptor_properties.hash"), nullable=False ) entry_point = sa.Column(sa.Text) - qos_status_ids = sa.Column(sa.dialects.postgresql.ARRAY(sa.Text), default=[]) + qos_status_ids = sa.Column(sa.dialects.postgresql.ARRAY(sa.Text), default="{}") __table_args__: tuple[sa.ForeignKeyConstraint, dict[None, None]] = ( sa.ForeignKeyConstraint( From 712082039ef7d5f51b4c848992aba824bddf7ba4 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:18:33 +0200 Subject: [PATCH 06/15] fix alembic --- alembic/versions/67957f85d934_create_qos_rules_table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alembic/versions/67957f85d934_create_qos_rules_table.py b/alembic/versions/67957f85d934_create_qos_rules_table.py index a0b809aa..92f73593 100644 --- a/alembic/versions/67957f85d934_create_qos_rules_table.py +++ b/alembic/versions/67957f85d934_create_qos_rules_table.py @@ -31,10 +31,10 @@ def upgrade() -> None: op.add_column( "system_requests", sa.Column( - "qos_status_ids", sa.dialects.postgresql.ARRAY(sa.Text), default="{}" + "qos_status_ids", sa.dialects.postgresql.ARRAY(sa.Text), default=[] ), ) - op.execute("update system_requests set qos_status_ids=[]") + op.execute("update system_requests set qos_status_ids='{}'") def downgrade() -> None: From fbaff8e4d48c1f7e811fc1b139d1cfabee335887 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:20:05 +0200 Subject: [PATCH 07/15] drop prints --- cads_broker/database.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cads_broker/database.py b/cads_broker/database.py index 6f82d47e..f5c7cfd0 100644 --- a/cads_broker/database.py +++ b/cads_broker/database.py @@ -366,10 +366,8 @@ def add_qos_rule_to_request( rule, session: sa.orm.Session, ): - print(request.qos_status_ids) request.qos_status_ids = request.qos_status_ids + [rule.get_uid(None)] session.add(request) - print(request.qos_status_ids) session.commit() From e037fe30b46b55f53ee76d2bae022eb90369173b Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:31:37 +0200 Subject: [PATCH 08/15] test performance --- cads_broker/qos/QoS.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index c9bd2fd7..a15e4a87 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -111,7 +111,8 @@ def can_run(self, request, session): if limit.full(request): # performance. avoid interacting with db if limit is already there if limit.get_uid(request) not in request.qos_status_ids: - database.add_qos_rule_to_request(request, limit, session) + pass + # database.add_qos_rule_to_request(request, limit, session) limits.append(limit) session.commit() permissions = [] From 68185f161c6548a773886077da156e4b341673b7 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:34:03 +0200 Subject: [PATCH 09/15] test performance --- cads_broker/qos/QoS.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index a15e4a87..782059d7 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -110,9 +110,8 @@ def can_run(self, request, session): for i, limit in enumerate(properties.limits): if limit.full(request): # performance. avoid interacting with db if limit is already there - if limit.get_uid(request) not in request.qos_status_ids: - pass - # database.add_qos_rule_to_request(request, limit, session) + # if limit.get_uid(request) not in request.qos_status_ids: + # database.add_qos_rule_to_request(request, limit, session) limits.append(limit) session.commit() permissions = [] From 22b74b4c93addafb3f19558be5e2d14efe736752 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:35:39 +0200 Subject: [PATCH 10/15] test performance --- cads_broker/qos/QoS.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index 782059d7..9e480512 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -108,11 +108,12 @@ def can_run(self, request, session): properties = self._properties(request=request, session=session) limits = [] for i, limit in enumerate(properties.limits): - if limit.full(request): + continue + # if limit.full(request): # performance. avoid interacting with db if limit is already there # if limit.get_uid(request) not in request.qos_status_ids: # database.add_qos_rule_to_request(request, limit, session) - limits.append(limit) + # limits.append(limit) session.commit() permissions = [] for permission in properties.permissions: From e918a8a64e5868a592d334ae50883a7a68fe9694 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 15:42:10 +0200 Subject: [PATCH 11/15] revert back --- cads_broker/qos/QoS.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index 9e480512..4fb929f9 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -107,13 +107,12 @@ def can_run(self, request, session): """Check if a request can run.""" properties = self._properties(request=request, session=session) limits = [] - for i, limit in enumerate(properties.limits): - continue - # if limit.full(request): + for limit in properties.limits: + if limit.full(request): # performance. avoid interacting with db if limit is already there - # if limit.get_uid(request) not in request.qos_status_ids: - # database.add_qos_rule_to_request(request, limit, session) - # limits.append(limit) + if limit.get_uid(request) not in request.qos_status_ids: + database.add_qos_rule_to_request(request, limit, session) + limits.append(limit) session.commit() permissions = [] for permission in properties.permissions: From 7418a6619aac7ea25da4a3d2c8712b6411bf82e3 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 16:11:07 +0200 Subject: [PATCH 12/15] logging --- cads_broker/qos/QoS.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index 4fb929f9..b8ab19ce 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -108,6 +108,7 @@ def can_run(self, request, session): properties = self._properties(request=request, session=session) limits = [] for limit in properties.limits: + print(f"-------> limit: {limit}") if limit.full(request): # performance. avoid interacting with db if limit is already there if limit.get_uid(request) not in request.qos_status_ids: From 72dbc3bc8bf99bd1d137d6d0017b2339466138de Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 16:12:33 +0200 Subject: [PATCH 13/15] logging --- cads_broker/qos/QoS.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index b8ab19ce..117b97b4 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -108,7 +108,7 @@ def can_run(self, request, session): properties = self._properties(request=request, session=session) limits = [] for limit in properties.limits: - print(f"-------> limit: {limit}") + print(f"-------> limit: {request.request_uid} {limit}") if limit.full(request): # performance. avoid interacting with db if limit is already there if limit.get_uid(request) not in request.qos_status_ids: From 2a508840c59cbe2e98d949490b95321db5584093 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 27 Oct 2023 16:14:49 +0200 Subject: [PATCH 14/15] drop logging --- cads_broker/dispatcher.py | 1 - cads_broker/qos/QoS.py | 1 - 2 files changed, 2 deletions(-) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index f57746e1..4b300fc6 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -250,7 +250,6 @@ def submit_requests(self, session: sa.orm.Session, number_of_requests: int) -> N self.submit_request(request, session=session) requests_counter += 1 if requests_counter == int(number_of_requests * WORKERS_MULTIPLIER): - logger.info(f"---------> can_run loop: {time.time() - start_can_run}") break def submit_request( diff --git a/cads_broker/qos/QoS.py b/cads_broker/qos/QoS.py index 117b97b4..4fb929f9 100644 --- a/cads_broker/qos/QoS.py +++ b/cads_broker/qos/QoS.py @@ -108,7 +108,6 @@ def can_run(self, request, session): properties = self._properties(request=request, session=session) limits = [] for limit in properties.limits: - print(f"-------> limit: {request.request_uid} {limit}") if limit.full(request): # performance. avoid interacting with db if limit is already there if limit.get_uid(request) not in request.qos_status_ids: From c5b190e229a4f519687cc5d22848aac873889df2 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Mon, 30 Oct 2023 09:29:20 +0100 Subject: [PATCH 15/15] unused variable --- cads_broker/dispatcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index 4b300fc6..3d73aa8c 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -244,7 +244,6 @@ def submit_requests(self, session: sa.orm.Session, number_of_requests: int) -> N reverse=True, ) requests_counter = 0 - start_can_run = time.time() for request in queue: if self.qos.can_run(request, session=session): self.submit_request(request, session=session)