Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions vllm_ascend/distributed/mooncake_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ def __init__(self):
# intentionally delayed. Each entry is a tuple of (request_id,
# timestamp). If a request remains in this queue for too long, it will
# be force-freed.
self.record_finished_requests: set[str] = set()
self.delayed_free_requests: deque[Tuple[str, float]] = deque()

def update_done_task_count(self, request_id: str):
with self.done_task_lock:
self.finished_requests.add(request_id)
self._remove_delayed_requests(request_id)
if any(item[0] == request_id for item in self.delayed_free_requests):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The check `any(item[0] == request_id for item in self.delayed_free_requests)` performs a linear scan over the `delayed_free_requests` deque. This is an O(N) operation, where N is the number of delayed requests. Since this code is executed under a lock (`self.done_task_lock`) and can be on a hot path, this linear scan could become a performance bottleneck if the number of delayed requests grows large.

To improve performance, I recommend maintaining a companion set of request IDs that are present in self.delayed_free_requests. This would change the check to an O(1) operation.

For example, you could introduce `self.delayed_free_request_ids: set[str] = set()` in `KVCacheTaskTracker` and update it whenever `self.delayed_free_requests` is modified. Then, this line could be changed to `if request_id in self.delayed_free_request_ids:`. This would require changes in other methods that modify `delayed_free_requests`, such as `add_delayed_request`, `_retrieve_expired_requests`, and `_remove_delayed_requests`.

self._remove_delayed_requests(request_id)
else:
self.record_finished_requests.add(request_id)

def get_and_clear_finished_requests(self) -> set[str]:
"""
Expand All @@ -90,7 +94,10 @@ def get_and_clear_finished_requests(self) -> set[str]:
def add_delayed_request(self, request_id: str, delay_start_time: float):
"""Add a delayed free request."""
with self.done_task_lock:
self.delayed_free_requests.append((request_id, delay_start_time))
if request_id not in self.record_finished_requests:
self.delayed_free_requests.append((request_id, delay_start_time))
else:
self.record_finished_requests.discard(request_id)

def _retrieve_expired_requests(self):
"""Retrieve all expired delayed requests."""
Expand Down
Loading