diff --git a/vllm_ascend/models/deepseek_v2.py b/vllm_ascend/models/deepseek_v2.py
index bfa86f0ee2..2d48f493a1 100644
--- a/vllm_ascend/models/deepseek_v2.py
+++ b/vllm_ascend/models/deepseek_v2.py
@@ -68,6 +68,7 @@
     make_empty_intermediate_tensors_factory, make_layers, maybe_prefix)
 from vllm.sequence import IntermediateTensors
 
+import vllm_ascend.envs as envs_ascend
 from vllm_ascend.ascend_config import get_ascend_config
 from vllm_ascend.distributed.parallel_state import get_ep_group
 from vllm_ascend.ops.fused_moe import AscendFusedMoE
@@ -407,8 +408,12 @@ def forward(self,
             experts_hidden_states[0] * self.routed_scaling_factor +
             experts_hidden_states[1])
         if self.all_reduce_merge:
-            # When all_reduce_merge is in progress, shared_experts does not do all_reduce in mlp, but waits until shared_experts+router_experts are completed before doing all_reduce
-            hidden_states = tensor_model_parallel_all_reduce(hidden_states)
+            if envs_ascend.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and not is_prefill:
+                # Prefill uses the AllGatherEP solution (using the VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP switch), and Decode uses the MC2 solution.
+                ...
+            else:
+                # When all_reduce_merge is in progress, shared_experts does not do all_reduce in mlp, but waits until shared_experts+router_experts are completed before doing all_reduce
+                hidden_states = tensor_model_parallel_all_reduce(hidden_states)
 
         return hidden_states
 
diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py
index 1221d8984d..1b0368a3a5 100644
--- a/vllm_ascend/ops/fused_moe.py
+++ b/vllm_ascend/ops/fused_moe.py
@@ -1418,6 +1418,14 @@ def forward(self,
             final_hidden_states = tensor_model_parallel_all_reduce(
                 final_hidden_states)
 
+        if tp_size > 1 and envs_ascend.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and self.all_reduce_merge and fused_moe_state in [
+                FusedMoEState.MC2
+        ]:
+            # Prefill uses the AllGatherEP solution (using the VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP switch), and Decode uses the MC2 solution.
+            # This solution uses the all_reduce_merge optimization in Prefill, but does not use the all_reduce_merge optimization in the decode part.
+            shared_hidden_states = tensor_model_parallel_all_reduce(
+                shared_hidden_states)
+
         if shared_experts:
             return final_hidden_states, shared_hidden_states
         else:
diff --git a/vllm_ascend/utils.py b/vllm_ascend/utils.py
index 634e13cb9e..7beee243f1 100644
--- a/vllm_ascend/utils.py
+++ b/vllm_ascend/utils.py
@@ -458,6 +458,10 @@ def get_rm_router_logits_state(ep_size: int, dp_size: int,
 # TODO(ttanzhiqiang): all_reduce merge
 # When all_reduce_merge is in progress, shared_experts does not do all_reduce in mlp, but waits until shared_experts+router_experts are completed before doing all_reduce
 # Currently, all_reduce_merge is enabled by default in the AllGather, AllGatherEP and NaiveMulticast scenarios of the deepseek model.
+# 1. If prefill and decode both use the AllGather or NaiveMulticast solution, this logic holds and the all_reduce_merge optimization is applied.
+# 2. If prefill and decode both use the All2All or MC2 solution, this logic also holds, but the all_reduce_merge optimization is not applied.
+# 3. If prefill uses the AllGatherEP solution (enabled via the VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP switch) and decode uses the MC2 solution, only prefill can be merged; the prefill and decode strategies differ, so they cannot be merged together.
+# 4. In the PD-separation scenario, the strategies used by P and D are independent, so there is no impact.
 def get_all_reduce_merge_state(ep_size: int, is_deepseek_v3_r1: bool):
     # the fusion operator torch_npu.npu_grouped_matmul_finalize_routing called by allgather ep
     # only supports deepseek v3/r1
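
A minimal sketch (not the vllm-ascend implementation) of the control flow this patch adds to the MoE layer: when VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP is set, prefill keeps the merged all_reduce over shared_experts + routed_experts, while decode (MC2) skips it at the layer level because the shared-experts output is already all-reduced inside the fused MoE forward (see the fused_moe.py hunk). The helper names tensor_parallel_all_reduce and moe_layer_output below are illustrative stand-ins, not real vLLM APIs.

import os

import torch

# Illustrative stand-in for the switch read from vllm_ascend.envs.
VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP = bool(
    int(os.getenv("VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP", "0")))


def tensor_parallel_all_reduce(x: torch.Tensor) -> torch.Tensor:
    # Placeholder for the tensor-parallel all_reduce collective.
    return x


def moe_layer_output(routed_out: torch.Tensor, shared_out: torch.Tensor,
                     routed_scaling_factor: float, is_prefill: bool,
                     all_reduce_merge: bool) -> torch.Tensor:
    # Combine the scaled routed-experts output with the shared-experts output.
    hidden_states = routed_out * routed_scaling_factor + shared_out
    if all_reduce_merge:
        if VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and not is_prefill:
            # Decode under the AllGatherEP/MC2 split: skip the merged
            # all_reduce here; the shared-experts output was already
            # all-reduced inside the fused MoE op.
            pass
        else:
            # Prefill (or a uniform AllGather/NaiveMulticast setup): one
            # merged all_reduce over shared_experts + routed_experts.
            hidden_states = tensor_parallel_all_reduce(hidden_states)
    return hidden_states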