From 5cf14673aeef95ca435d704965fb09322390bd2c Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 6 Jan 2025 19:09:23 -0800 Subject: [PATCH 1/6] prototype tools for handling prod issues --- backend/scripts/celery_purge_queue.py | 83 ++++++++++++++++++++++++++ backend/scripts/onyx_redis.py | 84 +++++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 backend/scripts/celery_purge_queue.py create mode 100644 backend/scripts/onyx_redis.py diff --git a/backend/scripts/celery_purge_queue.py b/backend/scripts/celery_purge_queue.py new file mode 100644 index 00000000000..a12a90fda7d --- /dev/null +++ b/backend/scripts/celery_purge_queue.py @@ -0,0 +1,83 @@ +# Tool to run operations on Celery/Redis in production +# this is a work in progress and isn't completely put together yet +# but can serve as a stub for future operations +import argparse +import logging +from logging import getLogger + +from redis import Redis + +from onyx.background.celery.celery_redis import celery_get_queue_length +from onyx.configs.app_configs import REDIS_DB_NUMBER_CELERY +from onyx.redis.redis_pool import RedisPool + +# Configure the logger +logging.basicConfig( + level=logging.INFO, # Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # Log format + handlers=[logging.StreamHandler()], # Output logs to console +) + +logger = getLogger(__name__) + +REDIS_PASSWORD = "" + + +def celery_purge_queue(queue: str, tenant_id: str) -> None: + """Purging a celery queue is extremely difficult because the queue is a list + and the only way an item can be removed from a list is by VALUE, which is + a linear scan. Therefore, to purge the list of many values is roughly + n^2.""" + + pool = RedisPool.create_pool( + host="127.0.0.1", + port=6380, + db=REDIS_DB_NUMBER_CELERY, + password=REDIS_PASSWORD, + ssl=True, + ssl_cert_reqs="optional", + ssl_ca_certs=None, + ) + + r = Redis(connection_pool=pool) + + length = celery_get_queue_length(queue, r) + + logger.info(f"queue={queue} length={length}") + + # processed = 0 + # deleted = 0 + # for i in range(len(OnyxCeleryPriority)): + # queue_name = queue + # if i > 0: + # queue_name += CELERY_SEPARATOR + # queue_name += str(i) + + # length = r.llen(queue_name) + # for i in range(length): + # task_raw: bytes | None = r.lindex(queue_name, i) + # if not task_raw: + # break + + # processed += 1 + # task_str = task_raw.decode("utf-8") + # task = json.loads(task_str) + # task_kwargs_str = task["headers"]["kwargsrepr"] + # task_kwargs = json.loads(task_kwargs_str) + # task_tenant_id = task_kwargs["tenant_id"] + # if task_tenant_id and task_tenant_id == "tenant_id": + # print("Delete tenant_id={tenant_id}") + # if + # deleted += 1 + + # logger.info(f"processed={processed} deleted={deleted}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Purge celery queue by tenant id") + parser.add_argument("--queue", type=str, help="Queue to purge", required=True) + + parser.add_argument("--tenant", type=str, help="Tenant ID to purge", required=True) + + args = parser.parse_args() + celery_purge_queue(queue=args.queue, tenant_id=args.tenant) diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/onyx_redis.py new file mode 100644 index 00000000000..bd203d84a52 --- /dev/null +++ b/backend/scripts/onyx_redis.py @@ -0,0 +1,84 @@ +# Tool to run helpful operations on Redis in production +import argparse +import logging +import sys +import time +from logging import getLogger +from typing import cast + +from redis import Redis + +from onyx.redis.redis_pool import RedisPool + +# Configure the logger +logging.basicConfig( + level=logging.INFO, # Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # Log format + handlers=[logging.StreamHandler()], # Output logs to console +) + +logger = getLogger(__name__) + +REDIS_PASSWORD = "" + + +def onyx_redis(command: str) -> int: + pool = RedisPool.create_pool( + host="127.0.0.1", + port=6380, + password=REDIS_PASSWORD, + ssl=True, + ssl_cert_reqs="optional", + ssl_ca_certs=None, + ) + + r = Redis(connection_pool=pool) + + if command == "purge_connectorsync": + """Purge connector tasksets. Used when the tasks represented in the tasksets + have been purged.""" + return purge_by_match_and_type("*connectorsync_taskset*", "set", r) + + return 255 + + +def purge_by_match_and_type(match_pattern: str, match_type: str, r: Redis) -> int: + """match_pattern: glob style expression + match_type: https://redis.io/docs/latest/commands/type/ + """ + + # cursor = "0" + # while cursor != 0: + # cursor, data = self.scan( + # cursor=cursor, match=match, count=count, _type=_type, **kwargs + # ) + + start = time.monotonic() + + count = 0 + for key in r.scan_iter(match_pattern, count=10000): + key_type = r.type(key) + if key_type != match_type.encode("utf-8"): + continue + + key = cast(bytes, key) + key_str = key.decode("utf-8") + + count += 1 + logger.info(f"Deleting item {count}: {key_str}") + r.delete(key) + + logger.info(f"Found {count} matches.") + + elapsed = time.monotonic() - start + logger.info(f"Time elapsed: {elapsed:.2f}s") + return 0 + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Onyx Redis Tools") + parser.add_argument("--command", type=str, help="Operation to run", required=True) + + args = parser.parse_args() + exitcode = onyx_redis(command=args.command) + sys.exit(exitcode) From 01459e60a0ad2de3a8569b23490362ce2bdf9db6 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Tue, 7 Jan 2025 09:44:23 -0800 Subject: [PATCH 2/6] add some commands --- backend/scripts/onyx_redis.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/onyx_redis.py index bd203d84a52..79e44b1c820 100644 --- a/backend/scripts/onyx_redis.py +++ b/backend/scripts/onyx_redis.py @@ -34,10 +34,16 @@ def onyx_redis(command: str) -> int: r = Redis(connection_pool=pool) - if command == "purge_connectorsync": + if command == "purge_connectorsync_taskset": """Purge connector tasksets. Used when the tasks represented in the tasksets have been purged.""" return purge_by_match_and_type("*connectorsync_taskset*", "set", r) + elif command == "purge_documentset_taskset": + return purge_by_match_and_type("*documentset_taskset*", "set", r) + elif command == "purge_usergroup_taskset": + return purge_by_match_and_type("*usergroup_taskset*", "set", r) + else: + pass return 255 From 15f7a9c3245c2a5b29311dc25a4764a3d5927d14 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Tue, 7 Jan 2025 11:59:10 -0800 Subject: [PATCH 3/6] add batching and dry run options --- backend/scripts/onyx_redis.py | 68 ++++++++++++++++++++++++++++------- 1 file changed, 56 insertions(+), 12 deletions(-) diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/onyx_redis.py index 79e44b1c820..c9a9bb655a3 100644 --- a/backend/scripts/onyx_redis.py +++ b/backend/scripts/onyx_redis.py @@ -22,7 +22,7 @@ REDIS_PASSWORD = "" -def onyx_redis(command: str) -> int: +def onyx_redis(command: str, batch: int, dry_run: bool) -> int: pool = RedisPool.create_pool( host="127.0.0.1", port=6380, @@ -37,18 +37,28 @@ def onyx_redis(command: str) -> int: if command == "purge_connectorsync_taskset": """Purge connector tasksets. Used when the tasks represented in the tasksets have been purged.""" - return purge_by_match_and_type("*connectorsync_taskset*", "set", r) + return purge_by_match_and_type( + "*connectorsync_taskset*", "set", batch, dry_run, r + ) elif command == "purge_documentset_taskset": - return purge_by_match_and_type("*documentset_taskset*", "set", r) + return purge_by_match_and_type( + "*documentset_taskset*", "set", batch, dry_run, r + ) elif command == "purge_usergroup_taskset": - return purge_by_match_and_type("*usergroup_taskset*", "set", r) + return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r) + elif command == "purge_vespa_syncing": + return purge_by_match_and_type( + "*connectorsync:vespa_syncing*", "string", batch, dry_run, r + ) else: pass return 255 -def purge_by_match_and_type(match_pattern: str, match_type: str, r: Redis) -> int: +def purge_by_match_and_type( + match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis +) -> int: """match_pattern: glob style expression match_type: https://redis.io/docs/latest/commands/type/ """ @@ -62,17 +72,38 @@ def purge_by_match_and_type(match_pattern: str, match_type: str, r: Redis) -> in start = time.monotonic() count = 0 - for key in r.scan_iter(match_pattern, count=10000): - key_type = r.type(key) - if key_type != match_type.encode("utf-8"): - continue + batch_keys: list[bytes] = [] + for key in r.scan_iter(match_pattern, count=10000, _type=match_type): + # key_type = r.type(key) + # if key_type != match_type.encode("utf-8"): + # continue key = cast(bytes, key) key_str = key.decode("utf-8") count += 1 + if dry_run: + logger.info(f"(DRY-RUN) Deleting item {count}: {key_str}") + continue + logger.info(f"Deleting item {count}: {key_str}") - r.delete(key) + + batch_keys.append(key) + if len(batch_keys) >= batch_size: + logger.info(f"Flushing {len(batch_keys)} operations to Redis.") + with r.pipeline() as pipe: + for batch_key in batch_keys: + pipe.delete(batch_key) + pipe.execute() + batch_keys.clear() + + if len(batch_keys) >= batch_size: + logger.info(f"Flushing {len(batch_keys)} operations to Redis.") + with r.pipeline() as pipe: + for batch_key in batch_keys: + pipe.delete(batch_key) + pipe.execute() + batch_keys.clear() logger.info(f"Found {count} matches.") @@ -82,9 +113,22 @@ def purge_by_match_and_type(match_pattern: str, match_type: str, r: Redis) -> in if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Onyx Redis Tools") + parser = argparse.ArgumentParser(description="Onyx Redis Manager") parser.add_argument("--command", type=str, help="Operation to run", required=True) + parser.add_argument( + "--batch", + type=int, + default=1000, + help="Size of operation batches to send to Redis", + required=False, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually executing modifications", + required=False, + ) args = parser.parse_args() - exitcode = onyx_redis(command=args.command) + exitcode = onyx_redis(command=args.command, batch=args.batch, dry_run=args.dry_run) sys.exit(exitcode) From 75154094527773f9f1b4008bebed1c9b3b946bb8 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 9 Jan 2025 11:30:16 -0800 Subject: [PATCH 4/6] custom redis tool --- backend/scripts/onyx_redis.py | 82 +++++++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 19 deletions(-) diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/onyx_redis.py index c9a9bb655a3..1beab70025f 100644 --- a/backend/scripts/onyx_redis.py +++ b/backend/scripts/onyx_redis.py @@ -1,4 +1,6 @@ # Tool to run helpful operations on Redis in production +# This is targeted for internal usage and may not have all the necessary parameters +# for general usage across custom deployments import argparse import logging import sys @@ -19,14 +21,17 @@ logger = getLogger(__name__) -REDIS_PASSWORD = "" +SCAN_ITER_COUNT = 10000 +BATCH_DEFAULT = 1000 -def onyx_redis(command: str, batch: int, dry_run: bool) -> int: +def onyx_redis( + command: str, batch: int, dry_run: bool, host: str, port: int, password: str | None +) -> int: pool = RedisPool.create_pool( - host="127.0.0.1", - port=6380, - password=REDIS_PASSWORD, + host=host, + port=port, + password=password if password else "", ssl=True, ssl_cert_reqs="optional", ssl_ca_certs=None, @@ -34,6 +39,12 @@ def onyx_redis(command: str, batch: int, dry_run: bool) -> int: r = Redis(connection_pool=pool) + try: + r.ping() + except: + logger.exception("Redis ping exceptioned") + raise + if command == "purge_connectorsync_taskset": """Purge connector tasksets. Used when the tasks represented in the tasksets have been purged.""" @@ -56,6 +67,14 @@ def onyx_redis(command: str, batch: int, dry_run: bool) -> int: return 255 +def flush_batch_delete(batch_keys: list[bytes], r: Redis) -> None: + logger.info(f"Flushing {len(batch_keys)} operations to Redis.") + with r.pipeline() as pipe: + for batch_key in batch_keys: + pipe.delete(batch_key) + pipe.execute() + + def purge_by_match_and_type( match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis ) -> int: @@ -73,7 +92,7 @@ def purge_by_match_and_type( count = 0 batch_keys: list[bytes] = [] - for key in r.scan_iter(match_pattern, count=10000, _type=match_type): + for key in r.scan_iter(match_pattern, count=SCAN_ITER_COUNT, _type=match_type): # key_type = r.type(key) # if key_type != match_type.encode("utf-8"): # continue @@ -90,22 +109,14 @@ def purge_by_match_and_type( batch_keys.append(key) if len(batch_keys) >= batch_size: - logger.info(f"Flushing {len(batch_keys)} operations to Redis.") - with r.pipeline() as pipe: - for batch_key in batch_keys: - pipe.delete(batch_key) - pipe.execute() + flush_batch_delete(batch_keys, r) batch_keys.clear() if len(batch_keys) >= batch_size: - logger.info(f"Flushing {len(batch_keys)} operations to Redis.") - with r.pipeline() as pipe: - for batch_key in batch_keys: - pipe.delete(batch_key) - pipe.execute() + flush_batch_delete(batch_keys, r) batch_keys.clear() - logger.info(f"Found {count} matches.") + logger.info(f"Deleted {count} matches.") elapsed = time.monotonic() - start logger.info(f"Time elapsed: {elapsed:.2f}s") @@ -115,13 +126,39 @@ def purge_by_match_and_type( if __name__ == "__main__": parser = argparse.ArgumentParser(description="Onyx Redis Manager") parser.add_argument("--command", type=str, help="Operation to run", required=True) + + parser.add_argument( + "--host", + type=str, + default="127.0.0.1", + help="The redis host", + required=False, + ) + + parser.add_argument( + "--port", + type=int, + default="6380", + help="The redis port", + required=False, + ) + + parser.add_argument( + "--password", + type=str, + default=None, + help="The redis password", + required=False, + ) + parser.add_argument( "--batch", type=int, - default=1000, + default=BATCH_DEFAULT, help="Size of operation batches to send to Redis", required=False, ) + parser.add_argument( "--dry-run", action="store_true", @@ -130,5 +167,12 @@ def purge_by_match_and_type( ) args = parser.parse_args() - exitcode = onyx_redis(command=args.command, batch=args.batch, dry_run=args.dry_run) + exitcode = onyx_redis( + command=args.command, + batch=args.batch, + dry_run=args.dry_run, + host=args.host, + port=args.port, + password=args.password, + ) sys.exit(exitcode) From 641d4670e04d984b3cd10fa4bf235ce7381a4b6d Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 9 Jan 2025 11:33:46 -0800 Subject: [PATCH 5/6] comment --- backend/scripts/celery_purge_queue.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/backend/scripts/celery_purge_queue.py b/backend/scripts/celery_purge_queue.py index a12a90fda7d..cbaed2de4fe 100644 --- a/backend/scripts/celery_purge_queue.py +++ b/backend/scripts/celery_purge_queue.py @@ -27,7 +27,11 @@ def celery_purge_queue(queue: str, tenant_id: str) -> None: """Purging a celery queue is extremely difficult because the queue is a list and the only way an item can be removed from a list is by VALUE, which is a linear scan. Therefore, to purge the list of many values is roughly - n^2.""" + n^2. + + The other alternative is to pop values and push them back, but that raises + questions about behavior while operating on a live queue. + """ pool = RedisPool.create_pool( host="127.0.0.1", From 326b64a2085660eb89313d49c77c65e658e9f0f1 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 9 Jan 2025 11:45:48 -0800 Subject: [PATCH 6/6] default to app config settings for redis --- backend/scripts/onyx_redis.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/onyx_redis.py index 1beab70025f..c7eb7fbef5c 100644 --- a/backend/scripts/onyx_redis.py +++ b/backend/scripts/onyx_redis.py @@ -10,6 +10,10 @@ from redis import Redis +from onyx.configs.app_configs import REDIS_DB_NUMBER +from onyx.configs.app_configs import REDIS_HOST +from onyx.configs.app_configs import REDIS_PASSWORD +from onyx.configs.app_configs import REDIS_PORT from onyx.redis.redis_pool import RedisPool # Configure the logger @@ -26,11 +30,18 @@ def onyx_redis( - command: str, batch: int, dry_run: bool, host: str, port: int, password: str | None + command: str, + batch: int, + dry_run: bool, + host: str, + port: int, + db: int, + password: str | None, ) -> int: pool = RedisPool.create_pool( host=host, port=port, + db=db, password=password if password else "", ssl=True, ssl_cert_reqs="optional", @@ -130,7 +141,7 @@ def purge_by_match_and_type( parser.add_argument( "--host", type=str, - default="127.0.0.1", + default=REDIS_HOST, help="The redis host", required=False, ) @@ -138,15 +149,23 @@ def purge_by_match_and_type( parser.add_argument( "--port", type=int, - default="6380", + default=REDIS_PORT, help="The redis port", required=False, ) + parser.add_argument( + "--db", + type=int, + default=REDIS_DB_NUMBER, + help="The redis db", + required=False, + ) + parser.add_argument( "--password", type=str, - default=None, + default=REDIS_PASSWORD, help="The redis password", required=False, ) @@ -173,6 +192,7 @@ def purge_by_match_and_type( dry_run=args.dry_run, host=args.host, port=args.port, + db=args.db, password=args.password, ) sys.exit(exitcode)