From 89983bd97845e1ce740d829450def814d91275ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Silva?= Date: Tue, 15 Apr 2025 00:38:36 +0100 Subject: [PATCH 1/2] Migrate from attrs to dataclasses --- .pre-commit-config.yaml | 4 +- poetry.lock | 88 ++++++++++++---- pyproject.toml | 166 ++++++++++++++++--------------- sqs_workers/__init__.py | 4 +- sqs_workers/async_task.py | 8 +- sqs_workers/backoff_policies.py | 2 +- sqs_workers/codecs.py | 12 +-- sqs_workers/config.py | 13 ++- sqs_workers/core.py | 6 +- sqs_workers/deadletter_queue.py | 17 ++-- sqs_workers/memory_sqs.py | 88 +++++++--------- sqs_workers/processors.py | 17 ++-- sqs_workers/queue.py | 42 ++++---- sqs_workers/shutdown_policies.py | 6 +- sqs_workers/sqs_env.py | 44 ++++---- sqs_workers/sqs_manage.py | 5 +- sqs_workers/utils.py | 3 +- tests/test_sqs.py | 9 +- 18 files changed, 276 insertions(+), 258 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 18084a1..7d3005d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,10 +13,10 @@ repos: - id: debug-statements - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.13 + rev: v0.11.5 hooks: # Run the linter - id: ruff - args: ['--fix'] + args: ["--fix"] # Run the formatter - id: ruff-format diff --git a/poetry.lock b/poetry.lock index c10fe18..b86a391 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,25 +1,5 @@ # This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. -[[package]] -name = "attrs" -version = "23.2.0" -description = "Classes Without Boilerplate" -optional = false -python-versions = ">=3.7" -groups = ["main"] -files = [ - {file = "attrs-23.2.0-py3-none-any.whl", hash = "sha256:99b87a485a5820b23b879f04c2305b44b951b502fd64be915879d77a7e8fc6f1"}, - {file = "attrs-23.2.0.tar.gz", hash = "sha256:935dc3b529c262f6cf76e50877d35a4bd3c1de194fd41f47a2b7ae8f19971f30"}, -] - -[package.extras] -cov = ["attrs[tests]", "coverage[toml] (>=5.3)"] -dev = ["attrs[tests]", "pre-commit"] -docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier", "zope-interface"] -tests = ["attrs[tests-no-zope]", "zope-interface"] -tests-mypy = ["mypy (>=1.6) ; platform_python_implementation == \"CPython\" and python_version >= \"3.8\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.8\""] -tests-no-zope = ["attrs[tests-mypy]", "cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "pympler", "pytest (>=4.3.0)", "pytest-xdist[psutil]"] - [[package]] name = "boto3" version = "1.37.2" @@ -212,6 +192,72 @@ boto3 = "*" [package.extras] test = ["black", "coverage", "flake8", "isort", "localstack", "pytest"] +[[package]] +name = "mypy" +version = "1.15.0" +description = "Optional static typing for Python" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "mypy-1.15.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:979e4e1a006511dacf628e36fadfecbcc0160a8af6ca7dad2f5025529e082c13"}, + {file = "mypy-1.15.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c4bb0e1bd29f7d34efcccd71cf733580191e9a264a2202b0239da95984c5b559"}, + {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:be68172e9fd9ad8fb876c6389f16d1c1b5f100ffa779f77b1fb2176fcc9ab95b"}, + {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = 
"sha256:c7be1e46525adfa0d97681432ee9fcd61a3964c2446795714699a998d193f1a3"}, + {file = "mypy-1.15.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2e2c2e6d3593f6451b18588848e66260ff62ccca522dd231cd4dd59b0160668b"}, + {file = "mypy-1.15.0-cp310-cp310-win_amd64.whl", hash = "sha256:6983aae8b2f653e098edb77f893f7b6aca69f6cffb19b2cc7443f23cce5f4828"}, + {file = "mypy-1.15.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2922d42e16d6de288022e5ca321cd0618b238cfc5570e0263e5ba0a77dbef56f"}, + {file = "mypy-1.15.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ee2d57e01a7c35de00f4634ba1bbf015185b219e4dc5909e281016df43f5ee5"}, + {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:973500e0774b85d9689715feeffcc980193086551110fd678ebe1f4342fb7c5e"}, + {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a95fb17c13e29d2d5195869262f8125dfdb5c134dc8d9a9d0aecf7525b10c2c"}, + {file = "mypy-1.15.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1905f494bfd7d85a23a88c5d97840888a7bd516545fc5aaedff0267e0bb54e2f"}, + {file = "mypy-1.15.0-cp311-cp311-win_amd64.whl", hash = "sha256:c9817fa23833ff189db061e6d2eff49b2f3b6ed9856b4a0a73046e41932d744f"}, + {file = "mypy-1.15.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:aea39e0583d05124836ea645f412e88a5c7d0fd77a6d694b60d9b6b2d9f184fd"}, + {file = "mypy-1.15.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2f2147ab812b75e5b5499b01ade1f4a81489a147c01585cda36019102538615f"}, + {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce436f4c6d218a070048ed6a44c0bbb10cd2cc5e272b29e7845f6a2f57ee4464"}, + {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8023ff13985661b50a5928fc7a5ca15f3d1affb41e5f0a9952cb68ef090b31ee"}, + {file = "mypy-1.15.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1124a18bc11a6a62887e3e137f37f53fbae476dc36c185d549d4f837a2a6a14e"}, + {file = "mypy-1.15.0-cp312-cp312-win_amd64.whl", hash = "sha256:171a9ca9a40cd1843abeca0e405bc1940cd9b305eaeea2dda769ba096932bb22"}, + {file = "mypy-1.15.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93faf3fdb04768d44bf28693293f3904bbb555d076b781ad2530214ee53e3445"}, + {file = "mypy-1.15.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:811aeccadfb730024c5d3e326b2fbe9249bb7413553f15499a4050f7c30e801d"}, + {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98b7b9b9aedb65fe628c62a6dc57f6d5088ef2dfca37903a7d9ee374d03acca5"}, + {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036"}, + {file = "mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357"}, + {file = "mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf"}, + {file = "mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078"}, + {file = "mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba"}, + {file = 
"mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5"}, + {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b"}, + {file = "mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2"}, + {file = "mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980"}, + {file = "mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e"}, + {file = "mypy-1.15.0.tar.gz", hash = "sha256:404534629d51d3efea5c800ee7c42b72a6554d6c400e6a79eafe15d11341fd43"}, +] + +[package.dependencies] +mypy_extensions = ">=1.0.0" +tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} +typing_extensions = ">=4.6.0" + +[package.extras] +dmypy = ["psutil (>=4.0)"] +faster-cache = ["orjson"] +install-types = ["pip"] +mypyc = ["setuptools (>=50)"] +reports = ["lxml"] + +[[package]] +name = "mypy-extensions" +version = "1.0.0" +description = "Type system extensions for programs checked with the mypy type checker." +optional = false +python-versions = ">=3.5" +groups = ["dev"] +files = [ + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, +] + [[package]] name = "nodeenv" version = "1.9.1" @@ -627,4 +673,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [metadata] lock-version = "2.1" python-versions = "^3.9,<3.14" -content-hash = "a5187df6b9cce410e678a1fee89aa77ea3335a526522e71894fb1f61d29747b4" +content-hash = "3da5ed3088c13899968d3f24b9eefb47d0060c60ca91245b16ad06207479d7f3" diff --git a/pyproject.toml b/pyproject.toml index 5ef98fe..6d4efc8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,15 +5,15 @@ description = "An opinionated queue processor for Amazon SQS" authors = ["Doist Developers "] license = "MIT" classifiers = [ - "Development Status :: 5 - Production/Stable", - "License :: OSI Approved :: MIT License", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", + "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", ] homepage = "https://github.com/Doist/sqs-workers" readme = "README.md" @@ -22,15 +22,15 @@ exclude = ['tests/'] [tool.poetry.dependencies] python = "^3.9,<3.14" boto3 = "^1.27.33" -attrs = "^23.1.0" [tool.poetry.group.dev.dependencies] -pytest-runner = "^6.0.1" +tox = "^4.15.1" pytest = "^8.3.5" +pytest-runner = "^6.0.1" localstack-client = "^2.10" pre-commit = "^4.2.0" +mypy = "1.15" ruff = "^0.11.5" -tox = "^4.15.1" typing-extensions = "^4.13.2" 
[build-system] @@ -38,34 +38,44 @@ requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" [tool.ruff] +extend-exclude = ["env", "runtime"] + +[tool.ruff.lint] select = [ -    "ASYNC",      # flake8-async -    "C4",         # flake8-comprehensions -    "D",          # pydocstyle, -    "E", "W",     # pycodestyle -    "F",          # pyflakes -    "I",          # isort -    "PL",         # pylint -    "RUF",        # ruff -    "S",          # flake8-bandit -    "SIM",        # flake8-simplify -    "UP",         # pyupgrade +    "ASYNC",      # flake8-async +    "C4",         # flake8-comprehensions +    "D",          # pydocstyle +    "E",          # pycodestyle errors +    "W",          # pycodestyle warnings +    "F",          # pyflakes +    "I",          # isort +    "PL",         # pylint +    "RUF",        # ruff +    "S",          # flake8-bandit +    "SIM",        # flake8-simplify +    "UP",         # pyupgrade ] -# By default, always show source code snippets. -show-source = true - ignore = [ ## D - pydocstyle ## # D1XX errors are OK. Don't force people into over-documenting. -    "D100", "D101", "D102", "D103", "D104", "D105", "D107", +    "D100", +    "D101", +    "D102", +    "D103", +    "D104", +    "D105", +    "D107", # These need to be fixed. -    "D202", "D205", "D400", "D401", +    "D202", +    "D205", +    "D400", +    "D401", ## E / W - pycodestyle ## -    "E501",  # line too long -    "E203",  # whitespace-before-punctuation -    "E741",  # ambiguous variable name +    "E501",  # line too long +    "E203",  # whitespace-before-punctuation +    "E741",  # ambiguous variable name ## PL - pylint ## # Commented-out rules are rules that we disable in pylint but are not supported by ruff yet. @@ -73,7 +84,7 @@ ignore = [ # Import order issues # "PLC0411", # wrong-import-order # "PLC0412", # wrong-import-position -    "PLC0414",  # ungrouped-imports +    "PLC0414",  # ungrouped-imports # Documentation issues # "C0114", # missing-module-docstring @@ -83,69 +94,64 @@ ignore = [ # "PLC0302", # too-many-lines # "PLR1702", # too-many-nested-blocks # "PLR0902", # too-many-instance-attributes -    "PLR0911",  # too-many-return-statements -    "PLR0915",  # too-many-statements -    "PLR0912",  # too-many-branches +    "PLR0911",  # too-many-return-statements +    "PLR0915",  # too-many-statements +    "PLR0912",  # too-many-branches # "PLR0903", # too-few-public-methods # "PLR0914", # too-many-locals # "PLC0301", # line-too-long -    "PLR0913",  # too-many-arguments -    "PLR2004",  # magic-value-comparison -    "PLR5501",  # collapsible-else-if -    "PLW0603",  # global-statement -    "PLW2901",  # redefined-loop-name -    "PLC1901",  # compare-to-empty-string +    "PLR0913",  # too-many-arguments +    "PLR2004",  # magic-value-comparison +    "PLR5501",  # collapsible-else-if +    "PLW0603",  # global-statement +    "PLW2901",  # redefined-loop-name +    "PLC1901",  # compare-to-empty-string ## RUF - ruff ## -    "RUF001",  # ambiguous-unicode-character-string -    "RUF002",  # ambiguous-unicode-character-docstring -    "RUF003",  # ambiguous-unicode-character-comment -    "RUF012",  # mutable-class-default +    "RUF001",  # ambiguous-unicode-character-string +    "RUF002",  # ambiguous-unicode-character-docstring +    "RUF003",  # ambiguous-unicode-character-comment +    "RUF012",  # mutable-class-default # Enable when Poetry supports PEP 621 and we migrate our configuration to it.
# See: https://github.com/python-poetry/poetry-core/pull/567 "RUF200", - "S101", # assert - "S104", # hardcoded-bind-all-interfaces - "S105", # hardcoded-password-string - "S106", # hardcoded-password-func-arg - "S107", # hardcoded-password-default - "S110", # try-except-pass - "S301", # suspicious-pickle-usage - "S303", # suspicious-insecure-hash-usage - "S310", # suspicious-url-open-usage - "S311", # suspicious-non-cryptographic-random-usage - "S324", # hashlib-insecure-hash-function - "S603", # subprocess-without-shell-equals-true - "S607", # start-process-with-partial-path - "S608", # hardcoded-sql-expression + "S101", # assert + "S104", # hardcoded-bind-all-interfaces + "S105", # hardcoded-password-string + "S106", # hardcoded-password-func-arg + "S107", # hardcoded-password-default + "S110", # try-except-pass + "S301", # suspicious-pickle-usage + "S303", # suspicious-insecure-hash-usage + "S310", # suspicious-url-open-usage + "S311", # suspicious-non-cryptographic-random-usage + "S324", # hashlib-insecure-hash-function + "S603", # subprocess-without-shell-equals-true + "S607", # start-process-with-partial-path + "S608", # hardcoded-sql-expression ## SIM - flake8-simplify ## - "SIM102", # collapsible-if - "SIM105", # suppressible-exception - "SIM108", # if-else-block-instead-of-if-exp - "SIM114", # if-with-same-arms - "SIM116", # if-else-block-instead-of-dict-lookup - "SIM117", # multiple-with-statements + "SIM102", # collapsible-if + "SIM105", # suppressible-exception + "SIM108", # if-else-block-instead-of-if-exp + "SIM114", # if-with-same-arms + "SIM116", # if-else-block-instead-of-dict-lookup + "SIM117", # multiple-with-statements # Enable when the rule is out of preview and false-positives are handled. # See: https://docs.astral.sh/ruff/rules/in-dict-keys/ - "SIM118", # in-dict-keys -] - -extend-exclude = [ - "env", - "runtime", + "SIM118", # in-dict-keys ] -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] # These files have only a bunch of imports in them to force code loading. "todoist/workers/todoist_handlers.py" = ["F401"] "todoist/signals/signal_handlers.py" = ["F401"] "todoist/workers/scripts_handlers.py" = ["F401"] -"scripts/**" = ["S101"] # Allow assert statement in scripts. -"tests/**" = ["S101"] # Allow assert statement in tests. +"scripts/**" = ["S101"] # Allow assert statement in scripts. +"tests/**" = ["S101"] # Allow assert statement in tests. # We allow module-level imports to be not at the top of scripts, cron, # and configs because we initialize Todoist environment there @@ -160,13 +166,9 @@ extend-exclude = [ # Importing and using the fixture makes it be shadowed. 
"test_*.py" = ["F401", "F811"] -"tests/minimal/conftest.py" = [ - "E402", - "F401", - "F403", -] +"tests/minimal/conftest.py" = ["E402", "F401", "F403"] -[tool.ruff.isort] +[tool.ruff.lint.isort] section-order = [ "future", "standard-library", @@ -178,12 +180,12 @@ section-order = [ "local-folder", ] -[tool.ruff.isort.sections] +[tool.ruff.lint.isort.sections] "parts" = ["parts"] "td-models" = ["todoist.models"] "td-apps" = ["todoist.apps"] -[tool.ruff.pydocstyle] +[tool.ruff.lint.pydocstyle] convention = "pep257" [tool.ruff.lint.pyupgrade] diff --git a/sqs_workers/__init__.py b/sqs_workers/__init__.py index f75e6d0..0acb828 100644 --- a/sqs_workers/__init__.py +++ b/sqs_workers/__init__.py @@ -19,11 +19,11 @@ "IMMEDIATE_RETURN", "ConstantBackoff", "ExponentialBackoff", - "SQSError", - "MemorySession", "JobQueue", + "MemorySession", "RawQueue", "SQSEnv", + "SQSError", "create_fifo_queue", "create_standard_queue", "delete_queue", diff --git a/sqs_workers/async_task.py b/sqs_workers/async_task.py index c0185ac..3476c59 100644 --- a/sqs_workers/async_task.py +++ b/sqs_workers/async_task.py @@ -1,11 +1,11 @@ from __future__ import annotations +from collections.abc import Generator from contextlib import contextmanager from typing import ( TYPE_CHECKING, Any, Callable, - Generator, Generic, NoReturn, Optional, @@ -63,10 +63,10 @@ def run(self, *args: P.args, **kwargs: P.kwargs) -> Any: if len(args) > 0: raise TypeError("Must use keyword arguments only for batch read queues") kwargs = bind_arguments(self.processor, [[kwargs]], {}) - return self.processor(**kwargs) + return self.processor(**kwargs) # type:ignore[call-arg] else: kwargs = bind_arguments(self.processor, args, kwargs) - return self.processor(**kwargs) + return self.processor(**kwargs) # type:ignore[call-arg] @contextmanager def batch(self) -> Generator[None, None, None]: @@ -118,4 +118,4 @@ def delay(self) -> None: self.async_task.delay(*self.args, **self.kwargs) def __repr__(self) -> str: - return "BakedAsyncTask(%r, ...)" % self.async_task + return f"BakedAsyncTask({self.async_task!r}, ...)" diff --git a/sqs_workers/backoff_policies.py b/sqs_workers/backoff_policies.py index 0dcaae0..dcbd61b 100644 --- a/sqs_workers/backoff_policies.py +++ b/sqs_workers/backoff_policies.py @@ -42,7 +42,7 @@ def get_visibility_timeout(self, message) -> int: visibility_timeout = random.normalvariate(mu, sigma) visibility_timeout = max(self.min_visibility_timeout, visibility_timeout) visibility_timeout = min(self.max_visibility_timeout, visibility_timeout) - return int(round(visibility_timeout)) + return round(visibility_timeout) DEFAULT_BACKOFF = ExponentialBackoff() diff --git a/sqs_workers/codecs.py b/sqs_workers/codecs.py index ac012b3..f4695d0 100644 --- a/sqs_workers/codecs.py +++ b/sqs_workers/codecs.py @@ -2,7 +2,7 @@ import json import pickle import zlib -from typing import Any, ClassVar, Dict, Protocol, Type +from typing import Any, ClassVar, Protocol DEFAULT_CONTENT_TYPE = "pickle_compat" @@ -12,12 +12,10 @@ class Codec(Protocol): @classmethod - def serialize(cls, message: Any) -> str: - ... + def serialize(cls, message: Any) -> str: ... @classmethod - def deserialize(cls, serialized: str) -> Any: - ... + def deserialize(cls, serialized: str) -> Any: ... 
class JSONCodec: @@ -50,11 +48,11 @@ class PickleCompatCodec(PickleCodec): protocol = PICKLE_PY2_COMPAT_PROTO -def get_codec(content_type: str) -> Type[Codec]: +def get_codec(content_type: str) -> type[Codec]: return CONTENT_TYPES_CODECS[content_type] -CONTENT_TYPES_CODECS: Dict[str, Type[Codec]] = { +CONTENT_TYPES_CODECS: dict[str, type[Codec]] = { "json": JSONCodec, "pickle": PickleCodec, "pickle_compat": PickleCompatCodec, diff --git a/sqs_workers/config.py b/sqs_workers/config.py index b8ec5a2..ec4bc63 100644 --- a/sqs_workers/config.py +++ b/sqs_workers/config.py @@ -1,6 +1,5 @@ -from typing import Any, Dict, Optional - -import attr +from dataclasses import dataclass, field +from typing import Any, Optional from sqs_workers.utils import ( instantiate_from_dict, @@ -9,13 +8,13 @@ ) -@attr.s(frozen=True) +@dataclass(frozen=True) class Config: """Config object with hierarchy support.""" - parent: Optional["Config"] = attr.ib(repr=False, default=None) - options: Dict[str, Any] = attr.ib(factory=dict) - maker_key = attr.ib(default="maker") + parent: Optional["Config"] = field(repr=False, default=None) + options: dict[str, Any] = field(default_factory=dict) + maker_key: str = field(default="maker") def __setitem__(self, key: str, value): self.options.__setitem__(key, value) diff --git a/sqs_workers/core.py b/sqs_workers/core.py index c43aada..8d54d38 100644 --- a/sqs_workers/core.py +++ b/sqs_workers/core.py @@ -28,11 +28,7 @@ def total_count(self) -> int: return self.succeeded_count() + self.failed_count() def __repr__(self) -> str: - return "".format( - self.queue_name, - self.succeeded_count(), - self.failed_count(), - ) + return f"" class RedrivePolicy: diff --git a/sqs_workers/deadletter_queue.py b/sqs_workers/deadletter_queue.py index 6575407..680fe96 100644 --- a/sqs_workers/deadletter_queue.py +++ b/sqs_workers/deadletter_queue.py @@ -1,8 +1,7 @@ import logging +from dataclasses import dataclass, field from functools import partial -from typing import TYPE_CHECKING - -import attr +from typing import TYPE_CHECKING, Optional from sqs_workers import RawQueue @@ -13,7 +12,7 @@ logger = logging.getLogger(__name__) -@attr.s +@dataclass class DeadLetterQueue(RawQueue): """ Queue to push back messages to the upstream. @@ -36,7 +35,7 @@ class DeadLetterQueue(RawQueue): and quit. """ - upstream_queue: "GenericQueue" = attr.ib(default=None) + upstream_queue: Optional["GenericQueue"] = field(default=None) @classmethod def maker(cls, upstream_queue, **kwargs): @@ -46,11 +45,15 @@ def maker(cls, upstream_queue, **kwargs): ) -@attr.s +@dataclass class PushBackSender: - upstream_queue: "GenericQueue" = attr.ib(default=None) + upstream_queue: Optional["GenericQueue"] = field(default=None) def __call__(self, message): + # We know upstream_queue is set when called + if self.upstream_queue is None: + raise ValueError("upstream_queue not set") + kwargs = { "MessageBody": message.body, "MessageAttributes": message.message_attributes or {}, diff --git a/sqs_workers/memory_sqs.py b/sqs_workers/memory_sqs.py index 471487f..59c0570 100644 --- a/sqs_workers/memory_sqs.py +++ b/sqs_workers/memory_sqs.py @@ -1,43 +1,25 @@ -""" -In memory mockup implementation of essential parts of boto3 SQS objects. - -Used for faster and more predictable processing in tests. - -Lacks some features of "real sqs", and some other features implemented -ineffectively. 
- -- Redrive policy doesn't work -- There is no differences between standard and FIFO queues -- FIFO queues don't support content-based deduplication -- Delayed tasks executed ineffectively: the task is gotten from the queue, - and if the time hasn't come yet, the task is put back. -- API can return slightly different results - -Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html -""" import logging import uuid +from dataclasses import dataclass, field, replace from datetime import datetime, timedelta, timezone -from typing import Any, Dict, List - -import attr +from typing import Any, Optional logger = logging.getLogger(__name__) -@attr.s +@dataclass class MemoryAWS: """In-memory AWS as a whole.""" - client: "MemoryClient" = attr.ib( - repr=False, - default=attr.Factory(lambda self: MemoryClient(self), takes_self=True), - ) - resource: "ServiceResource" = attr.ib( - repr=False, - default=attr.Factory(lambda self: ServiceResource(self), takes_self=True), - ) - queues: List["MemoryQueue"] = attr.ib(factory=list) + client: Optional["MemoryClient"] = field(repr=False, default=None) + resource: Optional["ServiceResource"] = field(repr=False, default=None) + queues: list["MemoryQueue"] = field(default_factory=list) + + def __post_init__(self): + if self.client is None: + self.client = MemoryClient(self) + if self.resource is None: + self.resource = ServiceResource(self) def create_queue(self, QueueName: str, Attributes) -> "MemoryQueue": queue = MemoryQueue(self, QueueName, Attributes) @@ -48,11 +30,11 @@ def delete_queue(self, QueueUrl: str) -> None: self.queues = [queue for queue in self.queues if queue.url != QueueUrl] -@attr.s +@dataclass class MemorySession: """In memory AWS session.""" - aws = attr.ib(repr=False, factory=MemoryAWS) + aws: MemoryAWS = field(repr=False, default_factory=MemoryAWS) def client(self, service_name: str, **kwargs): assert service_name == "sqs" @@ -63,9 +45,9 @@ def resource(self, service_name: str, **kwargs): return self.aws.resource -@attr.s +@dataclass class MemoryClient: - aws = attr.ib(repr=False) + aws: Any = field(repr=False) def create_queue(self, QueueName: str, Attributes): return self.aws.create_queue(QueueName, Attributes) @@ -83,9 +65,9 @@ def list_queues(self, QueueNamePrefix=""): } -@attr.s +@dataclass class ServiceResource: - aws: MemoryAWS = attr.ib(repr=False) + aws: MemoryAWS = field(repr=False) def create_queue(self, QueueName: str, Attributes): return self.aws.create_queue(QueueName, Attributes) @@ -97,7 +79,7 @@ def get_queue_by_name(self, QueueName: str): return None -@attr.s +@dataclass class MemoryQueue: """ In-memory queue which mimics the subset of SQS Queue object. 
@@ -106,13 +88,13 @@ class MemoryQueue: services/sqs.html#queue """ - aws: MemoryAWS = attr.ib(repr=False) - name: str = attr.ib() - attributes: Dict[str, Dict[str, str]] = attr.ib() - messages: List["MemoryMessage"] = attr.ib(factory=list) - in_flight: Dict[str, "MemoryMessage"] = attr.ib(factory=dict) + aws: MemoryAWS = field(repr=False) + name: str = field() + attributes: dict[str, dict[str, str]] = field() + messages: list["MemoryMessage"] = field(default_factory=list) + in_flight: dict[str, "MemoryMessage"] = field(default_factory=dict) - def __attrs_post_init__(self): + def __post_init__(self): self.attributes["QueueArn"] = self.name @property @@ -205,7 +187,7 @@ def change_message_visibility_batch(self, Entries): in_flight_message = self.in_flight[e["Id"]] sec = int(e["VisibilityTimeout"]) visible_at = now + timedelta(seconds=sec) - updated_message = attr.evolve(in_flight_message, visible_at=visible_at) + updated_message = replace(in_flight_message, visible_at=visible_at) self.in_flight[e["Id"]] = updated_message else: not_found_entries.append(e) @@ -225,7 +207,7 @@ def __getitem__(self, item): return self.__dict__()[item] -@attr.s(frozen=True) +@dataclass(frozen=True) class MemoryMessage: """ A mock class to mimic the AWS message @@ -234,26 +216,26 @@ class MemoryMessage: services/sqs.html#SQS.Message """ - queue_impl: MemoryQueue = attr.ib(repr=False) + queue_impl: MemoryQueue = field(repr=False) # The message's contents (not URL-encoded). - body: bytes = attr.ib() + body: bytes = field() # Each message attribute consists of a Name, Type, and Value. - message_attributes: Dict[str, Dict[str, str]] = attr.ib(factory=dict) + message_attributes: dict[str, dict[str, str]] = field(default_factory=dict) # A map of the attributes requested in `` ReceiveMessage `` to their # respective values. - attributes: Dict[str, Any] = attr.ib(factory=dict) + attributes: dict[str, Any] = field(default_factory=dict) # Internal attribute which contains the execution time. 
- visible_at: datetime = attr.ib(factory=lambda: datetime.now(tz=timezone.utc)) + visible_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc)) # A unique identifier for the message - message_id: str = attr.ib(factory=lambda: uuid.uuid4().hex) + message_id: str = field(default_factory=lambda: uuid.uuid4().hex) # The Message's receipt_handle identifier - receipt_handle: str = attr.ib(factory=lambda: uuid.uuid4().hex) + receipt_handle: str = field(default_factory=lambda: uuid.uuid4().hex) @classmethod def from_kwargs(cls, queue_impl, kwargs): @@ -289,7 +271,7 @@ def change_visibility(self, VisibilityTimeout="0", **kwargs): now = datetime.now(tz=timezone.utc) sec = int(VisibilityTimeout) visible_at = now + timedelta(seconds=sec) - updated_message = attr.evolve(self, visible_at=visible_at) + updated_message = replace(self, visible_at=visible_at) self.queue_impl.in_flight[self.message_id] = updated_message else: logger.warning("Tried to change visibility of message not in flight") diff --git a/sqs_workers/processors.py b/sqs_workers/processors.py index f732cd2..51e4c71 100644 --- a/sqs_workers/processors.py +++ b/sqs_workers/processors.py @@ -1,9 +1,8 @@ import logging +from dataclasses import dataclass, field, replace from functools import partial from typing import TYPE_CHECKING, Callable, Optional -import attr - from sqs_workers import codecs from sqs_workers.context import SQSContext from sqs_workers.utils import validate_arguments @@ -16,7 +15,7 @@ DEFAULT_CONTEXT_VAR = "context" -@attr.s +@dataclass class Processor: """ Processor which calls its function for each incoming message. @@ -25,11 +24,11 @@ class Processor: SQS message """ - queue: "GenericQueue" = attr.ib() - fn: Optional[Callable] = attr.ib(default=None) - job_name: str = attr.ib(default="") - pass_context: bool = attr.ib(default=False) - context_var: str = attr.ib(default=DEFAULT_CONTEXT_VAR) + queue: "GenericQueue" = field() + fn: Optional[Callable] = field(default=None) + job_name: str = field(default="") + pass_context: bool = field(default=False) + context_var: str = field(default=DEFAULT_CONTEXT_VAR) @classmethod def maker(cls, **kwargs): @@ -111,7 +110,7 @@ def copy(self, **kwargs): Create a new instance of the processor, optionally updating arguments of the constructor from update_kwargs """ - return attr.evolve(self, **kwargs) + return replace(self, **kwargs) @staticmethod def deserialize_message(message): diff --git a/sqs_workers/queue.py b/sqs_workers/queue.py index 9caff9a..c88e05c 100644 --- a/sqs_workers/queue.py +++ b/sqs_workers/queue.py @@ -2,23 +2,19 @@ import logging import uuid from collections import defaultdict +from collections.abc import Generator, Iterable from contextlib import contextmanager +from dataclasses import dataclass, field from functools import partial from typing import ( TYPE_CHECKING, Any, Callable, - Dict, - Generator, - Iterable, - List, Literal, Optional, - Tuple, TypeVar, ) -import attr from typing_extensions import ParamSpec from sqs_workers import DEFAULT_BACKOFF, codecs @@ -50,13 +46,13 @@ def __init__(self, errors): self.errors = errors -@attr.s +@dataclass class GenericQueue: - env: "SQSEnv" = attr.ib(repr=False) - name: str = attr.ib() - backoff_policy: BackoffPolicy = attr.ib(default=DEFAULT_BACKOFF) - batching_policy: BatchingConfiguration = attr.ib(default=NoBatching()) - _queue = attr.ib(repr=False, default=None) + env: "SQSEnv" = field(repr=False) + name: str = field() + backoff_policy: BackoffPolicy = field(default=DEFAULT_BACKOFF) + batching_policy: 
BatchingConfiguration = field(default_factory=lambda: NoBatching()) + _queue: Any = field(repr=False, default=None) @classmethod def maker(cls, **kwargs): @@ -110,7 +106,7 @@ def process_batch(self, wait_seconds: int = 0) -> BatchProcessingResult: return self._handle_processed(messages_with_success) - def _handle_processed(self, messages_with_success: Iterable[Tuple[Any, bool]]): + def _handle_processed(self, messages_with_success: Iterable[tuple[Any, bool]]): """ Handles the results of processing messages. @@ -182,7 +178,7 @@ def process_message(self, message: Any) -> bool: """ raise NotImplementedError() - def process_messages(self, messages: List[Any]) -> bool: + def process_messages(self, messages: list[Any]) -> bool: """ Process a batch of messages. @@ -191,7 +187,7 @@ def process_messages(self, messages: List[Any]) -> bool: """ raise NotImplementedError() - def get_raw_messages(self, wait_seconds: int, max_messages: int = 10) -> List[Any]: + def get_raw_messages(self, wait_seconds: int, max_messages: int = 10) -> list[Any]: """Return raw messages from the queue, addressed by its name""" queue = self.get_queue() @@ -264,9 +260,9 @@ def get_queue(self): return self._queue -@attr.s +@dataclass class RawQueue(GenericQueue): - processor: Optional[Callable] = attr.ib(default=None) + processor: Optional[Callable] = field(default=None) def raw_processor(self): """ @@ -356,7 +352,7 @@ def process_message(self, message: Any) -> bool: else: return True - def process_messages(self, messages: List[Any]) -> bool: + def process_messages(self, messages: list[Any]) -> bool: """ Sends a list of messages to the call handler @@ -393,12 +389,12 @@ def process_messages(self, messages: List[Any]) -> bool: return True -@attr.s +@dataclass class JobQueue(GenericQueue): - processors: Dict[str, Processor] = attr.ib(factory=dict) + processors: dict[str, Processor] = field(default_factory=dict) - _batch_level: int = attr.ib(default=0, repr=False) - _batched_messages: List[Dict] = attr.ib(factory=list, repr=False) + _batch_level: int = field(default=0, repr=False) + _batched_messages: list[dict] = field(default_factory=list, repr=False) def processor( self, job_name: str, pass_context: bool = False, context_var=DEFAULT_CONTEXT_VAR @@ -657,7 +653,7 @@ def process_message(self, message: Any) -> bool: else: return self.process_message_fallback(job_name) # type: ignore - def process_messages(self, messages: List[Any]) -> bool: + def process_messages(self, messages: list[Any]) -> bool: """ Sends a list of messages to the call handler diff --git a/sqs_workers/shutdown_policies.py b/sqs_workers/shutdown_policies.py index 5540b87..6e8eedc 100644 --- a/sqs_workers/shutdown_policies.py +++ b/sqs_workers/shutdown_policies.py @@ -3,11 +3,9 @@ class ShutdownPolicy(Protocol): - def update_state(self, batch_processing_result) -> None: - ... + def update_state(self, batch_processing_result) -> None: ... - def need_shutdown(self) -> bool: - ... + def need_shutdown(self) -> bool: ... 
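
Note on the memory_sqs.py and processors.py hunks above: `attr.evolve` maps one-to-one onto `dataclasses.replace`, which is also the only way to derive an updated copy of a frozen dataclass such as `MemoryMessage`. A self-contained sketch, with `Message` standing in for `MemoryMessage` trimmed to two fields:

```python
from dataclasses import dataclass, field, replace
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Message:
    body: str
    visible_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))


msg = Message(body="hello")
# attrs:       updated = attr.evolve(msg, visible_at=...)
# dataclasses: updated = replace(msg, visible_at=...)
updated = replace(msg, visible_at=msg.visible_at + timedelta(seconds=30))
assert updated.body == msg.body  # untouched fields are carried over
assert updated.visible_at > msg.visible_at  # the frozen original is never mutated
```
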
class NeverShutdown: diff --git a/sqs_workers/sqs_env.py b/sqs_workers/sqs_env.py index 9aea541..b12de17 100644 --- a/sqs_workers/sqs_env.py +++ b/sqs_workers/sqs_env.py @@ -1,19 +1,17 @@ import logging import multiprocessing import warnings +from dataclasses import dataclass, field from typing import ( TYPE_CHECKING, Any, Callable, - Dict, Optional, - Type, TypeVar, Union, overload, ) -import attr import boto3 from botocore.config import Config from typing_extensions import ParamSpec @@ -40,30 +38,30 @@ R = TypeVar("R") -@attr.s +@dataclass class SQSEnv: - session = attr.ib(default=boto3) - queue_prefix = attr.ib(default="") - codec: str = attr.ib(default=codecs.DEFAULT_CONTENT_TYPE) + session: Any = field(default=boto3) + queue_prefix: str = field(default="") + codec: str = field(default=codecs.DEFAULT_CONTENT_TYPE) # retry settings for internal boto - retry_max_attempts: int = attr.ib(default=3) - retry_mode: str = attr.ib(default="standard") + retry_max_attempts: int = field(default=3) + retry_mode: str = field(default="standard") # queue-specific settings - backoff_policy = attr.ib(default=DEFAULT_BACKOFF) + backoff_policy: Any = field(default=DEFAULT_BACKOFF) # jobqueue-specific settings - processor_maker = attr.ib(default=processors.Processor) - context_maker = attr.ib(default=context.SQSContext) + processor_maker: Any = field(default=processors.Processor) + context_maker: Any = field(default=context.SQSContext) # internal attributes - context = attr.ib(default=None) - sqs_client = attr.ib(default=None) - sqs_resource = attr.ib(default=None) - queues: Dict[str, AnyQueue] = attr.ib(init=False, factory=dict) + context: Any = field(default=None) + sqs_client: Any = field(default=None) + sqs_resource: Any = field(default=None) + queues: dict[str, AnyQueue] = field(default_factory=dict) - def __attrs_post_init__(self): + def __post_init__(self): self.context = self.context_maker() # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html retry_dict = {"max_attempts": self.retry_max_attempts, "mode": self.retry_mode} @@ -77,26 +75,24 @@ def __attrs_post_init__(self): def queue( self, queue_name: str, - queue_maker: Union[Type[JobQueue], Callable[..., JobQueue]] = JobQueue, + queue_maker: Union[type[JobQueue], Callable[..., JobQueue]] = JobQueue, batching_policy: BatchingConfiguration = NoBatching(), backoff_policy: Optional["BackoffPolicy"] = None, - ) -> JobQueue: - ... + ) -> JobQueue: ... @overload def queue( self, queue_name: str, - queue_maker: Union[Type[AnyQueueT], Callable[..., AnyQueueT]], + queue_maker: Union[type[AnyQueueT], Callable[..., AnyQueueT]], batching_policy: BatchingConfiguration = NoBatching(), backoff_policy: Optional["BackoffPolicy"] = None, - ) -> AnyQueueT: - ... + ) -> AnyQueueT: ... 
def queue( self, queue_name: str, - queue_maker: Union[Type[AnyQueue], Callable[..., AnyQueue]] = JobQueue, + queue_maker: Union[type[AnyQueue], Callable[..., AnyQueue]] = JobQueue, batching_policy: BatchingConfiguration = NoBatching(), backoff_policy: Optional["BackoffPolicy"] = None, ) -> AnyQueue: diff --git a/sqs_workers/sqs_manage.py b/sqs_workers/sqs_manage.py index 4081876..2f687e5 100644 --- a/sqs_workers/sqs_manage.py +++ b/sqs_workers/sqs_manage.py @@ -1,5 +1,6 @@ """Helper functions to create and delete queues on SQS.""" -from typing import Any, Dict + +from typing import Any def create_standard_queue( @@ -10,7 +11,7 @@ def create_standard_queue( redrive_policy=None, ): """Create a new standard queue""" - attrs: Dict[str, Any] = {} + attrs: dict[str, Any] = {} kwargs = {"QueueName": env.get_sqs_queue_name(queue_name), "Attributes": attrs} if message_retention_period is not None: attrs["MessageRetentionPeriod"] = str(message_retention_period) diff --git a/sqs_workers/utils.py b/sqs_workers/utils.py index d29af08..cd2a51d 100644 --- a/sqs_workers/utils.py +++ b/sqs_workers/utils.py @@ -1,8 +1,9 @@ import importlib import logging +from collections.abc import Iterable from inspect import Signature from itertools import islice -from typing import Any, Iterable +from typing import Any logger = logging.getLogger(__name__) diff --git a/tests/test_sqs.py b/tests/test_sqs.py index 853995f..9dbbd66 100644 --- a/tests/test_sqs.py +++ b/tests/test_sqs.py @@ -1,5 +1,6 @@ +import contextlib import time -from typing import Dict, List, Optional +from typing import Optional import botocore import pytest @@ -18,9 +19,9 @@ from sqs_workers.processors import Processor from sqs_workers.queue import RawQueue -worker_results: Dict[str, Optional[str]] = {"say_hello": None} +worker_results: dict[str, Optional[str]] = {"say_hello": None} -batch_results: List[str] = [] +batch_results: list[str] = [] def raise_exception(username="Anonymous"): @@ -143,7 +144,7 @@ def test_batch_should_keep_messages_until_overflow(sqs, queue_name): with say_hello_task.batch(): # no messages after 9 tasks for n in range(9): - say_hello_task.delay(username="Homer %d" % n) + say_hello_task.delay(username=f"Homer {n}") assert len(queue.get_raw_messages(0)) == 0 # 2 more: overflow, the first 10 messages are added to the queue From 52398594d09786bb9deab29570d2761cc466b372 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Silva?= Date: Tue, 15 Apr 2025 14:39:31 +0100 Subject: [PATCH 2/2] chore: Simplify dataclass field definition with default value --- sqs_workers/config.py | 2 +- sqs_workers/deadletter_queue.py | 6 +++--- sqs_workers/processors.py | 8 ++++---- sqs_workers/queue.py | 6 +++--- sqs_workers/sqs_env.py | 22 +++++++++++----------- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sqs_workers/config.py b/sqs_workers/config.py index ec4bc63..93e5978 100644 --- a/sqs_workers/config.py +++ b/sqs_workers/config.py @@ -14,7 +14,7 @@ class Config: parent: Optional["Config"] = field(repr=False, default=None) options: dict[str, Any] = field(default_factory=dict) - maker_key: str = field(default="maker") + maker_key: str = "maker" def __setitem__(self, key: str, value): self.options.__setitem__(key, value) diff --git a/sqs_workers/deadletter_queue.py b/sqs_workers/deadletter_queue.py index 680fe96..0e7a2d1 100644 --- a/sqs_workers/deadletter_queue.py +++ b/sqs_workers/deadletter_queue.py @@ -1,5 +1,5 @@ import logging -from dataclasses import dataclass, field +from dataclasses import dataclass from functools 
import partial from typing import TYPE_CHECKING, Optional @@ -35,7 +35,7 @@ class DeadLetterQueue(RawQueue): and quit. """ - upstream_queue: Optional["GenericQueue"] = field(default=None) + upstream_queue: Optional["GenericQueue"] = None @classmethod def maker(cls, upstream_queue, **kwargs): @@ -47,7 +47,7 @@ def maker(cls, upstream_queue, **kwargs): @dataclass class PushBackSender: - upstream_queue: Optional["GenericQueue"] = field(default=None) + upstream_queue: Optional["GenericQueue"] = None def __call__(self, message): # We know upstream_queue is set when called diff --git a/sqs_workers/processors.py b/sqs_workers/processors.py index 51e4c71..eae758f 100644 --- a/sqs_workers/processors.py +++ b/sqs_workers/processors.py @@ -25,10 +25,10 @@ class Processor: """ queue: "GenericQueue" = field() - fn: Optional[Callable] = field(default=None) - job_name: str = field(default="") - pass_context: bool = field(default=False) - context_var: str = field(default=DEFAULT_CONTEXT_VAR) + fn: Optional[Callable] = None + job_name: str = "" + pass_context: bool = False + context_var: str = DEFAULT_CONTEXT_VAR @classmethod def maker(cls, **kwargs): diff --git a/sqs_workers/queue.py b/sqs_workers/queue.py index c88e05c..c61d83e 100644 --- a/sqs_workers/queue.py +++ b/sqs_workers/queue.py @@ -50,7 +50,7 @@ def __init__(self, errors): class GenericQueue: env: "SQSEnv" = field(repr=False) name: str = field() - backoff_policy: BackoffPolicy = field(default=DEFAULT_BACKOFF) + backoff_policy: BackoffPolicy = DEFAULT_BACKOFF batching_policy: BatchingConfiguration = field(default_factory=lambda: NoBatching()) _queue: Any = field(repr=False, default=None) @@ -262,7 +262,7 @@ def get_queue(self): @dataclass class RawQueue(GenericQueue): - processor: Optional[Callable] = field(default=None) + processor: Optional[Callable] = None def raw_processor(self): """ @@ -393,7 +393,7 @@ def process_messages(self, messages: list[Any]) -> bool: class JobQueue(GenericQueue): processors: dict[str, Processor] = field(default_factory=dict) - _batch_level: int = field(default=0, repr=False) + _batch_level: int = field(repr=False, default=0) _batched_messages: list[dict] = field(default_factory=list, repr=False) def processor( diff --git a/sqs_workers/sqs_env.py b/sqs_workers/sqs_env.py index b12de17..1690323 100644 --- a/sqs_workers/sqs_env.py +++ b/sqs_workers/sqs_env.py @@ -40,25 +40,25 @@ @dataclass class SQSEnv: - session: Any = field(default=boto3) - queue_prefix: str = field(default="") - codec: str = field(default=codecs.DEFAULT_CONTENT_TYPE) + session: Any = boto3 + queue_prefix: str = "" + codec: str = codecs.DEFAULT_CONTENT_TYPE # retry settings for internal boto - retry_max_attempts: int = field(default=3) - retry_mode: str = field(default="standard") + retry_max_attempts: int = 3 + retry_mode: str = "standard" # queue-specific settings - backoff_policy: Any = field(default=DEFAULT_BACKOFF) + backoff_policy: Any = DEFAULT_BACKOFF # jobqueue-specific settings - processor_maker: Any = field(default=processors.Processor) - context_maker: Any = field(default=context.SQSContext) + processor_maker: Any = processors.Processor + context_maker: Any = context.SQSContext # internal attributes - context: Any = field(default=None) - sqs_client: Any = field(default=None) - sqs_resource: Any = field(default=None) + context: Any = None + sqs_client: Any = None + sqs_resource: Any = None queues: dict[str, AnyQueue] = field(default_factory=dict) def __post_init__(self):
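
Closing note: the least mechanical part of this migration is attrs' `attr.Factory(..., takes_self=True)`, which lets a default value reference the instance being built. Dataclasses have no equivalent, which is why the `MemoryAWS` hunk in patch 1 turns `client`/`resource` into `Optional` fields wired up in `__post_init__`. A reduced sketch of that pattern, with `Client` standing in for the real `MemoryClient`:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Client:
    aws: "MemoryAWS"


@dataclass
class MemoryAWS:
    # attrs allowed a default that references the instance being built:
    #     client = attr.ib(default=attr.Factory(lambda self: Client(self), takes_self=True))
    # dataclasses do not, hence an Optional default plus __post_init__ wiring:
    client: Optional[Client] = field(repr=False, default=None)

    def __post_init__(self) -> None:
        if self.client is None:
            self.client = Client(self)


aws = MemoryAWS()
assert aws.client is not None and aws.client.aws is aws
```
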