Skip to content

Commit a4f676c

Browse files
yinghaiwuisawesome
authored and committed
Add collective_rpc to llm engine (vllm-project#16999)
Signed-off-by: Yinghai Lu <[email protected]>
1 parent 698d162 commit a4f676c

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

vllm/engine/async_llm_engine.py

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -528,6 +528,13 @@ async def add_request_async(
528528
async def check_health_async(self) -> None:
529529
self.model_executor.check_health()
530530

531+
async def collective_rpc_async(self,
532+
method: str,
533+
timeout: Optional[float] = None,
534+
args: tuple = (),
535+
kwargs: Optional[dict] = None):
536+
raise NotImplementedError
537+
531538

532539
async def build_guided_decoding_logits_processor_async(
533540
sampling_params: SamplingParams, tokenizer: AnyTokenizer,
@@ -1236,6 +1243,17 @@ async def is_sleeping(self) -> bool:
12361243
async def add_lora(self, lora_request: LoRARequest) -> None:
12371244
self.engine.add_lora(lora_request)
12381245

1246+
async def collective_rpc(self,
1247+
method: str,
1248+
timeout: Optional[float] = None,
1249+
args: tuple = (),
1250+
kwargs: Optional[dict] = None):
1251+
"""
1252+
Perform a collective RPC call to the given path.
1253+
"""
1254+
return await self.engine.collective_rpc_async(method, timeout, args,
1255+
kwargs)
1256+
12391257

12401258
# TODO(v1): Remove this class proxy when V1 goes default.
12411259
if envs.is_set("VLLM_USE_V1") and envs.VLLM_USE_V1:

vllm/v1/engine/async_llm.py

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -492,6 +492,17 @@ async def pin_lora(self, lora_id: int) -> bool:
492492
"""Prevent an adapter from being evicted."""
493493
return await self.engine_core.pin_lora_async(lora_id)
494494

495+
async def collective_rpc(self,
496+
method: str,
497+
timeout: Optional[float] = None,
498+
args: tuple = (),
499+
kwargs: Optional[dict] = None):
500+
"""
501+
Perform a collective RPC call to the given path.
502+
"""
503+
return await self.engine_core.collective_rpc_async(
504+
method, timeout, args, kwargs)
505+
495506
@property
496507
def is_running(self) -> bool:
497508
# Is None before the loop is started.

0 commit comments

Comments
 (0)