Commit 9dad411

update control stream
1 parent d280301 commit 9dad411

8 files changed: +25 −54 lines

fastdeploy/worker/gcu_model_runner.py
Lines changed: 2 additions & 6 deletions

@@ -151,8 +151,6 @@ def insert_prefill_inputs(self, req_dicts: List[Request]):
         """
         Process inputs for prefill tasks and insert it to share_inputs buffer
         """
-        if "caches" not in self.share_inputs:
-            self.initialize_kv_cache()
 
         if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill":
             os.environ["PREFILL_NODE_ONE_STEP_STOP"] = "1"
@@ -1035,11 +1033,11 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None:
         Args:
             num_gpu_blocks:
         """
+        self.parallel_config.do_profile = False
         self.num_gcu_blocks = num_gpu_blocks
 
         # Reset block table and kv cache with global block num
-        if not (self.parallel_config.enable_prefix_caching or self.parallel_config.splitwise_role != "mixed"):
-            self.initialize_kv_cache()
+        self.initialize_kv_cache()
 
         # Reset free list
         free_list = list(
@@ -1057,8 +1055,6 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None:
             }
         )
 
-        self.parallel_config.do_profile = False
-
         if self.speculative_method in ["mtp"]:
             self.proposer.update_block_num(num_gpu_blocks)
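The net effect on `update_share_input_block_num` is that profiling mode is cleared before the cache is rebuilt, and `initialize_kv_cache()` now runs unconditionally instead of only when prefix caching and non-mixed splitwise roles are both off. A minimal sketch of the resulting control flow (illustrative class and attribute names, not the FastDeploy source verbatim):

```python
class ModelRunnerSketch:
    """Hypothetical stand-in for the GCU/GPU model runners in this commit."""

    def update_share_input_block_num(self, num_gpu_blocks: int) -> None:
        # Leave profile mode first, so initialize_kv_cache() allocates the
        # real post-profiling cache rather than a throwaway one.
        self.parallel_config.do_profile = False
        self.num_gpu_blocks = num_gpu_blocks

        # Previously guarded by:
        #   if not (enable_prefix_caching or splitwise_role != "mixed"): ...
        # Now the cache is always rebuilt with the profiled block count.
        self.initialize_kv_cache()
```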

fastdeploy/worker/gcu_worker.py
Lines changed: 2 additions & 6 deletions

@@ -98,9 +98,9 @@ def get_model(self) -> nn.Layer:
         """ """
         return self.model_runner.get_model()
 
-    def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
+    def initialize_cache(self, num_gpu_blocks: int) -> None:
         """ """
-        pass
+        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)
 
     def execute_model(
         self,
@@ -134,7 +134,3 @@ def check_health(self) -> bool:
     def cal_theortical_kvcache(self) -> int:
         """ """
         return self.model_runner.cal_theortical_kvcache()
-
-    def reinitialize_kv_cache(self, num_gpu_blocks: int) -> None:
-        """ """
-        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)
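Across the worker classes, the formerly separate `reinitialize_kv_cache` is folded into `initialize_cache`, which also loses its unused `num_cpu_blocks` parameter. A hedged sketch of the unified entry point (the class body is illustrative, not the full worker):

```python
class GcuWorkerSketch:
    """Hypothetical stand-in showing the consolidated cache entry point."""

    def __init__(self, model_runner):
        self.model_runner = model_runner

    def initialize_cache(self, num_gpu_blocks: int) -> None:
        # The old no-op initialize_cache(num_gpu_blocks, num_cpu_blocks) and
        # the separate reinitialize_kv_cache() collapse into one method that
        # forwards the profiled GPU block count to the model runner.
        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)
```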

fastdeploy/worker/gpu_model_runner.py
Lines changed: 3 additions & 13 deletions

@@ -193,9 +193,6 @@ def insert_prefill_inputs(self, req_dicts: List[Request]):
         Process inputs for prefill tasks and insert it to share_inputs buffer
         TODO(gongshaotian): Refactor this func
         """
-        # NOTE(luotingdan): Lazy initialize kv cache
-        if "caches" not in self.share_inputs:
-            self.initialize_kv_cache()
 
         # NOTE(luotingdan): Set environment variable of prefill node
         if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill":
@@ -739,7 +736,6 @@ def initialize_kv_cache(self) -> None:
 
         else:
             for i in range(self.model_config.num_hidden_layers):
-
                 cache_kvs[f"key_caches_{i}"] = paddle.full(
                     shape=kv_cache_shape,
                     fill_value=0,
@@ -999,9 +995,6 @@ def capture_model(self) -> None:
         time_before_capture = time.perf_counter()
         expected_decode_len = 1
         capture_sizes = self.cudagraph_capture_sizes.copy()
-        need_init_cache = "caches" not in self.share_inputs
-        if need_init_cache:
-            self.initialize_kv_cache()
         for batch_size in sorted(capture_sizes, reverse=True):
             self._dummy_run(
                 num_tokens=self.parallel_config.max_num_batched_tokens,
@@ -1010,8 +1003,7 @@
                 expected_decode_len=expected_decode_len,
             )
             logger.info(f"Warm up the model with the batch size:{batch_size}, num tokens:{expected_decode_len}")
-        if need_init_cache:
-            self.clear_cache()
+
         time_after_capture = time.perf_counter()
         logger.info(f"Cuda Graph capturing took {time_after_capture - time_before_capture} seconds")
@@ -1237,6 +1229,7 @@ def profile_run(self) -> None:
 
         if self.speculative_method in ["mtp"]:
             self.proposer.clear_dummy_input()
+        self.parallel_config.do_profile = False
 
     def update_share_input_block_num(self, num_gpu_blocks: int) -> None:
         """
@@ -1247,8 +1240,7 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None:
         self.num_gpu_blocks = num_gpu_blocks
 
         # Reset block table and kv cache with global block num
-        if not (self.parallel_config.enable_prefix_caching or self.parallel_config.splitwise_role != "mixed"):
-            self.initialize_kv_cache()
+        self.initialize_kv_cache()
 
         # Reset free list
         free_list = list(
@@ -1266,8 +1258,6 @@
             }
         )
 
-        self.parallel_config.do_profile = False
-
         if self.speculative_method in ["mtp"]:
             self.proposer.update_block_num(num_gpu_blocks)
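Beyond the same `update_share_input_block_num` change, the GPU runner drops lazy KV-cache creation from `insert_prefill_inputs` and `capture_model`, and `profile_run` now ends by clearing `do_profile`. The implied ordering contract, shown as hypothetical driver code (not part of the repository):

```python
def start_runner_sketch(runner, profiled_num_blocks: int) -> None:
    """Illustrative startup order implied by this commit."""
    # 1. Profile: ends with runner.parallel_config.do_profile = False.
    runner.profile_run()
    # 2. Build the real KV cache from the profiled block count.
    runner.update_share_input_block_num(profiled_num_blocks)
    # 3. Capture CUDA graphs: capture_model() no longer creates or clears a
    #    temporary cache, so "caches" must already be in share_inputs here.
    runner.capture_model()
```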

fastdeploy/worker/gpu_worker.py
Lines changed: 4 additions & 7 deletions

@@ -165,9 +165,10 @@ def get_model(self) -> nn.Layer:
         """Get current model"""
         return self.model_runner.get_model()
 
-    def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
-        """Initizlize the KV Cache"""
-        pass
+    def initialize_cache(self, num_gpu_blocks: int) -> None:
+        """Initizlize the KV Cache with accurate num_gpu_blocks"""
+        # accurate cache size
+        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)
 
     def execute_model(
         self,
@@ -198,7 +199,3 @@ def check_health(self) -> bool:
     def cal_theortical_kvcache(self) -> int:
         """Calculate the block memory required"""
         return self.model_runner.cal_theortical_kvcache()
-
-    def reinitialize_kv_cache(self, num_gpu_blocks: int) -> None:
-        """Reinitialize the kv cache using the parameters from the profile"""
-        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)

fastdeploy/worker/iluvatar_model_runner.py
Lines changed: 2 additions & 7 deletions

@@ -141,9 +141,6 @@ def insert_prefill_inputs(self, req_dicts: List[Request]):
         Process inputs for prefill tasks and insert it to share_inputs buffer
         TODO(gongshaotian): Refactor this func
         """
-        # NOTE(luotingdan): Lazy initialize kv cache
-        if "caches" not in self.share_inputs:
-            self.initialize_kv_cache()
 
         # NOTE(luotingdan): Set environment variable of prefill node
         if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill":
@@ -1013,11 +1010,11 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None:
         Args:
             num_gpu_blocks:
         """
+        self.parallel_config.do_profile = False
         self.num_gpu_blocks = num_gpu_blocks
 
         # Reset block table and kv cache with global block num
-        if not (self.parallel_config.enable_prefix_caching or self.parallel_config.splitwise_role != "mixed"):
-            self.initialize_kv_cache()
+        self.initialize_kv_cache()
 
         # Reset free list
         free_list = list(
@@ -1035,8 +1032,6 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None:
             }
         )
 
-        self.parallel_config.do_profile = False
-
     def cal_theortical_kvcache(self):
         """
         Calculate the total block memory required at the model level

fastdeploy/worker/iluvatar_worker.py
Lines changed: 2 additions & 6 deletions

@@ -99,9 +99,9 @@ def get_model(self) -> nn.Layer:
         """ """
         return self.model_runner.get_model()
 
-    def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
+    def initialize_cache(self, num_gpu_blocks: int) -> None:
         """ """
-        pass
+        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)
 
     def execute_model(
         self,
@@ -135,7 +135,3 @@ def check_health(self) -> bool:
     def cal_theortical_kvcache(self) -> int:
         """ """
         return self.model_runner.cal_theortical_kvcache()
-
-    def reinitialize_kv_cache(self, num_gpu_blocks: int) -> None:
-        """ """
-        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)

fastdeploy/worker/worker_base.py
Lines changed: 1 addition & 1 deletion

@@ -64,7 +64,7 @@ def init_device(self) -> None:
         raise NotImplementedError
 
     @abstractmethod
-    def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
+    def initialize_cache(self, num_gpu_blocks: int) -> None:
         """Initizlize the KV Cache with the given size in blocks."""
         raise NotImplementedError
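This abstract-interface change is what the per-device workers above implement: `num_cpu_blocks` disappears and only the profiled GPU block count is passed through. A minimal sketch of a concrete subclass against the new signature (`ExampleWorker` and its runner are hypothetical):

```python
from abc import ABC, abstractmethod


class WorkerBaseSketch(ABC):
    @abstractmethod
    def initialize_cache(self, num_gpu_blocks: int) -> None:
        """Initialize the KV Cache with the given size in blocks."""
        raise NotImplementedError


class ExampleWorker(WorkerBaseSketch):
    def __init__(self, model_runner):
        self.model_runner = model_runner

    def initialize_cache(self, num_gpu_blocks: int) -> None:
        # Only the GPU block count, measured during profiling, is forwarded;
        # there is no CPU-block argument in the new interface.
        self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)
```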

fastdeploy/worker/worker_process.py
Lines changed: 9 additions & 8 deletions

@@ -347,7 +347,7 @@ def event_loop_normal(self) -> None:
 
         self.exist_prefill_task_signal.value[0] = self.worker.prefill_finished()
 
-    def determine_num_available_blocks(self) -> None:
+    def initialize_kv_cache(self) -> None:
         """Profiles the peak memory usage of the model to determine how many
         KV blocks may be allocated without OOMs.
 
@@ -403,10 +403,7 @@ def determine_num_available_blocks(self) -> None:
         # logger.info will write in worker_process.log
         # Need `print` to triger engine->check_worker_initialize_status->detect_thread
         print(f"------- num_blocks_global: {num_blocks_local} --------")
-        # 4. Updata share inputs
-        self.worker.reinitialize_kv_cache(num_gpu_blocks=num_blocks_local)
-
-    def graph_optimize_and_warm_up_model(self) -> None:
+        # wait engine launch cache_manager
         if self.parallel_config.enable_prefix_caching or self.parallel_config.splitwise_role != "mixed":
             launched_cache_manager_signal_data = np.zeros([1], dtype=np.int32)
             self.launched_cache_manager_signal = IPCSignal(
@@ -418,6 +415,10 @@
             )
             while np.any(self.launched_cache_manager_signal.value[0] <= 0):
                 time.sleep(0.01)
+        # 4. init kv_cache with accurate num_blocks
+        self.worker.initialize_cache(num_gpu_blocks=num_blocks_local)
+
+    def graph_optimize_and_warm_up_model(self) -> None:
         self.worker.graph_optimize_and_warm_up_model()
 
     def init_device(self) -> None:
@@ -731,11 +732,11 @@ def run_worker_proc() -> None:
 
     # Load model
     worker_proc.load_model()
-    logger.info("determine_num_available_blocks")
-    worker_proc.determine_num_available_blocks()
+    # logger.info("determine_num_available_blocks")
+    worker_proc.initialize_kv_cache()
 
     # Trigger CUDAGraph capture
-    worker_proc.graph_optimize_and_warm_up_model()
+    worker_proc.worker.graph_optimize_and_warm_up_model()
 
     # Initialize health status
     worker_proc.init_health_status()
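This is the control-stream change the commit title refers to: profiling, the wait for the engine's cache manager, and cache construction now all live in one `initialize_kv_cache` step that runs before graph capture. A hedged sketch of the resulting startup order (names mirror the diff; the body is illustrative):

```python
def run_worker_proc_sketch(worker_proc) -> None:
    """Illustrative startup sequence after this commit, not the full source."""
    worker_proc.load_model()
    # Renamed from determine_num_available_blocks(): profiles peak memory,
    # optionally waits for the engine's launched_cache_manager IPC signal,
    # then calls worker.initialize_cache(num_gpu_blocks=num_blocks_local).
    worker_proc.initialize_kv_cache()
    # Graph capture is now invoked directly on the inner worker object.
    worker_proc.worker.graph_optimize_and_warm_up_model()
    worker_proc.init_health_status()
```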
