@@ -67,12 +67,16 @@ def __init__(self):
67
67
# intentionally delayed. Each entry is a tuple of (request_id,
68
68
# timestamp). If a request remains in this queue for too long, it will
69
69
# be force-freed.
70
+ self .record_finished_requests : set [str ] = set ()
70
71
self .delayed_free_requests : deque [Tuple [str , float ]] = deque ()
71
72
72
73
def update_done_task_count (self , request_id : str ):
73
74
with self .done_task_lock :
74
75
self .finished_requests .add (request_id )
75
- self ._remove_delayed_requests (request_id )
76
+ if any (item [0 ] == request_id for item in self .delayed_free_requests ):
77
+ self ._remove_delayed_requests (request_id )
78
+ else :
79
+ self .record_finished_requests .add (request_id )
76
80
77
81
def get_and_clear_finished_requests (self ) -> set [str ]:
78
82
"""
@@ -90,7 +94,10 @@ def get_and_clear_finished_requests(self) -> set[str]:
90
94
def add_delayed_request (self , request_id : str , delay_start_time : float ):
91
95
"""Add a delayed free request."""
92
96
with self .done_task_lock :
93
- self .delayed_free_requests .append ((request_id , delay_start_time ))
97
+ if request_id not in self .record_finished_requests :
98
+ self .delayed_free_requests .append ((request_id , delay_start_time ))
99
+ else :
100
+ self .record_finished_requests .discard (request_id )
94
101
95
102
def _retrieve_expired_requests (self ):
96
103
"""Retrieve all expired delayed requests."""
0 commit comments