From e64e1126b535fe9872a0c7eb7a0736ecff98504c Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 11:10:01 +0000 Subject: [PATCH 01/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/dynamic_lora.py | 57 --- vllm/entrypoints/openai/api_server.py | 351 +----------------- vllm/entrypoints/pooling/__init__.py | 8 +- vllm/entrypoints/sagemaker/routes.py | 2 +- vllm/entrypoints/serve/__init__.py | 69 ++++ vllm/entrypoints/serve/elastic_ep/__init__.py | 0 .../serve/elastic_ep/api_router.py | 88 +++++ .../serve/instrumentator/__init__.py | 0 .../serve/instrumentator/health.py | 29 ++ .../serve/instrumentator/metrics.py | 42 +++ vllm/entrypoints/serve/lora/__init__.py | 0 vllm/entrypoints/serve/lora/api_router.py | 56 +++ vllm/entrypoints/serve/profile/__init__.py | 0 vllm/entrypoints/serve/profile/api_router.py | 34 ++ vllm/entrypoints/serve/rl/__init__.py | 0 vllm/entrypoints/serve/rl/api_router.py | 98 +++++ vllm/entrypoints/serve/sleep/__init__.py | 0 vllm/entrypoints/serve/sleep/api_router.py | 48 +++ vllm/entrypoints/serve/tokenize/__init__.py | 0 vllm/entrypoints/serve/tokenize/api_router.py | 111 ++++++ .../tokenize/protocol.py} | 0 21 files changed, 585 insertions(+), 408 deletions(-) delete mode 100644 vllm/entrypoints/dynamic_lora.py create mode 100644 vllm/entrypoints/serve/__init__.py create mode 100644 vllm/entrypoints/serve/elastic_ep/__init__.py create mode 100644 vllm/entrypoints/serve/elastic_ep/api_router.py create mode 100644 vllm/entrypoints/serve/instrumentator/__init__.py create mode 100644 vllm/entrypoints/serve/instrumentator/health.py create mode 100644 vllm/entrypoints/serve/instrumentator/metrics.py create mode 100644 vllm/entrypoints/serve/lora/__init__.py create mode 100644 vllm/entrypoints/serve/lora/api_router.py create mode 100644 vllm/entrypoints/serve/profile/__init__.py create mode 100644 vllm/entrypoints/serve/profile/api_router.py create mode 100644 vllm/entrypoints/serve/rl/__init__.py create mode 100644 vllm/entrypoints/serve/rl/api_router.py create mode 100644 vllm/entrypoints/serve/sleep/__init__.py create mode 100644 vllm/entrypoints/serve/sleep/api_router.py create mode 100644 vllm/entrypoints/serve/tokenize/__init__.py create mode 100644 vllm/entrypoints/serve/tokenize/api_router.py rename vllm/entrypoints/{openai/serving_tokenization.py => serve/tokenize/protocol.py} (100%) diff --git a/vllm/entrypoints/dynamic_lora.py b/vllm/entrypoints/dynamic_lora.py deleted file mode 100644 index cc0f437e5c77..000000000000 --- a/vllm/entrypoints/dynamic_lora.py +++ /dev/null @@ -1,57 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -import model_hosting_container_standards.sagemaker as sagemaker_standards -from fastapi import APIRouter, Depends, Request -from fastapi.responses import JSONResponse, Response - -from vllm.entrypoints.openai.api_server import models, validate_json_request -from vllm.entrypoints.openai.protocol import ( - ErrorResponse, - LoadLoRAAdapterRequest, - UnloadLoRAAdapterRequest, -) -from vllm.entrypoints.openai.serving_models import OpenAIServingModels -from vllm.logger import init_logger - -logger = init_logger(__name__) - - -def register_dynamic_lora_routes(router: APIRouter): - @sagemaker_standards.register_load_adapter_handler( - request_shape={ - "lora_name": "body.name", - "lora_path": "body.src", - }, - ) - @router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) - 
async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): - handler: OpenAIServingModels = models(raw_request) - response = await handler.load_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) - - @sagemaker_standards.register_unload_adapter_handler( - request_shape={ - "lora_name": "path_params.adapter_name", - } - ) - @router.post( - "/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)] - ) - async def unload_lora_adapter( - request: UnloadLoRAAdapterRequest, raw_request: Request - ): - handler: OpenAIServingModels = models(raw_request) - response = await handler.unload_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) - - return router diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index cdc316b65ba7..e86623f66cc1 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -20,21 +20,15 @@ from typing import Annotated, Any, Literal import model_hosting_container_standards.sagemaker as sagemaker_standards -import prometheus_client import pydantic -import regex as re import uvloop from fastapi import APIRouter, Depends, FastAPI, Form, HTTPException, Query, Request from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, Response, StreamingResponse -from prometheus_client import make_asgi_app -from prometheus_fastapi_instrumentator import Instrumentator from starlette.concurrency import iterate_in_threadpool from starlette.datastructures import URL, Headers, MutableHeaders, State -from starlette.routing import Mount from starlette.types import ASGIApp, Message, Receive, Scope, Send -from typing_extensions import assert_never import vllm.envs as envs from vllm.config import VllmConfig @@ -56,8 +50,6 @@ ChatCompletionResponse, CompletionRequest, CompletionResponse, - DetokenizeRequest, - DetokenizeResponse, ErrorInfo, ErrorResponse, GenerateRequest, @@ -65,8 +57,6 @@ ResponsesRequest, ResponsesResponse, StreamingResponsesResponse, - TokenizeRequest, - TokenizeResponse, TranscriptionRequest, TranscriptionResponseVariant, TranslationRequest, @@ -80,7 +70,6 @@ OpenAIServingModels, ) from vllm.entrypoints.openai.serving_responses import OpenAIServingResponses -from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization from vllm.entrypoints.openai.serving_tokens import ServingTokens from vllm.entrypoints.openai.serving_transcription import ( OpenAIServingTranscription, @@ -92,6 +81,7 @@ from vllm.entrypoints.pooling.embed.serving import OpenAIServingEmbedding from vllm.entrypoints.pooling.pooling.serving import OpenAIServingPooling from vllm.entrypoints.pooling.score.serving import ServingScores +from vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization from vllm.entrypoints.tool_server import DemoToolServer, MCPToolServer, ToolServer from vllm.entrypoints.utils import ( cli_env_setup, @@ -109,8 +99,6 @@ from vllm.utils.gc_utils import freeze_gc_heap from vllm.utils.network_utils import is_valid_ipv6_address from vllm.utils.system_utils import decorate_logs, set_ulimit -from vllm.v1.engine.exceptions import 
EngineDeadError -from vllm.v1.metrics.prometheus import get_prometheus_registry from vllm.version import __version__ as VLLM_VERSION prometheus_multiproc_dir: tempfile.TemporaryDirectory @@ -245,39 +233,6 @@ async def build_async_engine_client_from_engine_args( router = APIRouter() -class PrometheusResponse(Response): - media_type = prometheus_client.CONTENT_TYPE_LATEST - - -def mount_metrics(app: FastAPI): - """Mount prometheus metrics to a FastAPI app.""" - - registry = get_prometheus_registry() - - # `response_class=PrometheusResponse` is needed to return an HTTP response - # with header "Content-Type: text/plain; version=0.0.4; charset=utf-8" - # instead of the default "application/json" which is incorrect. - # See https://github.com/trallnag/prometheus-fastapi-instrumentator/issues/163#issue-1296092364 - Instrumentator( - excluded_handlers=[ - "/metrics", - "/health", - "/load", - "/ping", - "/version", - "/server_info", - ], - registry=registry, - ).add().instrument(app).expose(app, response_class=PrometheusResponse) - - # Add prometheus asgi middleware to route /metrics requests - metrics_route = Mount("/metrics", make_asgi_app(registry=registry)) - - # Workaround for 307 Redirect for /metrics - metrics_route.path_regex = re.compile("^/metrics(?P.*)$") - app.routes.append(metrics_route) - - def base(request: Request) -> OpenAIServing: # Reuse the existing instance return tokenization(request) @@ -323,16 +278,6 @@ def generate_tokens(request: Request) -> ServingTokens | None: return request.app.state.serving_tokens -@router.get("/health", response_class=Response) -async def health(raw_request: Request) -> Response: - """Health check.""" - try: - await engine_client(raw_request).check_health() - return Response(status_code=200) - except EngineDeadError: - return Response(status_code=503) - - @router.get("/load") async def get_server_load_metrics(request: Request): # This endpoint returns the current server load metrics. @@ -352,167 +297,6 @@ async def get_server_load_metrics(request: Request): return JSONResponse(content={"server_load": request.app.state.server_load_metrics}) -@router.post("/pause") -async def pause_generation( - raw_request: Request, - wait_for_inflight_requests: bool = Query(False), - clear_cache: bool = Query(True), -) -> JSONResponse: - """Pause generation requests to allow weight updates. - - Args: - wait_for_inflight_requests: When ``True`` waits for in-flight - requests to finish before pausing. When ``False`` (default), - aborts any in-flight requests immediately. - clear_cache: Whether to clear KV/prefix caches after draining. 
- """ - - engine = engine_client(raw_request) - - try: - await engine.pause_generation( - wait_for_inflight_requests=wait_for_inflight_requests, - clear_cache=clear_cache, - ) - return JSONResponse( - content={"status": "paused"}, - status_code=HTTPStatus.OK.value, - ) - - except ValueError as err: - return JSONResponse( - content={"error": str(err)}, - status_code=HTTPStatus.BAD_REQUEST.value, - ) - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to pause generation") - return JSONResponse( - content={"error": f"Failed to pause generation: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - -@router.post("/resume") -async def resume_generation(raw_request: Request) -> JSONResponse: - """Resume generation after a pause.""" - - engine = engine_client(raw_request) - - try: - await engine.resume_generation() - return JSONResponse( - content={"status": "resumed"}, - status_code=HTTPStatus.OK.value, - ) - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to resume generation") - return JSONResponse( - content={"error": f"Failed to resume generation: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - -@router.get("/is_paused") -async def is_paused(raw_request: Request) -> JSONResponse: - """Return the current pause status.""" - - engine = engine_client(raw_request) - - try: - paused = await engine.is_paused() - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to fetch pause status") - return JSONResponse( - content={"error": f"Failed to fetch pause status: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - return JSONResponse(content={"is_paused": paused}) - - -@router.post( - "/tokenize", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse}, - }, -) -@with_cancellation -async def tokenize(request: TokenizeRequest, raw_request: Request): - handler = tokenization(raw_request) - - try: - generator = await handler.create_tokenize(request, raw_request) - except NotImplementedError as e: - raise HTTPException( - status_code=HTTPStatus.NOT_IMPLEMENTED.value, detail=str(e) - ) from e - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - elif isinstance(generator, TokenizeResponse): - return JSONResponse(content=generator.model_dump()) - - assert_never(generator) - - -@router.post( - "/detokenize", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, -) -@with_cancellation -async def detokenize(request: DetokenizeRequest, raw_request: Request): - handler = tokenization(raw_request) - - try: - generator = await handler.create_detokenize(request, raw_request) - except OverflowError as e: - raise RequestValidationError(errors=[str(e)]) from e - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - - if isinstance(generator, 
ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - elif isinstance(generator, DetokenizeResponse): - return JSONResponse(content=generator.model_dump()) - - assert_never(generator) - - -def maybe_register_tokenizer_info_endpoint(args): - """Conditionally register the tokenizer info endpoint if enabled.""" - if getattr(args, "enable_tokenizer_info_endpoint", False): - - @router.get("/tokenizer_info") - async def get_tokenizer_info(raw_request: Request): - """Get comprehensive tokenizer information.""" - result = await tokenization(raw_request).get_tokenizer_info() - return JSONResponse( - content=result.model_dump(), - status_code=result.error.code - if isinstance(result, ErrorResponse) - else 200, - ) - - @router.get("/v1/models") async def show_available_models(raw_request: Request): handler = models(raw_request) @@ -898,33 +682,6 @@ async def reset_mm_cache(raw_request: Request): await engine_client(raw_request).reset_mm_cache() return Response(status_code=200) - @router.post("/sleep") - async def sleep(raw_request: Request): - # get POST params - level = raw_request.query_params.get("level", "1") - await engine_client(raw_request).sleep(int(level)) - # FIXME: in v0 with frontend multiprocessing, the sleep command - # is sent but does not finish yet when we return a response. - return Response(status_code=200) - - @router.post("/wake_up") - async def wake_up(raw_request: Request): - tags = raw_request.query_params.getlist("tags") - if tags == []: - # set to None to wake up all tags if no tags are provided - tags = None - logger.info("wake up the engine with tags: %s", tags) - await engine_client(raw_request).wake_up(tags) - # FIXME: in v0 with frontend multiprocessing, the wake-up command - # is sent but does not finish yet when we return a response. 
- return Response(status_code=200) - - @router.get("/is_sleeping") - async def is_sleeping(raw_request: Request): - logger.info("check whether the engine is sleeping") - is_sleeping = await engine_client(raw_request).is_sleeping() - return JSONResponse(content={"is_sleeping": is_sleeping}) - @router.post("/collective_rpc") async def collective_rpc(raw_request: Request): try: @@ -952,76 +709,13 @@ async def collective_rpc(raw_request: Request): return Response(status_code=200) response: list[Any] = [] for result in results: - if result is None or isinstance(result, (dict, list)): + if result is None or isinstance(result, dict | list): response.append(result) else: response.append(str(result)) return JSONResponse(content={"results": response}) -@router.post( - "/scale_elastic_ep", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.OK.value: {"model": dict}, - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.REQUEST_TIMEOUT.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, -) -async def scale_elastic_ep(raw_request: Request): - try: - body = await raw_request.json() - except json.JSONDecodeError as e: - raise HTTPException(status_code=400, detail="Invalid JSON format") from e # noqa: B904 - - new_data_parallel_size = body.get("new_data_parallel_size") - drain_timeout = body.get("drain_timeout", 120) # Default 2 minutes - - if new_data_parallel_size is None: - raise HTTPException( - status_code=400, detail="new_data_parallel_size is required" - ) - - if not isinstance(new_data_parallel_size, int) or new_data_parallel_size <= 0: - raise HTTPException( - status_code=400, detail="new_data_parallel_size must be a positive integer" - ) - - if not isinstance(drain_timeout, int) or drain_timeout <= 0: - raise HTTPException( - status_code=400, detail="drain_timeout must be a positive integer" - ) - - # Set scaling flag to prevent new requests - global _scaling_elastic_ep - _scaling_elastic_ep = True - client = engine_client(raw_request) - try: - await client.scale_elastic_ep(new_data_parallel_size, drain_timeout) - return JSONResponse( - { - "message": f"Scaled to {new_data_parallel_size} data parallel engines", - } - ) - except TimeoutError as e: - raise HTTPException( - status_code=408, - detail="Scale failed due to request drain timeout " - f"after {drain_timeout} seconds", - ) from e - except Exception as e: - logger.error("Scale failed: %s", e) - raise HTTPException(status_code=500, detail="Scale failed") from e - finally: - _scaling_elastic_ep = False - - -@router.post("/is_scaling_elastic_ep") -async def is_scaling_elastic_ep(raw_request: Request): - return JSONResponse({"is_scaling_elastic_ep": _scaling_elastic_ep}) - - @router.post( "/inference/v1/generate", dependencies=[Depends(validate_json_request)], @@ -1057,33 +751,6 @@ async def generate(request: GenerateRequest, raw_request: Request): return StreamingResponse(content=generator, media_type="text/event-stream") -if envs.VLLM_TORCH_PROFILER_DIR: - logger.warning_once( - "Torch Profiler is enabled in the API server. This should ONLY be " - "used for local development!" - ) -elif envs.VLLM_TORCH_CUDA_PROFILE: - logger.warning_once( - "CUDA Profiler is enabled in the API server. This should ONLY be " - "used for local development!" 
- ) -if envs.VLLM_TORCH_PROFILER_DIR or envs.VLLM_TORCH_CUDA_PROFILE: - - @router.post("/start_profile") - async def start_profile(raw_request: Request): - logger.info("Starting profiler...") - await engine_client(raw_request).start_profile() - logger.info("Profiler started.") - return Response(status_code=200) - - @router.post("/stop_profile") - async def stop_profile(raw_request: Request): - logger.info("Stopping profiler...") - await engine_client(raw_request).stop_profile() - logger.info("Profiler stopped.") - return Response(status_code=200) - - def load_log_config(log_config_file: str | None) -> dict | None: if not log_config_file: return None @@ -1354,14 +1021,9 @@ def build_app(args: Namespace) -> FastAPI: else: app = FastAPI(lifespan=lifespan) - if envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING: - logger.warning( - "LoRA dynamic loading & unloading is enabled in the API server. " - "This should ONLY be used for local development!" - ) - from vllm.entrypoints.dynamic_lora import register_dynamic_lora_routes + from vllm.entrypoints.serve import register_vllm_serve_api_routers - register_dynamic_lora_routes(router) + register_vllm_serve_api_routers(app, args=args) from vllm.entrypoints.sagemaker.routes import register_sagemaker_routes @@ -1370,8 +1032,6 @@ def build_app(args: Namespace) -> FastAPI: app.root_path = args.root_path - mount_metrics(app) - from vllm.entrypoints.pooling import register_pooling_api_routers register_pooling_api_routers(app) @@ -1515,7 +1175,7 @@ async def init_app_state( state.engine_client = engine_client state.log_stats = not args.disable_log_stats state.vllm_config = vllm_config - + state.args = args supported_tasks = await engine_client.get_supported_tasks() logger.info("Supported tasks: %s", supported_tasks) @@ -1839,7 +1499,6 @@ async def run_server_worker( args, client_config=client_config, ) as engine_client: - maybe_register_tokenizer_info_endpoint(args) app = build_app(args) await init_app_state(engine_client, app.state, args) diff --git a/vllm/entrypoints/pooling/__init__.py b/vllm/entrypoints/pooling/__init__.py index 789fd8bd262b..027addc892ec 100644 --- a/vllm/entrypoints/pooling/__init__.py +++ b/vllm/entrypoints/pooling/__init__.py @@ -10,7 +10,7 @@ def register_pooling_api_routers(app: FastAPI): from vllm.entrypoints.pooling.pooling.api_router import router as pooling_router from vllm.entrypoints.pooling.score.api_router import router as score_router - app.include_router(classify_router) - app.include_router(embed_router) - app.include_router(score_router) - app.include_router(pooling_router) + # app.include_router(classify_router) + # app.include_router(embed_router) + # app.include_router(score_router) + # app.include_router(pooling_router) diff --git a/vllm/entrypoints/sagemaker/routes.py b/vllm/entrypoints/sagemaker/routes.py index 108fdd773e32..ea88c0fc4b97 100644 --- a/vllm/entrypoints/sagemaker/routes.py +++ b/vllm/entrypoints/sagemaker/routes.py @@ -16,7 +16,6 @@ completion, create_chat_completion, create_completion, - health, validate_json_request, ) from vllm.entrypoints.openai.protocol import ( @@ -38,6 +37,7 @@ score, ) from vllm.entrypoints.pooling.score.protocol import RerankRequest, ScoreRequest +from vllm.entrypoints.serve.instrumentator.health import health # TODO: RequestType = TypeForm[BaseModel] when recognized by type checkers # (requires typing_extensions >= 4.13) diff --git a/vllm/entrypoints/serve/__init__.py b/vllm/entrypoints/serve/__init__.py new file mode 100644 index 000000000000..d5b1c41ab066 --- /dev/null +++ 
b/vllm/entrypoints/serve/__init__.py @@ -0,0 +1,69 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +from argparse import Namespace + +from fastapi import FastAPI + +from vllm import envs +from vllm.logger import init_logger + +logger = init_logger("vllm.entrypoints.serve") + + +def register_vllm_serve_api_routers(app: FastAPI, args: Namespace): + if envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING: + logger.warning( + "LoRA dynamic loading & unloading is enabled in the API server. " + "This should ONLY be used for local development!" + ) + from vllm.entrypoints.serve.lora.api_router import router as lora_router + + app.include_router(lora_router) + from vllm.entrypoints.serve.elastic_ep.api_router import ( + router as elastic_ep_router, + ) + + app.include_router(elastic_ep_router) + + if envs.VLLM_TORCH_PROFILER_DIR: + logger.warning_once( + "Torch Profiler is enabled in the API server. This should ONLY be " + "used for local development!" + ) + elif envs.VLLM_TORCH_CUDA_PROFILE: + logger.warning_once( + "CUDA Profiler is enabled in the API server. This should ONLY be " + "used for local development!" + ) + if envs.VLLM_TORCH_PROFILER_DIR or envs.VLLM_TORCH_CUDA_PROFILE: + from vllm.entrypoints.serve.profile.api_router import ( + router as profile_router, + ) + + app.include_router(profile_router) + if envs.VLLM_SERVER_DEV_MODE: + logger.warning( + "SECURITY WARNING: Development endpoints are enabled! " + "This should NOT be used in production!" + ) + from vllm.entrypoints.serve.sleep.api_router import router as sleep_router + + app.include_router(sleep_router) + + from vllm.entrypoints.serve.tokenize.api_router import router as tokenize_router + from vllm.entrypoints.serve.tokenize.api_router import tokenizer_info_router + + app.include_router(tokenize_router) + if getattr(args, "enable_tokenizer_info_endpoint", False): + """Conditionally register the tokenizer info endpoint if enabled.""" + app.include_router(tokenizer_info_router) + from vllm.entrypoints.serve.instrumentator.health import ( + router as health_router, + ) + + app.include_router(health_router) + + from vllm.entrypoints.serve.instrumentator.metrics import mount_metrics + + mount_metrics(app) diff --git a/vllm/entrypoints/serve/elastic_ep/__init__.py b/vllm/entrypoints/serve/elastic_ep/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/elastic_ep/api_router.py b/vllm/entrypoints/serve/elastic_ep/api_router.py new file mode 100644 index 000000000000..b7eea3a56a1c --- /dev/null +++ b/vllm/entrypoints/serve/elastic_ep/api_router.py @@ -0,0 +1,88 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +import json +from http import HTTPStatus + +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import JSONResponse + +from vllm.engine.protocol import EngineClient +from vllm.entrypoints.openai.api_server import validate_json_request +from vllm.entrypoints.openai.protocol import ( + ErrorResponse, +) +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +router = APIRouter() + + +def engine_client(request: Request) -> EngineClient: + return request.app.state.engine_client + + +@router.post( + "/scale_elastic_ep", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.OK.value: {"model": dict}, + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.REQUEST_TIMEOUT.value: 
{"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, +) +async def scale_elastic_ep(raw_request: Request): + try: + body = await raw_request.json() + except json.JSONDecodeError as e: + raise HTTPException(status_code=400, detail="Invalid JSON format") from e # noqa: B904 + + new_data_parallel_size = body.get("new_data_parallel_size") + drain_timeout = body.get("drain_timeout", 120) # Default 2 minutes + + if new_data_parallel_size is None: + raise HTTPException( + status_code=400, detail="new_data_parallel_size is required" + ) + + if not isinstance(new_data_parallel_size, int) or new_data_parallel_size <= 0: + raise HTTPException( + status_code=400, detail="new_data_parallel_size must be a positive integer" + ) + + if not isinstance(drain_timeout, int) or drain_timeout <= 0: + raise HTTPException( + status_code=400, detail="drain_timeout must be a positive integer" + ) + + # Set scaling flag to prevent new requests + global _scaling_elastic_ep + _scaling_elastic_ep = True + client = engine_client(raw_request) + try: + await client.scale_elastic_ep(new_data_parallel_size, drain_timeout) + return JSONResponse( + { + "message": f"Scaled to {new_data_parallel_size} data parallel engines", + } + ) + except TimeoutError as e: + raise HTTPException( + status_code=408, + detail="Scale failed due to request drain timeout " + f"after {drain_timeout} seconds", + ) from e + except Exception as e: + logger.error("Scale failed: %s", e) + raise HTTPException(status_code=500, detail="Scale failed") from e + finally: + _scaling_elastic_ep = False + + +@router.post("/is_scaling_elastic_ep") +async def is_scaling_elastic_ep(raw_request: Request): + return JSONResponse({"is_scaling_elastic_ep": _scaling_elastic_ep}) diff --git a/vllm/entrypoints/serve/instrumentator/__init__.py b/vllm/entrypoints/serve/instrumentator/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/instrumentator/health.py b/vllm/entrypoints/serve/instrumentator/health.py new file mode 100644 index 000000000000..8b079ce31618 --- /dev/null +++ b/vllm/entrypoints/serve/instrumentator/health.py @@ -0,0 +1,29 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +from fastapi import APIRouter, Request +from fastapi.responses import Response + +from vllm.engine.protocol import EngineClient +from vllm.logger import init_logger +from vllm.v1.engine.exceptions import EngineDeadError + +logger = init_logger(__name__) + + +router = APIRouter() + + +def engine_client(request: Request) -> EngineClient: + return request.app.state.engine_client + + +@router.get("/health", response_class=Response) +async def health(raw_request: Request) -> Response: + """Health check.""" + try: + await engine_client(raw_request).check_health() + return Response(status_code=200) + except EngineDeadError: + return Response(status_code=503) diff --git a/vllm/entrypoints/serve/instrumentator/metrics.py b/vllm/entrypoints/serve/instrumentator/metrics.py new file mode 100644 index 000000000000..e998c6c00074 --- /dev/null +++ b/vllm/entrypoints/serve/instrumentator/metrics.py @@ -0,0 +1,42 @@ +import re + +import prometheus_client +from fastapi import FastAPI, Response +from prometheus_client import make_asgi_app +from prometheus_fastapi_instrumentator import Instrumentator +from starlette.routing import Mount + +from vllm.v1.metrics.prometheus import get_prometheus_registry + + +class PrometheusResponse(Response): + 
media_type = prometheus_client.CONTENT_TYPE_LATEST + + +def mount_metrics(app: FastAPI): + """Mount prometheus metrics to a FastAPI app.""" + + registry = get_prometheus_registry() + + # `response_class=PrometheusResponse` is needed to return an HTTP response + # with header "Content-Type: text/plain; version=0.0.4; charset=utf-8" + # instead of the default "application/json" which is incorrect. + # See https://github.com/trallnag/prometheus-fastapi-instrumentator/issues/163#issue-1296092364 + Instrumentator( + excluded_handlers=[ + "/metrics", + "/health", + "/load", + "/ping", + "/version", + "/server_info", + ], + registry=registry, + ).add().instrument(app).expose(app, response_class=PrometheusResponse) + + # Add prometheus asgi middleware to route /metrics requests + metrics_route = Mount("/metrics", make_asgi_app(registry=registry)) + + # Workaround for 307 Redirect for /metrics + metrics_route.path_regex = re.compile("^/metrics(?P.*)$") + app.routes.append(metrics_route) diff --git a/vllm/entrypoints/serve/lora/__init__.py b/vllm/entrypoints/serve/lora/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/lora/api_router.py b/vllm/entrypoints/serve/lora/api_router.py new file mode 100644 index 000000000000..5cb39d36ebe5 --- /dev/null +++ b/vllm/entrypoints/serve/lora/api_router.py @@ -0,0 +1,56 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +import model_hosting_container_standards.sagemaker as sagemaker_standards +from fastapi import APIRouter, Depends, Request +from fastapi.responses import JSONResponse, Response + +from vllm.entrypoints.openai.api_server import models, validate_json_request +from vllm.entrypoints.openai.protocol import ( + ErrorResponse, + LoadLoRAAdapterRequest, + UnloadLoRAAdapterRequest, +) +from vllm.entrypoints.openai.serving_models import OpenAIServingModels +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +router = APIRouter() + + +@sagemaker_standards.register_load_adapter_handler( + request_shape={ + "lora_name": "body.name", + "lora_path": "body.src", + }, +) +@router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) +async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): + handler: OpenAIServingModels = models(raw_request) + response = await handler.load_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) + + +@sagemaker_standards.register_unload_adapter_handler( + request_shape={ + "lora_name": "path_params.adapter_name", + } +) +@router.post("/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)]) +async def unload_lora_adapter(request: UnloadLoRAAdapterRequest, raw_request: Request): + handler: OpenAIServingModels = models(raw_request) + response = await handler.unload_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) diff --git a/vllm/entrypoints/serve/profile/__init__.py b/vllm/entrypoints/serve/profile/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/profile/api_router.py b/vllm/entrypoints/serve/profile/api_router.py new file mode 100644 index 
000000000000..9d69e232c562 --- /dev/null +++ b/vllm/entrypoints/serve/profile/api_router.py @@ -0,0 +1,34 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +from fastapi import APIRouter, Request +from fastapi.responses import Response + +from vllm.engine.protocol import EngineClient +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +router = APIRouter() + + +def engine_client(request: Request) -> EngineClient: + return request.app.state.engine_client + + +@router.post("/start_profile") +async def start_profile(raw_request: Request): + logger.info("Starting profiler...") + await engine_client(raw_request).start_profile() + logger.info("Profiler started.") + return Response(status_code=200) + + +@router.post("/stop_profile") +async def stop_profile(raw_request: Request): + logger.info("Stopping profiler...") + await engine_client(raw_request).stop_profile() + logger.info("Profiler stopped.") + return Response(status_code=200) diff --git a/vllm/entrypoints/serve/rl/__init__.py b/vllm/entrypoints/serve/rl/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/rl/api_router.py b/vllm/entrypoints/serve/rl/api_router.py new file mode 100644 index 000000000000..c5c011d97bfa --- /dev/null +++ b/vllm/entrypoints/serve/rl/api_router.py @@ -0,0 +1,98 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +from http import HTTPStatus + +from fastapi import APIRouter, Query, Request +from fastapi.responses import JSONResponse + +from vllm.engine.protocol import EngineClient +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +def engine_client(request: Request) -> EngineClient: + return request.app.state.engine_client + + +router = APIRouter() + + +@router.post("/pause") +async def pause_generation( + raw_request: Request, + wait_for_inflight_requests: bool = Query(False), + clear_cache: bool = Query(True), +) -> JSONResponse: + """Pause generation requests to allow weight updates. + + Args: + wait_for_inflight_requests: When ``True`` waits for in-flight + requests to finish before pausing. When ``False`` (default), + aborts any in-flight requests immediately. + clear_cache: Whether to clear KV/prefix caches after draining. 
+ """ + + engine = engine_client(raw_request) + + try: + await engine.pause_generation( + wait_for_inflight_requests=wait_for_inflight_requests, + clear_cache=clear_cache, + ) + return JSONResponse( + content={"status": "paused"}, + status_code=HTTPStatus.OK.value, + ) + + except ValueError as err: + return JSONResponse( + content={"error": str(err)}, + status_code=HTTPStatus.BAD_REQUEST.value, + ) + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to pause generation") + return JSONResponse( + content={"error": f"Failed to pause generation: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + +@router.post("/resume") +async def resume_generation(raw_request: Request) -> JSONResponse: + """Resume generation after a pause.""" + + engine = engine_client(raw_request) + + try: + await engine.resume_generation() + return JSONResponse( + content={"status": "resumed"}, + status_code=HTTPStatus.OK.value, + ) + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to resume generation") + return JSONResponse( + content={"error": f"Failed to resume generation: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + +@router.get("/is_paused") +async def is_paused(raw_request: Request) -> JSONResponse: + """Return the current pause status.""" + + engine = engine_client(raw_request) + + try: + paused = await engine.is_paused() + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to fetch pause status") + return JSONResponse( + content={"error": f"Failed to fetch pause status: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + return JSONResponse(content={"is_paused": paused}) diff --git a/vllm/entrypoints/serve/sleep/__init__.py b/vllm/entrypoints/serve/sleep/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/sleep/api_router.py b/vllm/entrypoints/serve/sleep/api_router.py new file mode 100644 index 000000000000..b94b2c00e468 --- /dev/null +++ b/vllm/entrypoints/serve/sleep/api_router.py @@ -0,0 +1,48 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse, Response + +from vllm.engine.protocol import EngineClient +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +router = APIRouter() + + +def engine_client(request: Request) -> EngineClient: + return request.app.state.engine_client + + +@router.post("/sleep") +async def sleep(raw_request: Request): + # get POST params + level = raw_request.query_params.get("level", "1") + await engine_client(raw_request).sleep(int(level)) + # FIXME: in v0 with frontend multiprocessing, the sleep command + # is sent but does not finish yet when we return a response. + return Response(status_code=200) + + +@router.post("/wake_up") +async def wake_up(raw_request: Request): + tags = raw_request.query_params.getlist("tags") + if tags == []: + # set to None to wake up all tags if no tags are provided + tags = None + logger.info("wake up the engine with tags: %s", tags) + await engine_client(raw_request).wake_up(tags) + # FIXME: in v0 with frontend multiprocessing, the wake-up command + # is sent but does not finish yet when we return a response. 
+ return Response(status_code=200) + + +@router.get("/is_sleeping") +async def is_sleeping(raw_request: Request): + logger.info("check whether the engine is sleeping") + is_sleeping = await engine_client(raw_request).is_sleeping() + return JSONResponse(content={"is_sleeping": is_sleeping}) diff --git a/vllm/entrypoints/serve/tokenize/__init__.py b/vllm/entrypoints/serve/tokenize/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/tokenize/api_router.py b/vllm/entrypoints/serve/tokenize/api_router.py new file mode 100644 index 000000000000..2082691cf162 --- /dev/null +++ b/vllm/entrypoints/serve/tokenize/api_router.py @@ -0,0 +1,111 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +from http import HTTPStatus + +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse +from typing_extensions import assert_never + +from vllm.entrypoints.openai.api_server import validate_json_request +from vllm.entrypoints.openai.protocol import ( + DetokenizeRequest, + DetokenizeResponse, + ErrorResponse, + TokenizeRequest, + TokenizeResponse, +) +from vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization +from vllm.entrypoints.utils import ( + with_cancellation, +) +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +router = APIRouter() +tokenizer_info_router = APIRouter() + + +def tokenization(request: Request) -> OpenAIServingTokenization: + return request.app.state.openai_serving_tokenization + + +@router.post( + "/tokenize", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse}, + }, +) +@with_cancellation +async def tokenize(request: TokenizeRequest, raw_request: Request): + handler = tokenization(raw_request) + + try: + generator = await handler.create_tokenize(request, raw_request) + except NotImplementedError as e: + raise HTTPException( + status_code=HTTPStatus.NOT_IMPLEMENTED.value, detail=str(e) + ) from e + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + elif isinstance(generator, TokenizeResponse): + return JSONResponse(content=generator.model_dump()) + + assert_never(generator) + + +@router.post( + "/detokenize", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, +) +@with_cancellation +async def detokenize(request: DetokenizeRequest, raw_request: Request): + handler = tokenization(raw_request) + + try: + generator = await handler.create_detokenize(request, raw_request) + except OverflowError as e: + raise RequestValidationError(errors=[str(e)]) from e + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), 
status_code=generator.error.code + ) + elif isinstance(generator, DetokenizeResponse): + return JSONResponse(content=generator.model_dump()) + + assert_never(generator) + + +@tokenizer_info_router.get("/tokenizer_info") +async def get_tokenizer_info(raw_request: Request): + """Get comprehensive tokenizer information.""" + result = await tokenization(raw_request).get_tokenizer_info() + return JSONResponse( + content=result.model_dump(), + status_code=result.error.code if isinstance(result, ErrorResponse) else 200, + ) diff --git a/vllm/entrypoints/openai/serving_tokenization.py b/vllm/entrypoints/serve/tokenize/protocol.py similarity index 100% rename from vllm/entrypoints/openai/serving_tokenization.py rename to vllm/entrypoints/serve/tokenize/protocol.py From 8d926095869d08a1f6ef13a7503e70cc3516fe5e Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 11:11:01 +0000 Subject: [PATCH 02/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/pooling/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/vllm/entrypoints/pooling/__init__.py b/vllm/entrypoints/pooling/__init__.py index 027addc892ec..789fd8bd262b 100644 --- a/vllm/entrypoints/pooling/__init__.py +++ b/vllm/entrypoints/pooling/__init__.py @@ -10,7 +10,7 @@ def register_pooling_api_routers(app: FastAPI): from vllm.entrypoints.pooling.pooling.api_router import router as pooling_router from vllm.entrypoints.pooling.score.api_router import router as score_router - # app.include_router(classify_router) - # app.include_router(embed_router) - # app.include_router(score_router) - # app.include_router(pooling_router) + app.include_router(classify_router) + app.include_router(embed_router) + app.include_router(score_router) + app.include_router(pooling_router) From 28f94d5951463cc5c18be65b790f557fa3997f19 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 11:18:57 +0000 Subject: [PATCH 03/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/serve/instrumentator/metrics.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vllm/entrypoints/serve/instrumentator/metrics.py b/vllm/entrypoints/serve/instrumentator/metrics.py index e998c6c00074..396e859e1e09 100644 --- a/vllm/entrypoints/serve/instrumentator/metrics.py +++ b/vllm/entrypoints/serve/instrumentator/metrics.py @@ -1,3 +1,7 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + import re import prometheus_client From c1a72c68a0bc194745b2c5e9a585750e78c70898 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 13:45:20 +0000 Subject: [PATCH 04/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/openai/api_server.py | 102 +---------------- vllm/entrypoints/serve/__init__.py | 10 ++ vllm/entrypoints/serve/disagg/__init__.py | 0 vllm/entrypoints/serve/disagg/api_router.py | 106 ++++++++++++++++++ vllm/entrypoints/serve/disagg/protocol.py | 90 +++++++++++++++ .../disagg/serving.py} | 2 + .../serve/elastic_ep/api_router.py | 11 +- vllm/entrypoints/serve/elastic_ep/protocol.py | 49 ++++++++ 8 files changed, 268 insertions(+), 102 deletions(-) create mode 100644 vllm/entrypoints/serve/disagg/__init__.py create mode 100644 vllm/entrypoints/serve/disagg/api_router.py create mode 100644 vllm/entrypoints/serve/disagg/protocol.py rename vllm/entrypoints/{openai/serving_tokens.py => 
serve/disagg/serving.py} (99%) create mode 100644 vllm/entrypoints/serve/elastic_ep/protocol.py diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index e86623f66cc1..084a58c1b3f4 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -52,8 +52,6 @@ CompletionResponse, ErrorInfo, ErrorResponse, - GenerateRequest, - GenerateResponse, ResponsesRequest, ResponsesResponse, StreamingResponsesResponse, @@ -70,7 +68,6 @@ OpenAIServingModels, ) from vllm.entrypoints.openai.serving_responses import OpenAIServingResponses -from vllm.entrypoints.openai.serving_tokens import ServingTokens from vllm.entrypoints.openai.serving_transcription import ( OpenAIServingTranscription, OpenAIServingTranslation, @@ -81,6 +78,10 @@ from vllm.entrypoints.pooling.embed.serving import OpenAIServingEmbedding from vllm.entrypoints.pooling.pooling.serving import OpenAIServingPooling from vllm.entrypoints.pooling.score.serving import ServingScores +from vllm.entrypoints.serve.disagg.serving import ServingTokens +from vllm.entrypoints.serve.elastic_ep.protocol import ( + ScalingMiddleware, +) from vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization from vllm.entrypoints.tool_server import DemoToolServer, MCPToolServer, ToolServer from vllm.entrypoints.utils import ( @@ -716,41 +717,6 @@ async def collective_rpc(raw_request: Request): return JSONResponse(content={"results": response}) -@router.post( - "/inference/v1/generate", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.OK.value: {"content": {"text/event-stream": {}}}, - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, -) -@with_cancellation -@load_aware_call -async def generate(request: GenerateRequest, raw_request: Request): - handler = generate_tokens(raw_request) - if handler is None: - return base(raw_request).create_error_response( - message="The model does not support generate tokens API" - ) - try: - generator = await handler.serve_tokens(request, raw_request) - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - - elif isinstance(generator, GenerateResponse): - return JSONResponse(content=generator.model_dump()) - - return StreamingResponse(content=generator, media_type="text/event-stream") - - def load_log_config(log_config_file: str | None) -> dict | None: if not log_config_file: return None @@ -843,41 +809,6 @@ async def send_with_request_id(message: Message) -> None: return self.app(scope, receive, send_with_request_id) -# Global variable to track scaling state -_scaling_elastic_ep = False - - -class ScalingMiddleware: - """ - Middleware that checks if the model is currently scaling and - returns a 503 Service Unavailable response if it is. - - This middleware applies to all HTTP requests and prevents - processing when the model is in a scaling state. 
- """ - - def __init__(self, app: ASGIApp) -> None: - self.app = app - - def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None]: - if scope["type"] != "http": - return self.app(scope, receive, send) - - # Check global scaling state - global _scaling_elastic_ep - if _scaling_elastic_ep: - # Return 503 Service Unavailable response - response = JSONResponse( - content={ - "error": "The model is currently scaling. Please try again later." - }, - status_code=503, - ) - return response(scope, receive, send) - - return self.app(scope, receive, send) - - def _extract_content_from_chunk(chunk_data: dict) -> str: """Extract content from a streaming response chunk.""" try: @@ -1122,31 +1053,6 @@ async def log_response(request: Request, call_next): ) app = sagemaker_standards.bootstrap(app) - # Optional endpoints - if args.tokens_only: - - @app.post("/abort_requests") - async def abort_requests(raw_request: Request): - """ - Abort one or more requests. To be used in a - Disaggregated Everything setup. - """ - try: - body = await raw_request.json() - except json.JSONDecodeError as e: - raise HTTPException( - status_code=HTTPStatus.BAD_REQUEST.value, - detail=f"JSON decode error: {e}", - ) from e - request_ids = body.get("request_ids") - if request_ids is None: - raise HTTPException( - status_code=HTTPStatus.BAD_REQUEST.value, - detail="Missing 'request_ids' in request body", - ) - # Abort requests in background - asyncio.create_task(engine_client(raw_request).abort(request_ids)) - return Response(status_code=200) return app diff --git a/vllm/entrypoints/serve/__init__.py b/vllm/entrypoints/serve/__init__.py index d5b1c41ab066..fd4f53148266 100644 --- a/vllm/entrypoints/serve/__init__.py +++ b/vllm/entrypoints/serve/__init__.py @@ -58,6 +58,16 @@ def register_vllm_serve_api_routers(app: FastAPI, args: Namespace): if getattr(args, "enable_tokenizer_info_endpoint", False): """Conditionally register the tokenizer info endpoint if enabled.""" app.include_router(tokenizer_info_router) + + from vllm.entrypoints.serve.disagg.api_router import router as disagg_router + + app.include_router(disagg_router) + # Optional endpoints + if getattr(args, "tokens_only", False): + """Conditionally register the disagg abort endpoint if enabled.""" + from vllm.entrypoints.serve.disagg.api_router import abort_router + + app.include_router(abort_router) from vllm.entrypoints.serve.instrumentator.health import ( router as health_router, ) diff --git a/vllm/entrypoints/serve/disagg/__init__.py b/vllm/entrypoints/serve/disagg/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/entrypoints/serve/disagg/api_router.py b/vllm/entrypoints/serve/disagg/api_router.py new file mode 100644 index 000000000000..7548f0d56b9d --- /dev/null +++ b/vllm/entrypoints/serve/disagg/api_router.py @@ -0,0 +1,106 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +import asyncio +import json +from http import HTTPStatus + +from fastapi import APIRouter, Depends, HTTPException, Request, Response +from fastapi.responses import JSONResponse, StreamingResponse + +from vllm.engine.protocol import EngineClient +from vllm.entrypoints.openai.api_server import validate_json_request +from vllm.entrypoints.openai.protocol import ( + ErrorResponse, +) +from vllm.entrypoints.serve.disagg.protocol import ( + GenerateRequest, + GenerateResponse, +) +from vllm.entrypoints.serve.disagg.serving import ( + ServingTokens, +) +from 
vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization +from vllm.entrypoints.utils import ( + load_aware_call, + with_cancellation, +) +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +router = APIRouter() +abort_router = APIRouter() + + +def tokenization(request: Request) -> OpenAIServingTokenization: + return request.app.state.openai_serving_tokenization + + +def generate_tokens(request: Request) -> ServingTokens | None: + return request.app.state.serving_tokens + + +def engine_client(request: Request) -> EngineClient: + return request.app.state.engine_client + + +@router.post( + "/inference/v1/generate", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.OK.value: {"content": {"text/event-stream": {}}}, + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, +) +@with_cancellation +@load_aware_call +async def generate(request: GenerateRequest, raw_request: Request): + handler = generate_tokens(raw_request) + if handler is None: + return tokenization(raw_request).create_error_response( + message="The model does not support generate tokens API" + ) + try: + generator = await handler.serve_tokens(request, raw_request) + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + + elif isinstance(generator, GenerateResponse): + return JSONResponse(content=generator.model_dump()) + + return StreamingResponse(content=generator, media_type="text/event-stream") + + +@abort_router.post("/abort_requests") +async def abort_requests(raw_request: Request): + """ + Abort one or more requests. To be used in a + Disaggregated Everything setup. + """ + try: + body = await raw_request.json() + except json.JSONDecodeError as e: + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST.value, + detail=f"JSON decode error: {e}", + ) from e + request_ids = body.get("request_ids") + if request_ids is None: + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST.value, + detail="Missing 'request_ids' in request body", + ) + # Abort requests in background + asyncio.create_task(engine_client(raw_request).abort(request_ids)) + return Response(status_code=200) diff --git a/vllm/entrypoints/serve/disagg/protocol.py b/vllm/entrypoints/serve/disagg/protocol.py new file mode 100644 index 000000000000..251fcf12ed7d --- /dev/null +++ b/vllm/entrypoints/serve/disagg/protocol.py @@ -0,0 +1,90 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from typing import Any + +from pydantic import BaseModel, Field + +from vllm.entrypoints.openai.protocol import ( + ChatCompletionLogProbs, + Logprob, + SamplingParams, + StreamOptions, +) +from vllm.utils import random_uuid + + +####### Tokens IN <> Tokens OUT ####### +class GenerateRequest(BaseModel): + request_id: str = Field( + default_factory=lambda: f"{random_uuid()}", + description=( + "The request_id related to this request. If the caller does " + "not set it, a random_uuid will be generated. This id is used " + "through out the inference process and return in response." 
+ ), + ) + token_ids: list[int] + """The token ids to generate text from.""" + + # features: MultiModalFeatureSpec + # TODO (NickLucche): implement once Renderer work is completed + features: str | None = None + """The processed MM inputs for the model.""" + + sampling_params: SamplingParams + """The sampling parameters for the model.""" + + model: str | None = None + + stream: bool | None = False + stream_options: StreamOptions | None = None + cache_salt: str | None = Field( + default=None, + description=( + "If specified, the prefix cache will be salted with the provided " + "string to prevent an attacker to guess prompts in multi-user " + "environments. The salt should be random, protected from " + "access by 3rd parties, and long enough to be " + "unpredictable (e.g., 43 characters base64-encoded, corresponding " + "to 256 bit)." + ), + ) + priority: int = Field( + default=0, + description=( + "The priority of the request (lower means earlier handling; " + "default: 0). Any priority other than 0 will raise an error " + "if the served model does not use priority scheduling." + ), + ) + kv_transfer_params: dict[str, Any] | None = Field( + default=None, + description="KVTransfer parameters used for disaggregated serving.", + ) + + +class GenerateResponseChoice(BaseModel): + index: int + logprobs: ChatCompletionLogProbs | None = None + # per OpenAI spec this is the default + finish_reason: str | None = "stop" + token_ids: list[int] | None = None + + +class GenerateResponse(BaseModel): + request_id: str = Field( + default_factory=lambda: f"{random_uuid()}", + description=( + "The request_id related to this request. If the caller does " + "not set it, a random_uuid will be generated. This id is used " + "through out the inference process and return in response." 
+ ), + ) + choices: list[GenerateResponseChoice] + + prompt_logprobs: list[dict[int, Logprob] | None] | None = None + + kv_transfer_params: dict[str, Any] | None = Field( + default=None, + description="KVTransfer parameters used for disaggregated serving.", + ) diff --git a/vllm/entrypoints/openai/serving_tokens.py b/vllm/entrypoints/serve/disagg/serving.py similarity index 99% rename from vllm/entrypoints/openai/serving_tokens.py rename to vllm/entrypoints/serve/disagg/serving.py index daa739e41fa0..f5f23fcaa17d 100644 --- a/vllm/entrypoints/openai/serving_tokens.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -1,5 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + import asyncio import time from collections.abc import AsyncGenerator diff --git a/vllm/entrypoints/serve/elastic_ep/api_router.py b/vllm/entrypoints/serve/elastic_ep/api_router.py index b7eea3a56a1c..f7805f9abbdb 100644 --- a/vllm/entrypoints/serve/elastic_ep/api_router.py +++ b/vllm/entrypoints/serve/elastic_ep/api_router.py @@ -13,6 +13,10 @@ from vllm.entrypoints.openai.protocol import ( ErrorResponse, ) +from vllm.entrypoints.serve.elastic_ep.protocol import ( + get_scaling_elastic_ep, + set_scaling_elastic_ep, +) from vllm.logger import init_logger logger = init_logger(__name__) @@ -60,8 +64,7 @@ async def scale_elastic_ep(raw_request: Request): ) # Set scaling flag to prevent new requests - global _scaling_elastic_ep - _scaling_elastic_ep = True + set_scaling_elastic_ep(True) client = engine_client(raw_request) try: await client.scale_elastic_ep(new_data_parallel_size, drain_timeout) @@ -80,9 +83,9 @@ async def scale_elastic_ep(raw_request: Request): logger.error("Scale failed: %s", e) raise HTTPException(status_code=500, detail="Scale failed") from e finally: - _scaling_elastic_ep = False + set_scaling_elastic_ep(False) @router.post("/is_scaling_elastic_ep") async def is_scaling_elastic_ep(raw_request: Request): - return JSONResponse({"is_scaling_elastic_ep": _scaling_elastic_ep}) + return JSONResponse({"is_scaling_elastic_ep": get_scaling_elastic_ep()}) diff --git a/vllm/entrypoints/serve/elastic_ep/protocol.py b/vllm/entrypoints/serve/elastic_ep/protocol.py new file mode 100644 index 000000000000..373897b83cbd --- /dev/null +++ b/vllm/entrypoints/serve/elastic_ep/protocol.py @@ -0,0 +1,49 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +from collections.abc import Awaitable + +from fastapi.responses import JSONResponse +from starlette.types import ASGIApp, Receive, Scope, Send + +# Global variable to track scaling state +_scaling_elastic_ep = None + + +def get_scaling_elastic_ep(): + return _scaling_elastic_ep + + +def set_scaling_elastic_ep(value): + global _scaling_elastic_ep + _scaling_elastic_ep = value + + +class ScalingMiddleware: + """ + Middleware that checks if the model is currently scaling and + returns a 503 Service Unavailable response if it is. + + This middleware applies to all HTTP requests and prevents + processing when the model is in a scaling state. + """ + + def __init__(self, app: ASGIApp) -> None: + self.app = app + + def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None]: + if scope["type"] != "http": + return self.app(scope, receive, send) + + # Check global scaling state + if get_scaling_elastic_ep(): + # Return 503 Service Unavailable response + response = JSONResponse( + content={ + "error": "The model is currently scaling. 
Please try again later." + }, + status_code=503, + ) + return response(scope, receive, send) + + return self.app(scope, receive, send) From fab2ea761eb80694b8ee77690df821b9e62f103a Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 13:48:46 +0000 Subject: [PATCH 05/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/openai/serving_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 1d89aa011af2..4453f5a12a52 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -74,7 +74,6 @@ ErrorResponse, FunctionCall, FunctionDefinition, - GenerateRequest, GenerateResponse, ResponsesRequest, TokenizeChatRequest, @@ -87,6 +86,7 @@ from vllm.entrypoints.openai.serving_models import OpenAIServingModels from vllm.entrypoints.openai.tool_parsers import ToolParser, ToolParserManager from vllm.entrypoints.renderer import BaseRenderer, CompletionRenderer, RenderConfig +from vllm.entrypoints.serve.disagg.protocol import GenerateRequest from vllm.entrypoints.utils import _validate_truncation_size from vllm.inputs.data import PromptType from vllm.inputs.data import TokensPrompt as EngineTokensPrompt From 2a6023641d02f30ddba754dfd6fa8ea436225a60 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 13:51:35 +0000 Subject: [PATCH 06/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/openai/serving_engine.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 4453f5a12a52..67291f45a925 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -74,7 +74,6 @@ ErrorResponse, FunctionCall, FunctionDefinition, - GenerateResponse, ResponsesRequest, TokenizeChatRequest, TokenizeCompletionRequest, @@ -86,7 +85,7 @@ from vllm.entrypoints.openai.serving_models import OpenAIServingModels from vllm.entrypoints.openai.tool_parsers import ToolParser, ToolParserManager from vllm.entrypoints.renderer import BaseRenderer, CompletionRenderer, RenderConfig -from vllm.entrypoints.serve.disagg.protocol import GenerateRequest +from vllm.entrypoints.serve.disagg.protocol import GenerateRequest, GenerateResponse from vllm.entrypoints.utils import _validate_truncation_size from vllm.inputs.data import PromptType from vllm.inputs.data import TokensPrompt as EngineTokensPrompt From e358bd0b3fa363fe9867b6a0f4682b477d82ed54 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 14:02:01 +0000 Subject: [PATCH 07/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/serve/disagg/serving.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vllm/entrypoints/serve/disagg/serving.py b/vllm/entrypoints/serve/disagg/serving.py index f5f23fcaa17d..5c1d17156a90 100644 --- a/vllm/entrypoints/serve/disagg/serving.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -16,15 +16,17 @@ ChatCompletionLogProbs, ChatCompletionLogProbsContent, ErrorResponse, - GenerateRequest, - GenerateResponse, - GenerateResponseChoice, PromptTokenUsageInfo, RequestResponseMetadata, UsageInfo, ) from vllm.entrypoints.openai.serving_engine import OpenAIServing, clamp_prompt_logprobs from 
vllm.entrypoints.openai.serving_models import OpenAIServingModels +from vllm.entrypoints.serve.disagg.protocol import ( + GenerateRequest, + GenerateResponse, + GenerateResponseChoice, +) from vllm.inputs.data import TokensPrompt as EngineTokensPrompt from vllm.logger import init_logger from vllm.logprobs import Logprob From 39341cdc981778801f6ff0fd2e3fd0b143242a2d Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 15:21:03 +0000 Subject: [PATCH 08/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/api_server.py | 1 + vllm/entrypoints/openai/api_server.py | 6 +- vllm/entrypoints/serve/__init__.py | 103 +++++------ vllm/entrypoints/serve/disagg/api_router.py | 126 +++++++------- .../serve/elastic_ep/api_router.py | 132 ++++++++------- .../elastic_ep/{protocol.py => middleware.py} | 2 +- .../serve/instrumentator/metrics.py | 2 +- vllm/entrypoints/serve/lora/api_router.py | 85 ++++++---- vllm/entrypoints/serve/profile/api_router.py | 50 ++++-- vllm/entrypoints/serve/rl/api_router.py | 98 ----------- .../serve/{rl => rlhf}/__init__.py | 0 vllm/entrypoints/serve/rlhf/api_router.py | 98 +++++++++++ vllm/entrypoints/serve/sleep/api_router.py | 69 ++++---- vllm/entrypoints/serve/tokenize/api_router.py | 160 +++++++++--------- 14 files changed, 479 insertions(+), 453 deletions(-) rename vllm/entrypoints/serve/elastic_ep/{protocol.py => middleware.py} (98%) delete mode 100644 vllm/entrypoints/serve/rl/api_router.py rename vllm/entrypoints/serve/{rl => rlhf}/__init__.py (100%) create mode 100644 vllm/entrypoints/serve/rlhf/api_router.py diff --git a/vllm/entrypoints/api_server.py b/vllm/entrypoints/api_server.py index 154cdeb42a3e..b59f7120551e 100644 --- a/vllm/entrypoints/api_server.py +++ b/vllm/entrypoints/api_server.py @@ -118,6 +118,7 @@ async def init_app( ) ) app.state.engine_client = engine + app.state.args = args return app diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 084a58c1b3f4..4033b4be4dff 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -79,7 +79,7 @@ from vllm.entrypoints.pooling.pooling.serving import OpenAIServingPooling from vllm.entrypoints.pooling.score.serving import ServingScores from vllm.entrypoints.serve.disagg.serving import ServingTokens -from vllm.entrypoints.serve.elastic_ep.protocol import ( +from vllm.entrypoints.serve.elastic_ep.middleware import ( ScalingMiddleware, ) from vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization @@ -951,10 +951,10 @@ def build_app(args: Namespace) -> FastAPI: ) else: app = FastAPI(lifespan=lifespan) - + app.state.args = args from vllm.entrypoints.serve import register_vllm_serve_api_routers - register_vllm_serve_api_routers(app, args=args) + register_vllm_serve_api_routers(app) from vllm.entrypoints.sagemaker.routes import register_sagemaker_routes diff --git a/vllm/entrypoints/serve/__init__.py b/vllm/entrypoints/serve/__init__.py index fd4f53148266..58b3fa185b46 100644 --- a/vllm/entrypoints/serve/__init__.py +++ b/vllm/entrypoints/serve/__init__.py @@ -1,79 +1,64 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -from argparse import Namespace from fastapi import FastAPI -from vllm import envs from vllm.logger import init_logger logger = init_logger("vllm.entrypoints.serve") -def register_vllm_serve_api_routers(app: FastAPI, args: Namespace): - if 
envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING: - logger.warning( - "LoRA dynamic loading & unloading is enabled in the API server. " - "This should ONLY be used for local development!" - ) - from vllm.entrypoints.serve.lora.api_router import router as lora_router +def register_vllm_serve_api_routers(app: FastAPI): + from vllm.entrypoints.serve.lora.api_router import ( + attach_router as attach_lora_router, + ) - app.include_router(lora_router) + attach_lora_router(app) from vllm.entrypoints.serve.elastic_ep.api_router import ( - router as elastic_ep_router, + attach_router as attach_elastic_ep_router, + ) + + attach_elastic_ep_router(app) + + from vllm.entrypoints.serve.profile.api_router import ( + attach_router as attach_profile_router, + ) + + attach_profile_router(app) + + from vllm.entrypoints.serve.sleep.api_router import ( + attach_router as attach_sleep_router, + ) + + attach_sleep_router(app) + + from vllm.entrypoints.serve.tokenize.api_router import ( + attach_router as attach_tokenize_router, + ) + + attach_tokenize_router(app) + + from vllm.entrypoints.serve.disagg.api_router import ( + attach_router as attach_disagg_router, + ) + + attach_disagg_router(app) + + from vllm.entrypoints.serve.rlhf.api_router import ( + attach_router as attach_rlhf_router, ) - app.include_router(elastic_ep_router) - - if envs.VLLM_TORCH_PROFILER_DIR: - logger.warning_once( - "Torch Profiler is enabled in the API server. This should ONLY be " - "used for local development!" - ) - elif envs.VLLM_TORCH_CUDA_PROFILE: - logger.warning_once( - "CUDA Profiler is enabled in the API server. This should ONLY be " - "used for local development!" - ) - if envs.VLLM_TORCH_PROFILER_DIR or envs.VLLM_TORCH_CUDA_PROFILE: - from vllm.entrypoints.serve.profile.api_router import ( - router as profile_router, - ) - - app.include_router(profile_router) - if envs.VLLM_SERVER_DEV_MODE: - logger.warning( - "SECURITY WARNING: Development endpoints are enabled! " - "This should NOT be used in production!" 
- ) - from vllm.entrypoints.serve.sleep.api_router import router as sleep_router - - app.include_router(sleep_router) - - from vllm.entrypoints.serve.tokenize.api_router import router as tokenize_router - from vllm.entrypoints.serve.tokenize.api_router import tokenizer_info_router - - app.include_router(tokenize_router) - if getattr(args, "enable_tokenizer_info_endpoint", False): - """Conditionally register the tokenizer info endpoint if enabled.""" - app.include_router(tokenizer_info_router) - - from vllm.entrypoints.serve.disagg.api_router import router as disagg_router - - app.include_router(disagg_router) - # Optional endpoints - if getattr(args, "tokens_only", False): - """Conditionally register the disagg abort endpoint if enabled.""" - from vllm.entrypoints.serve.disagg.api_router import abort_router - - app.include_router(abort_router) + attach_rlhf_router(app) + + from vllm.entrypoints.serve.instrumentator.metrics import ( + attach_router as attach_metrics_router, + ) + + attach_metrics_router(app) + from vllm.entrypoints.serve.instrumentator.health import ( router as health_router, ) app.include_router(health_router) - - from vllm.entrypoints.serve.instrumentator.metrics import mount_metrics - - mount_metrics(app) diff --git a/vllm/entrypoints/serve/disagg/api_router.py b/vllm/entrypoints/serve/disagg/api_router.py index 7548f0d56b9d..eb3ee6f3b283 100644 --- a/vllm/entrypoints/serve/disagg/api_router.py +++ b/vllm/entrypoints/serve/disagg/api_router.py @@ -6,7 +6,7 @@ import json from http import HTTPStatus -from fastapi import APIRouter, Depends, HTTPException, Request, Response +from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse, StreamingResponse from vllm.engine.protocol import EngineClient @@ -31,10 +31,6 @@ logger = init_logger(__name__) -router = APIRouter() -abort_router = APIRouter() - - def tokenization(request: Request) -> OpenAIServingTokenization: return request.app.state.openai_serving_tokenization @@ -47,60 +43,66 @@ def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -@router.post( - "/inference/v1/generate", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.OK.value: {"content": {"text/event-stream": {}}}, - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, -) -@with_cancellation -@load_aware_call -async def generate(request: GenerateRequest, raw_request: Request): - handler = generate_tokens(raw_request) - if handler is None: - return tokenization(raw_request).create_error_response( - message="The model does not support generate tokens API" - ) - try: - generator = await handler.serve_tokens(request, raw_request) - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - - elif isinstance(generator, GenerateResponse): - return JSONResponse(content=generator.model_dump()) - - return StreamingResponse(content=generator, media_type="text/event-stream") - - -@abort_router.post("/abort_requests") -async def abort_requests(raw_request: Request): - """ - Abort one or more requests. To be used in a - Disaggregated Everything setup. 
- """ - try: - body = await raw_request.json() - except json.JSONDecodeError as e: - raise HTTPException( - status_code=HTTPStatus.BAD_REQUEST.value, - detail=f"JSON decode error: {e}", - ) from e - request_ids = body.get("request_ids") - if request_ids is None: - raise HTTPException( - status_code=HTTPStatus.BAD_REQUEST.value, - detail="Missing 'request_ids' in request body", - ) - # Abort requests in background - asyncio.create_task(engine_client(raw_request).abort(request_ids)) - return Response(status_code=200) +def attach_router(app: FastAPI): + router = APIRouter() + + @router.post( + "/inference/v1/generate", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.OK.value: {"content": {"text/event-stream": {}}}, + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, + ) + @with_cancellation + @load_aware_call + async def generate(request: GenerateRequest, raw_request: Request): + handler = generate_tokens(raw_request) + if handler is None: + return tokenization(raw_request).create_error_response( + message="The model does not support generate tokens API" + ) + try: + generator = await handler.serve_tokens(request, raw_request) + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + + elif isinstance(generator, GenerateResponse): + return JSONResponse(content=generator.model_dump()) + + return StreamingResponse(content=generator, media_type="text/event-stream") + + if getattr(app.state.args, "tokens_only", False): + + @router.post("/abort_requests") + async def abort_requests(raw_request: Request): + """ + Abort one or more requests. To be used in a + Disaggregated Everything setup. 
+ """ + try: + body = await raw_request.json() + except json.JSONDecodeError as e: + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST.value, + detail=f"JSON decode error: {e}", + ) from e + request_ids = body.get("request_ids") + if request_ids is None: + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST.value, + detail="Missing 'request_ids' in request body", + ) + # Abort requests in background + asyncio.create_task(engine_client(raw_request).abort(request_ids)) + return Response(status_code=200) + + app.include_router(router) diff --git a/vllm/entrypoints/serve/elastic_ep/api_router.py b/vllm/entrypoints/serve/elastic_ep/api_router.py index f7805f9abbdb..e02e28a85e1c 100644 --- a/vllm/entrypoints/serve/elastic_ep/api_router.py +++ b/vllm/entrypoints/serve/elastic_ep/api_router.py @@ -5,7 +5,7 @@ import json from http import HTTPStatus -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from vllm.engine.protocol import EngineClient @@ -13,7 +13,7 @@ from vllm.entrypoints.openai.protocol import ( ErrorResponse, ) -from vllm.entrypoints.serve.elastic_ep.protocol import ( +from vllm.entrypoints.serve.elastic_ep.middleware import ( get_scaling_elastic_ep, set_scaling_elastic_ep, ) @@ -22,70 +22,72 @@ logger = init_logger(__name__) -router = APIRouter() - - def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -@router.post( - "/scale_elastic_ep", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.OK.value: {"model": dict}, - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.REQUEST_TIMEOUT.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, -) -async def scale_elastic_ep(raw_request: Request): - try: - body = await raw_request.json() - except json.JSONDecodeError as e: - raise HTTPException(status_code=400, detail="Invalid JSON format") from e # noqa: B904 - - new_data_parallel_size = body.get("new_data_parallel_size") - drain_timeout = body.get("drain_timeout", 120) # Default 2 minutes - - if new_data_parallel_size is None: - raise HTTPException( - status_code=400, detail="new_data_parallel_size is required" - ) - - if not isinstance(new_data_parallel_size, int) or new_data_parallel_size <= 0: - raise HTTPException( - status_code=400, detail="new_data_parallel_size must be a positive integer" - ) - - if not isinstance(drain_timeout, int) or drain_timeout <= 0: - raise HTTPException( - status_code=400, detail="drain_timeout must be a positive integer" - ) - - # Set scaling flag to prevent new requests - set_scaling_elastic_ep(True) - client = engine_client(raw_request) - try: - await client.scale_elastic_ep(new_data_parallel_size, drain_timeout) - return JSONResponse( - { - "message": f"Scaled to {new_data_parallel_size} data parallel engines", - } - ) - except TimeoutError as e: - raise HTTPException( - status_code=408, - detail="Scale failed due to request drain timeout " - f"after {drain_timeout} seconds", - ) from e - except Exception as e: - logger.error("Scale failed: %s", e) - raise HTTPException(status_code=500, detail="Scale failed") from e - finally: - set_scaling_elastic_ep(False) - - -@router.post("/is_scaling_elastic_ep") -async def is_scaling_elastic_ep(raw_request: Request): - return JSONResponse({"is_scaling_elastic_ep": get_scaling_elastic_ep()}) +def attach_router(app: FastAPI): + router = 
APIRouter() + + @router.post( + "/scale_elastic_ep", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.OK.value: {"model": dict}, + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.REQUEST_TIMEOUT.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, + ) + async def scale_elastic_ep(raw_request: Request): + try: + body = await raw_request.json() + except json.JSONDecodeError as e: + raise HTTPException(status_code=400, detail="Invalid JSON format") from e # noqa: B904 + + new_data_parallel_size = body.get("new_data_parallel_size") + drain_timeout = body.get("drain_timeout", 120) # Default 2 minutes + + if new_data_parallel_size is None: + raise HTTPException( + status_code=400, detail="new_data_parallel_size is required" + ) + + if not isinstance(new_data_parallel_size, int) or new_data_parallel_size <= 0: + raise HTTPException( + status_code=400, + detail="new_data_parallel_size must be a positive integer", + ) + + if not isinstance(drain_timeout, int) or drain_timeout <= 0: + raise HTTPException( + status_code=400, detail="drain_timeout must be a positive integer" + ) + + # Set scaling flag to prevent new requests + set_scaling_elastic_ep(True) + client = engine_client(raw_request) + try: + await client.scale_elastic_ep(new_data_parallel_size, drain_timeout) + return JSONResponse( + { + "message": f"Scaled to {new_data_parallel_size} data parallel engines", + } + ) + except TimeoutError as e: + raise HTTPException( + status_code=408, + detail="Scale failed due to request drain timeout " + f"after {drain_timeout} seconds", + ) from e + except Exception as e: + logger.error("Scale failed: %s", e) + raise HTTPException(status_code=500, detail="Scale failed") from e + finally: + set_scaling_elastic_ep(False) + + @router.post("/is_scaling_elastic_ep") + async def is_scaling_elastic_ep(raw_request: Request): + return JSONResponse({"is_scaling_elastic_ep": get_scaling_elastic_ep()}) + + app.include_router(router) diff --git a/vllm/entrypoints/serve/elastic_ep/protocol.py b/vllm/entrypoints/serve/elastic_ep/middleware.py similarity index 98% rename from vllm/entrypoints/serve/elastic_ep/protocol.py rename to vllm/entrypoints/serve/elastic_ep/middleware.py index 373897b83cbd..23f45eafeaa0 100644 --- a/vllm/entrypoints/serve/elastic_ep/protocol.py +++ b/vllm/entrypoints/serve/elastic_ep/middleware.py @@ -7,7 +7,7 @@ from starlette.types import ASGIApp, Receive, Scope, Send # Global variable to track scaling state -_scaling_elastic_ep = None +_scaling_elastic_ep = False def get_scaling_elastic_ep(): diff --git a/vllm/entrypoints/serve/instrumentator/metrics.py b/vllm/entrypoints/serve/instrumentator/metrics.py index 396e859e1e09..efe0c63a9071 100644 --- a/vllm/entrypoints/serve/instrumentator/metrics.py +++ b/vllm/entrypoints/serve/instrumentator/metrics.py @@ -17,7 +17,7 @@ class PrometheusResponse(Response): media_type = prometheus_client.CONTENT_TYPE_LATEST -def mount_metrics(app: FastAPI): +def attach_router(app: FastAPI): """Mount prometheus metrics to a FastAPI app.""" registry = get_prometheus_registry() diff --git a/vllm/entrypoints/serve/lora/api_router.py b/vllm/entrypoints/serve/lora/api_router.py index 5cb39d36ebe5..4fc79b579532 100644 --- a/vllm/entrypoints/serve/lora/api_router.py +++ b/vllm/entrypoints/serve/lora/api_router.py @@ -3,9 +3,10 @@ import model_hosting_container_standards.sagemaker as sagemaker_standards -from fastapi import APIRouter, Depends, Request +from fastapi import 
APIRouter, Depends, FastAPI, Request from fastapi.responses import JSONResponse, Response +from vllm import envs from vllm.entrypoints.openai.api_server import models, validate_json_request from vllm.entrypoints.openai.protocol import ( ErrorResponse, @@ -18,39 +19,53 @@ logger = init_logger(__name__) -router = APIRouter() +def attach_router(app: FastAPI): + if not envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING: + """If LoRA dynamic loading & unloading is not enabled, do nothing.""" + return + router = APIRouter() + logger.warning( + "LoRA dynamic loading & unloading is enabled in the API server. " + "This should ONLY be used for local development!" + ) -@sagemaker_standards.register_load_adapter_handler( - request_shape={ - "lora_name": "body.name", - "lora_path": "body.src", - }, -) -@router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) -async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): - handler: OpenAIServingModels = models(raw_request) - response = await handler.load_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) - - -@sagemaker_standards.register_unload_adapter_handler( - request_shape={ - "lora_name": "path_params.adapter_name", - } -) -@router.post("/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)]) -async def unload_lora_adapter(request: UnloadLoRAAdapterRequest, raw_request: Request): - handler: OpenAIServingModels = models(raw_request) - response = await handler.unload_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) + @sagemaker_standards.register_load_adapter_handler( + request_shape={ + "lora_name": "body.name", + "lora_path": "body.src", + }, + ) + @router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) + async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): + handler: OpenAIServingModels = models(raw_request) + response = await handler.load_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) + + @sagemaker_standards.register_unload_adapter_handler( + request_shape={ + "lora_name": "path_params.adapter_name", + } + ) + @router.post( + "/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)] + ) + async def unload_lora_adapter( + request: UnloadLoRAAdapterRequest, raw_request: Request + ): + handler: OpenAIServingModels = models(raw_request) + response = await handler.unload_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) + + # register the router + app.include_router(router) diff --git a/vllm/entrypoints/serve/profile/api_router.py b/vllm/entrypoints/serve/profile/api_router.py index 9d69e232c562..f47926f8b212 100644 --- a/vllm/entrypoints/serve/profile/api_router.py +++ b/vllm/entrypoints/serve/profile/api_router.py @@ -2,33 +2,47 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -from fastapi import APIRouter, Request +from fastapi import APIRouter, FastAPI, Request 
from fastapi.responses import Response +import vllm.envs as envs from vllm.engine.protocol import EngineClient from vllm.logger import init_logger logger = init_logger(__name__) -router = APIRouter() - - def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -@router.post("/start_profile") -async def start_profile(raw_request: Request): - logger.info("Starting profiler...") - await engine_client(raw_request).start_profile() - logger.info("Profiler started.") - return Response(status_code=200) - - -@router.post("/stop_profile") -async def stop_profile(raw_request: Request): - logger.info("Stopping profiler...") - await engine_client(raw_request).stop_profile() - logger.info("Profiler stopped.") - return Response(status_code=200) +def attach_router(app: FastAPI): + router = APIRouter() + + if envs.VLLM_TORCH_PROFILER_DIR: + logger.warning_once( + "Torch Profiler is enabled in the API server. This should ONLY be " + "used for local development!" + ) + elif envs.VLLM_TORCH_CUDA_PROFILE: + logger.warning_once( + "CUDA Profiler is enabled in the API server. This should ONLY be " + "used for local development!" + ) + if envs.VLLM_TORCH_PROFILER_DIR or envs.VLLM_TORCH_CUDA_PROFILE: + + @router.post("/start_profile") + async def start_profile(raw_request: Request): + logger.info("Starting profiler...") + await engine_client(raw_request).start_profile() + logger.info("Profiler started.") + return Response(status_code=200) + + @router.post("/stop_profile") + async def stop_profile(raw_request: Request): + logger.info("Stopping profiler...") + await engine_client(raw_request).stop_profile() + logger.info("Profiler stopped.") + return Response(status_code=200) + + app.include_router(router) diff --git a/vllm/entrypoints/serve/rl/api_router.py b/vllm/entrypoints/serve/rl/api_router.py deleted file mode 100644 index c5c011d97bfa..000000000000 --- a/vllm/entrypoints/serve/rl/api_router.py +++ /dev/null @@ -1,98 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - - -from http import HTTPStatus - -from fastapi import APIRouter, Query, Request -from fastapi.responses import JSONResponse - -from vllm.engine.protocol import EngineClient -from vllm.logger import init_logger - -logger = init_logger(__name__) - - -def engine_client(request: Request) -> EngineClient: - return request.app.state.engine_client - - -router = APIRouter() - - -@router.post("/pause") -async def pause_generation( - raw_request: Request, - wait_for_inflight_requests: bool = Query(False), - clear_cache: bool = Query(True), -) -> JSONResponse: - """Pause generation requests to allow weight updates. - - Args: - wait_for_inflight_requests: When ``True`` waits for in-flight - requests to finish before pausing. When ``False`` (default), - aborts any in-flight requests immediately. - clear_cache: Whether to clear KV/prefix caches after draining. 
- """ - - engine = engine_client(raw_request) - - try: - await engine.pause_generation( - wait_for_inflight_requests=wait_for_inflight_requests, - clear_cache=clear_cache, - ) - return JSONResponse( - content={"status": "paused"}, - status_code=HTTPStatus.OK.value, - ) - - except ValueError as err: - return JSONResponse( - content={"error": str(err)}, - status_code=HTTPStatus.BAD_REQUEST.value, - ) - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to pause generation") - return JSONResponse( - content={"error": f"Failed to pause generation: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - -@router.post("/resume") -async def resume_generation(raw_request: Request) -> JSONResponse: - """Resume generation after a pause.""" - - engine = engine_client(raw_request) - - try: - await engine.resume_generation() - return JSONResponse( - content={"status": "resumed"}, - status_code=HTTPStatus.OK.value, - ) - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to resume generation") - return JSONResponse( - content={"error": f"Failed to resume generation: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - -@router.get("/is_paused") -async def is_paused(raw_request: Request) -> JSONResponse: - """Return the current pause status.""" - - engine = engine_client(raw_request) - - try: - paused = await engine.is_paused() - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to fetch pause status") - return JSONResponse( - content={"error": f"Failed to fetch pause status: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - return JSONResponse(content={"is_paused": paused}) diff --git a/vllm/entrypoints/serve/rl/__init__.py b/vllm/entrypoints/serve/rlhf/__init__.py similarity index 100% rename from vllm/entrypoints/serve/rl/__init__.py rename to vllm/entrypoints/serve/rlhf/__init__.py diff --git a/vllm/entrypoints/serve/rlhf/api_router.py b/vllm/entrypoints/serve/rlhf/api_router.py new file mode 100644 index 000000000000..d7f016cf543b --- /dev/null +++ b/vllm/entrypoints/serve/rlhf/api_router.py @@ -0,0 +1,98 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + +from http import HTTPStatus + +from fastapi import APIRouter, FastAPI, Query, Request +from fastapi.responses import JSONResponse + +from vllm.engine.protocol import EngineClient +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +def engine_client(request: Request) -> EngineClient: + return request.app.state.engine_client + + +def attach_router(app: FastAPI): + router = APIRouter() + + @router.post("/pause") + async def pause_generation( + raw_request: Request, + wait_for_inflight_requests: bool = Query(False), + clear_cache: bool = Query(True), + ) -> JSONResponse: + """Pause generation requests to allow weight updates. + + Args: + wait_for_inflight_requests: When ``True`` waits for in-flight + requests to finish before pausing. When ``False`` (default), + aborts any in-flight requests immediately. + clear_cache: Whether to clear KV/prefix caches after draining. 
+ """ + + engine = engine_client(raw_request) + + try: + await engine.pause_generation( + wait_for_inflight_requests=wait_for_inflight_requests, + clear_cache=clear_cache, + ) + return JSONResponse( + content={"status": "paused"}, + status_code=HTTPStatus.OK.value, + ) + + except ValueError as err: + return JSONResponse( + content={"error": str(err)}, + status_code=HTTPStatus.BAD_REQUEST.value, + ) + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to pause generation") + return JSONResponse( + content={"error": f"Failed to pause generation: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + @router.post("/resume") + async def resume_generation(raw_request: Request) -> JSONResponse: + """Resume generation after a pause.""" + + engine = engine_client(raw_request) + + try: + await engine.resume_generation() + return JSONResponse( + content={"status": "resumed"}, + status_code=HTTPStatus.OK.value, + ) + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to resume generation") + return JSONResponse( + content={"error": f"Failed to resume generation: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + @router.get("/is_paused") + async def is_paused(raw_request: Request) -> JSONResponse: + """Return the current pause status.""" + + engine = engine_client(raw_request) + + try: + paused = await engine.is_paused() + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to fetch pause status") + return JSONResponse( + content={"error": f"Failed to fetch pause status: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + return JSONResponse(content={"is_paused": paused}) + + app.include_router(router) diff --git a/vllm/entrypoints/serve/sleep/api_router.py b/vllm/entrypoints/serve/sleep/api_router.py index b94b2c00e468..624d060973b3 100644 --- a/vllm/entrypoints/serve/sleep/api_router.py +++ b/vllm/entrypoints/serve/sleep/api_router.py @@ -2,47 +2,52 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -from fastapi import APIRouter, Request +from fastapi import APIRouter, FastAPI, Request from fastapi.responses import JSONResponse, Response +import vllm.envs as envs from vllm.engine.protocol import EngineClient from vllm.logger import init_logger logger = init_logger(__name__) -router = APIRouter() - - def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -@router.post("/sleep") -async def sleep(raw_request: Request): - # get POST params - level = raw_request.query_params.get("level", "1") - await engine_client(raw_request).sleep(int(level)) - # FIXME: in v0 with frontend multiprocessing, the sleep command - # is sent but does not finish yet when we return a response. - return Response(status_code=200) - - -@router.post("/wake_up") -async def wake_up(raw_request: Request): - tags = raw_request.query_params.getlist("tags") - if tags == []: - # set to None to wake up all tags if no tags are provided - tags = None - logger.info("wake up the engine with tags: %s", tags) - await engine_client(raw_request).wake_up(tags) - # FIXME: in v0 with frontend multiprocessing, the wake-up command - # is sent but does not finish yet when we return a response. 
- return Response(status_code=200) - - -@router.get("/is_sleeping") -async def is_sleeping(raw_request: Request): - logger.info("check whether the engine is sleeping") - is_sleeping = await engine_client(raw_request).is_sleeping() - return JSONResponse(content={"is_sleeping": is_sleeping}) +def attach_router(app: FastAPI): + if not envs.VLLM_SERVER_DEV_MODE: + return + logger.warning( + "SECURITY WARNING: Development endpoints are enabled! " + "This should NOT be used in production!" + ) + router = APIRouter() + + @router.post("/sleep") + async def sleep(raw_request: Request): + # get POST params + level = raw_request.query_params.get("level", "1") + await engine_client(raw_request).sleep(int(level)) + # FIXME: in v0 with frontend multiprocessing, the sleep command + # is sent but does not finish yet when we return a response. + return Response(status_code=200) + + @router.post("/wake_up") + async def wake_up(raw_request: Request): + tags = raw_request.query_params.getlist("tags") + if tags == []: + # set to None to wake up all tags if no tags are provided + tags = None + logger.info("wake up the engine with tags: %s", tags) + await engine_client(raw_request).wake_up(tags) + # FIXME: in v0 with frontend multiprocessing, the wake-up command + # is sent but does not finish yet when we return a response. + return Response(status_code=200) + + @router.get("/is_sleeping") + async def is_sleeping(raw_request: Request): + logger.info("check whether the engine is sleeping") + is_sleeping = await engine_client(raw_request).is_sleeping() + return JSONResponse(content={"is_sleeping": is_sleeping}) diff --git a/vllm/entrypoints/serve/tokenize/api_router.py b/vllm/entrypoints/serve/tokenize/api_router.py index 2082691cf162..921668db4b7e 100644 --- a/vllm/entrypoints/serve/tokenize/api_router.py +++ b/vllm/entrypoints/serve/tokenize/api_router.py @@ -4,7 +4,7 @@ from http import HTTPStatus -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse from typing_extensions import assert_never @@ -26,86 +26,88 @@ logger = init_logger(__name__) -router = APIRouter() -tokenizer_info_router = APIRouter() - - def tokenization(request: Request) -> OpenAIServingTokenization: return request.app.state.openai_serving_tokenization -@router.post( - "/tokenize", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse}, - }, -) -@with_cancellation -async def tokenize(request: TokenizeRequest, raw_request: Request): - handler = tokenization(raw_request) - - try: - generator = await handler.create_tokenize(request, raw_request) - except NotImplementedError as e: - raise HTTPException( - status_code=HTTPStatus.NOT_IMPLEMENTED.value, detail=str(e) - ) from e - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - elif isinstance(generator, TokenizeResponse): - return JSONResponse(content=generator.model_dump()) - - assert_never(generator) - - -@router.post( - "/detokenize", - 
dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, -) -@with_cancellation -async def detokenize(request: DetokenizeRequest, raw_request: Request): - handler = tokenization(raw_request) - - try: - generator = await handler.create_detokenize(request, raw_request) - except OverflowError as e: - raise RequestValidationError(errors=[str(e)]) from e - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - elif isinstance(generator, DetokenizeResponse): - return JSONResponse(content=generator.model_dump()) - - assert_never(generator) - - -@tokenizer_info_router.get("/tokenizer_info") -async def get_tokenizer_info(raw_request: Request): - """Get comprehensive tokenizer information.""" - result = await tokenization(raw_request).get_tokenizer_info() - return JSONResponse( - content=result.model_dump(), - status_code=result.error.code if isinstance(result, ErrorResponse) else 200, +def attach_router(app: FastAPI): + router = APIRouter() + + @router.post( + "/tokenize", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse}, + }, + ) + @with_cancellation + async def tokenize(request: TokenizeRequest, raw_request: Request): + handler = tokenization(raw_request) + + try: + generator = await handler.create_tokenize(request, raw_request) + except NotImplementedError as e: + raise HTTPException( + status_code=HTTPStatus.NOT_IMPLEMENTED.value, detail=str(e) + ) from e + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + elif isinstance(generator, TokenizeResponse): + return JSONResponse(content=generator.model_dump()) + + assert_never(generator) + + @router.post( + "/detokenize", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, ) + @with_cancellation + async def detokenize(request: DetokenizeRequest, raw_request: Request): + handler = tokenization(raw_request) + + try: + generator = await handler.create_detokenize(request, raw_request) + except OverflowError as e: + raise RequestValidationError(errors=[str(e)]) from e + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + elif isinstance(generator, DetokenizeResponse): + return JSONResponse(content=generator.model_dump()) + + assert_never(generator) + + if getattr(app.state.args, "enable_tokenizer_info_endpoint", False): + """Conditionally register the tokenizer info endpoint if enabled.""" + + 
@router.get("/tokenizer_info") + async def get_tokenizer_info(raw_request: Request): + """Get comprehensive tokenizer information.""" + result = await tokenization(raw_request).get_tokenizer_info() + return JSONResponse( + content=result.model_dump(), + status_code=result.error.code + if isinstance(result, ErrorResponse) + else 200, + ) From 063e5a89d54d5e17ded57593252c998889f9eb40 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 15:28:15 +0000 Subject: [PATCH 09/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/serve/__init__.py | 4 +- vllm/entrypoints/serve/disagg/api_router.py | 74 ++++---- .../serve/elastic_ep/api_router.py | 129 +++++++------- .../serve/instrumentator/health.py | 4 + vllm/entrypoints/serve/lora/api_router.py | 76 ++++----- vllm/entrypoints/serve/profile/api_router.py | 37 ++-- vllm/entrypoints/serve/rlhf/api_router.py | 158 +++++++++--------- vllm/entrypoints/serve/sleep/api_router.py | 63 +++---- vllm/entrypoints/serve/tokenize/api_router.py | 137 +++++++-------- 9 files changed, 352 insertions(+), 330 deletions(-) diff --git a/vllm/entrypoints/serve/__init__.py b/vllm/entrypoints/serve/__init__.py index 58b3fa185b46..3a6f59248bee 100644 --- a/vllm/entrypoints/serve/__init__.py +++ b/vllm/entrypoints/serve/__init__.py @@ -58,7 +58,7 @@ def register_vllm_serve_api_routers(app: FastAPI): attach_metrics_router(app) from vllm.entrypoints.serve.instrumentator.health import ( - router as health_router, + attach_router as attach_health_router, ) - app.include_router(health_router) + attach_health_router(app) diff --git a/vllm/entrypoints/serve/disagg/api_router.py b/vllm/entrypoints/serve/disagg/api_router.py index eb3ee6f3b283..2a48a764252d 100644 --- a/vllm/entrypoints/serve/disagg/api_router.py +++ b/vllm/entrypoints/serve/disagg/api_router.py @@ -43,43 +43,45 @@ def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -def attach_router(app: FastAPI): - router = APIRouter() - - @router.post( - "/inference/v1/generate", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.OK.value: {"content": {"text/event-stream": {}}}, - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, - ) - @with_cancellation - @load_aware_call - async def generate(request: GenerateRequest, raw_request: Request): - handler = generate_tokens(raw_request) - if handler is None: - return tokenization(raw_request).create_error_response( - message="The model does not support generate tokens API" - ) - try: - generator = await handler.serve_tokens(request, raw_request) - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - - elif isinstance(generator, GenerateResponse): - return JSONResponse(content=generator.model_dump()) - - return StreamingResponse(content=generator, media_type="text/event-stream") +router = APIRouter() + + +@router.post( + "/inference/v1/generate", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.OK.value: {"content": {"text/event-stream": {}}}, + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + 
HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, +) +@with_cancellation +@load_aware_call +async def generate(request: GenerateRequest, raw_request: Request): + handler = generate_tokens(raw_request) + if handler is None: + return tokenization(raw_request).create_error_response( + message="The model does not support generate tokens API" + ) + try: + generator = await handler.serve_tokens(request, raw_request) + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + + elif isinstance(generator, GenerateResponse): + return JSONResponse(content=generator.model_dump()) + + return StreamingResponse(content=generator, media_type="text/event-stream") + +def attach_router(app: FastAPI): if getattr(app.state.args, "tokens_only", False): @router.post("/abort_requests") diff --git a/vllm/entrypoints/serve/elastic_ep/api_router.py b/vllm/entrypoints/serve/elastic_ep/api_router.py index e02e28a85e1c..21d5d2e60778 100644 --- a/vllm/entrypoints/serve/elastic_ep/api_router.py +++ b/vllm/entrypoints/serve/elastic_ep/api_router.py @@ -26,68 +26,71 @@ def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -def attach_router(app: FastAPI): - router = APIRouter() - - @router.post( - "/scale_elastic_ep", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.OK.value: {"model": dict}, - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.REQUEST_TIMEOUT.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, - ) - async def scale_elastic_ep(raw_request: Request): - try: - body = await raw_request.json() - except json.JSONDecodeError as e: - raise HTTPException(status_code=400, detail="Invalid JSON format") from e # noqa: B904 - - new_data_parallel_size = body.get("new_data_parallel_size") - drain_timeout = body.get("drain_timeout", 120) # Default 2 minutes - - if new_data_parallel_size is None: - raise HTTPException( - status_code=400, detail="new_data_parallel_size is required" - ) - - if not isinstance(new_data_parallel_size, int) or new_data_parallel_size <= 0: - raise HTTPException( - status_code=400, - detail="new_data_parallel_size must be a positive integer", - ) - - if not isinstance(drain_timeout, int) or drain_timeout <= 0: - raise HTTPException( - status_code=400, detail="drain_timeout must be a positive integer" - ) - - # Set scaling flag to prevent new requests - set_scaling_elastic_ep(True) - client = engine_client(raw_request) - try: - await client.scale_elastic_ep(new_data_parallel_size, drain_timeout) - return JSONResponse( - { - "message": f"Scaled to {new_data_parallel_size} data parallel engines", - } - ) - except TimeoutError as e: - raise HTTPException( - status_code=408, - detail="Scale failed due to request drain timeout " - f"after {drain_timeout} seconds", - ) from e - except Exception as e: - logger.error("Scale failed: %s", e) - raise HTTPException(status_code=500, detail="Scale failed") from e - finally: - set_scaling_elastic_ep(False) - - @router.post("/is_scaling_elastic_ep") - async def is_scaling_elastic_ep(raw_request: Request): - return JSONResponse({"is_scaling_elastic_ep": get_scaling_elastic_ep()}) +router = APIRouter() + + +@router.post( + "/scale_elastic_ep", + dependencies=[Depends(validate_json_request)], + responses={ + 
HTTPStatus.OK.value: {"model": dict}, + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.REQUEST_TIMEOUT.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, +) +async def scale_elastic_ep(raw_request: Request): + try: + body = await raw_request.json() + except json.JSONDecodeError as e: + raise HTTPException(status_code=400, detail="Invalid JSON format") from e # noqa: B904 + + new_data_parallel_size = body.get("new_data_parallel_size") + drain_timeout = body.get("drain_timeout", 120) # Default 2 minutes + + if new_data_parallel_size is None: + raise HTTPException( + status_code=400, detail="new_data_parallel_size is required" + ) + + if not isinstance(new_data_parallel_size, int) or new_data_parallel_size <= 0: + raise HTTPException( + status_code=400, + detail="new_data_parallel_size must be a positive integer", + ) + + if not isinstance(drain_timeout, int) or drain_timeout <= 0: + raise HTTPException( + status_code=400, detail="drain_timeout must be a positive integer" + ) + + # Set scaling flag to prevent new requests + set_scaling_elastic_ep(True) + client = engine_client(raw_request) + try: + await client.scale_elastic_ep(new_data_parallel_size, drain_timeout) + return JSONResponse( + { + "message": f"Scaled to {new_data_parallel_size} data parallel engines", + } + ) + except TimeoutError as e: + raise HTTPException( + status_code=408, + detail="Scale failed due to request drain timeout " + f"after {drain_timeout} seconds", + ) from e + except Exception as e: + logger.error("Scale failed: %s", e) + raise HTTPException(status_code=500, detail="Scale failed") from e + finally: + set_scaling_elastic_ep(False) + + +@router.post("/is_scaling_elastic_ep") +async def is_scaling_elastic_ep(raw_request: Request): + return JSONResponse({"is_scaling_elastic_ep": get_scaling_elastic_ep()}) + +def attach_router(app: FastAPI): app.include_router(router) diff --git a/vllm/entrypoints/serve/instrumentator/health.py b/vllm/entrypoints/serve/instrumentator/health.py index 8b079ce31618..52fa0dbab723 100644 --- a/vllm/entrypoints/serve/instrumentator/health.py +++ b/vllm/entrypoints/serve/instrumentator/health.py @@ -27,3 +27,7 @@ async def health(raw_request: Request) -> Response: return Response(status_code=200) except EngineDeadError: return Response(status_code=503) + + +def attach_router(app): + app.include_router(router) \ No newline at end of file diff --git a/vllm/entrypoints/serve/lora/api_router.py b/vllm/entrypoints/serve/lora/api_router.py index 4fc79b579532..c53e4eec3584 100644 --- a/vllm/entrypoints/serve/lora/api_router.py +++ b/vllm/entrypoints/serve/lora/api_router.py @@ -17,55 +17,51 @@ from vllm.logger import init_logger logger = init_logger(__name__) +router = APIRouter() + + +@sagemaker_standards.register_load_adapter_handler( + request_shape={ + "lora_name": "body.name", + "lora_path": "body.src", + }, +) +@router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) +async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): + handler: OpenAIServingModels = models(raw_request) + response = await handler.load_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) + + +@sagemaker_standards.register_unload_adapter_handler( + request_shape={ + "lora_name": "path_params.adapter_name", + } +) 
+@router.post("/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)]) +async def unload_lora_adapter(request: UnloadLoRAAdapterRequest, raw_request: Request): + handler: OpenAIServingModels = models(raw_request) + response = await handler.unload_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) def attach_router(app: FastAPI): if not envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING: """If LoRA dynamic loading & unloading is not enabled, do nothing.""" return - router = APIRouter() - logger.warning( "LoRA dynamic loading & unloading is enabled in the API server. " "This should ONLY be used for local development!" ) - - @sagemaker_standards.register_load_adapter_handler( - request_shape={ - "lora_name": "body.name", - "lora_path": "body.src", - }, - ) - @router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) - async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): - handler: OpenAIServingModels = models(raw_request) - response = await handler.load_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) - - @sagemaker_standards.register_unload_adapter_handler( - request_shape={ - "lora_name": "path_params.adapter_name", - } - ) - @router.post( - "/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)] - ) - async def unload_lora_adapter( - request: UnloadLoRAAdapterRequest, raw_request: Request - ): - handler: OpenAIServingModels = models(raw_request) - response = await handler.unload_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) - # register the router app.include_router(router) diff --git a/vllm/entrypoints/serve/profile/api_router.py b/vllm/entrypoints/serve/profile/api_router.py index f47926f8b212..166f13764eb3 100644 --- a/vllm/entrypoints/serve/profile/api_router.py +++ b/vllm/entrypoints/serve/profile/api_router.py @@ -11,14 +11,30 @@ logger = init_logger(__name__) +router = APIRouter() + def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -def attach_router(app: FastAPI): - router = APIRouter() +@router.post("/start_profile") +async def start_profile(raw_request: Request): + logger.info("Starting profiler...") + await engine_client(raw_request).start_profile() + logger.info("Profiler started.") + return Response(status_code=200) + + +@router.post("/stop_profile") +async def stop_profile(raw_request: Request): + logger.info("Stopping profiler...") + await engine_client(raw_request).stop_profile() + logger.info("Profiler stopped.") + return Response(status_code=200) + +def attach_router(app: FastAPI): if envs.VLLM_TORCH_PROFILER_DIR: logger.warning_once( "Torch Profiler is enabled in the API server. This should ONLY be " @@ -30,19 +46,4 @@ def attach_router(app: FastAPI): "used for local development!" 
) if envs.VLLM_TORCH_PROFILER_DIR or envs.VLLM_TORCH_CUDA_PROFILE: - - @router.post("/start_profile") - async def start_profile(raw_request: Request): - logger.info("Starting profiler...") - await engine_client(raw_request).start_profile() - logger.info("Profiler started.") - return Response(status_code=200) - - @router.post("/stop_profile") - async def stop_profile(raw_request: Request): - logger.info("Stopping profiler...") - await engine_client(raw_request).stop_profile() - logger.info("Profiler stopped.") - return Response(status_code=200) - - app.include_router(router) + app.include_router(router) diff --git a/vllm/entrypoints/serve/rlhf/api_router.py b/vllm/entrypoints/serve/rlhf/api_router.py index d7f016cf543b..3b37840ae089 100644 --- a/vllm/entrypoints/serve/rlhf/api_router.py +++ b/vllm/entrypoints/serve/rlhf/api_router.py @@ -17,82 +17,86 @@ def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client -def attach_router(app: FastAPI): - router = APIRouter() - - @router.post("/pause") - async def pause_generation( - raw_request: Request, - wait_for_inflight_requests: bool = Query(False), - clear_cache: bool = Query(True), - ) -> JSONResponse: - """Pause generation requests to allow weight updates. - - Args: - wait_for_inflight_requests: When ``True`` waits for in-flight - requests to finish before pausing. When ``False`` (default), - aborts any in-flight requests immediately. - clear_cache: Whether to clear KV/prefix caches after draining. - """ - - engine = engine_client(raw_request) - - try: - await engine.pause_generation( - wait_for_inflight_requests=wait_for_inflight_requests, - clear_cache=clear_cache, - ) - return JSONResponse( - content={"status": "paused"}, - status_code=HTTPStatus.OK.value, - ) - - except ValueError as err: - return JSONResponse( - content={"error": str(err)}, - status_code=HTTPStatus.BAD_REQUEST.value, - ) - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to pause generation") - return JSONResponse( - content={"error": f"Failed to pause generation: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - @router.post("/resume") - async def resume_generation(raw_request: Request) -> JSONResponse: - """Resume generation after a pause.""" - - engine = engine_client(raw_request) - - try: - await engine.resume_generation() - return JSONResponse( - content={"status": "resumed"}, - status_code=HTTPStatus.OK.value, - ) - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to resume generation") - return JSONResponse( - content={"error": f"Failed to resume generation: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - @router.get("/is_paused") - async def is_paused(raw_request: Request) -> JSONResponse: - """Return the current pause status.""" - - engine = engine_client(raw_request) - - try: - paused = await engine.is_paused() - except Exception as err: # pragma: no cover - defensive - logger.exception("Failed to fetch pause status") - return JSONResponse( - content={"error": f"Failed to fetch pause status: {err}"}, - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - ) - - return JSONResponse(content={"is_paused": paused}) +router = APIRouter() + + +@router.post("/pause") +async def pause_generation( + raw_request: Request, + wait_for_inflight_requests: bool = Query(False), + clear_cache: bool = Query(True), +) -> JSONResponse: + """Pause generation requests to allow weight updates. 
+ + Args: + wait_for_inflight_requests: When ``True`` waits for in-flight + requests to finish before pausing. When ``False`` (default), + aborts any in-flight requests immediately. + clear_cache: Whether to clear KV/prefix caches after draining. + """ + + engine = engine_client(raw_request) + + try: + await engine.pause_generation( + wait_for_inflight_requests=wait_for_inflight_requests, + clear_cache=clear_cache, + ) + return JSONResponse( + content={"status": "paused"}, + status_code=HTTPStatus.OK.value, + ) + + except ValueError as err: + return JSONResponse( + content={"error": str(err)}, + status_code=HTTPStatus.BAD_REQUEST.value, + ) + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to pause generation") + return JSONResponse( + content={"error": f"Failed to pause generation: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + +@router.post("/resume") +async def resume_generation(raw_request: Request) -> JSONResponse: + """Resume generation after a pause.""" + + engine = engine_client(raw_request) + + try: + await engine.resume_generation() + return JSONResponse( + content={"status": "resumed"}, + status_code=HTTPStatus.OK.value, + ) + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to resume generation") + return JSONResponse( + content={"error": f"Failed to resume generation: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + +@router.get("/is_paused") +async def is_paused(raw_request: Request) -> JSONResponse: + """Return the current pause status.""" + + engine = engine_client(raw_request) + + try: + paused = await engine.is_paused() + except Exception as err: # pragma: no cover - defensive + logger.exception("Failed to fetch pause status") + return JSONResponse( + content={"error": f"Failed to fetch pause status: {err}"}, + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + ) + + return JSONResponse(content={"is_paused": paused}) + +def attach_router(app: FastAPI): app.include_router(router) diff --git a/vllm/entrypoints/serve/sleep/api_router.py b/vllm/entrypoints/serve/sleep/api_router.py index 624d060973b3..bc01e185315c 100644 --- a/vllm/entrypoints/serve/sleep/api_router.py +++ b/vllm/entrypoints/serve/sleep/api_router.py @@ -16,6 +16,39 @@ def engine_client(request: Request) -> EngineClient: return request.app.state.engine_client +router = APIRouter() + + +@router.post("/sleep") +async def sleep(raw_request: Request): + # get POST params + level = raw_request.query_params.get("level", "1") + await engine_client(raw_request).sleep(int(level)) + # FIXME: in v0 with frontend multiprocessing, the sleep command + # is sent but does not finish yet when we return a response. + return Response(status_code=200) + + +@router.post("/wake_up") +async def wake_up(raw_request: Request): + tags = raw_request.query_params.getlist("tags") + if tags == []: + # set to None to wake up all tags if no tags are provided + tags = None + logger.info("wake up the engine with tags: %s", tags) + await engine_client(raw_request).wake_up(tags) + # FIXME: in v0 with frontend multiprocessing, the wake-up command + # is sent but does not finish yet when we return a response. 
+ return Response(status_code=200) + + +@router.get("/is_sleeping") +async def is_sleeping(raw_request: Request): + logger.info("check whether the engine is sleeping") + is_sleeping = await engine_client(raw_request).is_sleeping() + return JSONResponse(content={"is_sleeping": is_sleeping}) + + def attach_router(app: FastAPI): if not envs.VLLM_SERVER_DEV_MODE: return @@ -23,31 +56,5 @@ def attach_router(app: FastAPI): "SECURITY WARNING: Development endpoints are enabled! " "This should NOT be used in production!" ) - router = APIRouter() - - @router.post("/sleep") - async def sleep(raw_request: Request): - # get POST params - level = raw_request.query_params.get("level", "1") - await engine_client(raw_request).sleep(int(level)) - # FIXME: in v0 with frontend multiprocessing, the sleep command - # is sent but does not finish yet when we return a response. - return Response(status_code=200) - - @router.post("/wake_up") - async def wake_up(raw_request: Request): - tags = raw_request.query_params.getlist("tags") - if tags == []: - # set to None to wake up all tags if no tags are provided - tags = None - logger.info("wake up the engine with tags: %s", tags) - await engine_client(raw_request).wake_up(tags) - # FIXME: in v0 with frontend multiprocessing, the wake-up command - # is sent but does not finish yet when we return a response. - return Response(status_code=200) - - @router.get("/is_sleeping") - async def is_sleeping(raw_request: Request): - logger.info("check whether the engine is sleeping") - is_sleeping = await engine_client(raw_request).is_sleeping() - return JSONResponse(content={"is_sleeping": is_sleeping}) + + app.include_router(router) diff --git a/vllm/entrypoints/serve/tokenize/api_router.py b/vllm/entrypoints/serve/tokenize/api_router.py index 921668db4b7e..3bdbf95e7e09 100644 --- a/vllm/entrypoints/serve/tokenize/api_router.py +++ b/vllm/entrypoints/serve/tokenize/api_router.py @@ -30,74 +30,77 @@ def tokenization(request: Request) -> OpenAIServingTokenization: return request.app.state.openai_serving_tokenization -def attach_router(app: FastAPI): - router = APIRouter() - - @router.post( - "/tokenize", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse}, - }, - ) - @with_cancellation - async def tokenize(request: TokenizeRequest, raw_request: Request): - handler = tokenization(raw_request) - - try: - generator = await handler.create_tokenize(request, raw_request) - except NotImplementedError as e: - raise HTTPException( - status_code=HTTPStatus.NOT_IMPLEMENTED.value, detail=str(e) - ) from e - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - elif isinstance(generator, TokenizeResponse): - return JSONResponse(content=generator.model_dump()) - - assert_never(generator) - - @router.post( - "/detokenize", - dependencies=[Depends(validate_json_request)], - responses={ - HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, - HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, - HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, - }, - ) - @with_cancellation - async def detokenize(request: 
DetokenizeRequest, raw_request: Request): - handler = tokenization(raw_request) - - try: - generator = await handler.create_detokenize(request, raw_request) - except OverflowError as e: - raise RequestValidationError(errors=[str(e)]) from e - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) - ) from e - - if isinstance(generator, ErrorResponse): - return JSONResponse( - content=generator.model_dump(), status_code=generator.error.code - ) - elif isinstance(generator, DetokenizeResponse): - return JSONResponse(content=generator.model_dump()) +router = APIRouter() + + +@router.post( + "/tokenize", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + HTTPStatus.NOT_IMPLEMENTED.value: {"model": ErrorResponse}, + }, +) +@with_cancellation +async def tokenize(request: TokenizeRequest, raw_request: Request): + handler = tokenization(raw_request) + + try: + generator = await handler.create_tokenize(request, raw_request) + except NotImplementedError as e: + raise HTTPException( + status_code=HTTPStatus.NOT_IMPLEMENTED.value, detail=str(e) + ) from e + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + elif isinstance(generator, TokenizeResponse): + return JSONResponse(content=generator.model_dump()) + + assert_never(generator) + + +@router.post( + "/detokenize", + dependencies=[Depends(validate_json_request)], + responses={ + HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse}, + HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse}, + HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse}, + }, +) +@with_cancellation +async def detokenize(request: DetokenizeRequest, raw_request: Request): + handler = tokenization(raw_request) + + try: + generator = await handler.create_detokenize(request, raw_request) + except OverflowError as e: + raise RequestValidationError(errors=[str(e)]) from e + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=str(e) + ) from e + + if isinstance(generator, ErrorResponse): + return JSONResponse( + content=generator.model_dump(), status_code=generator.error.code + ) + elif isinstance(generator, DetokenizeResponse): + return JSONResponse(content=generator.model_dump()) - assert_never(generator) + assert_never(generator) + +def attach_router(app: FastAPI): if getattr(app.state.args, "enable_tokenizer_info_endpoint", False): """Conditionally register the tokenizer info endpoint if enabled.""" @@ -111,3 +114,5 @@ async def get_tokenizer_info(raw_request: Request): if isinstance(result, ErrorResponse) else 200, ) + + app.include_router(router) From f6ad5a1eaaf26b4f31f74992c76e02700c44b5f5 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 15:31:53 +0000 Subject: [PATCH 10/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/openai/api_server.py | 2 +- vllm/entrypoints/serve/disagg/api_router.py | 2 +- vllm/entrypoints/serve/tokenize/api_router.py | 2 +- vllm/entrypoints/serve/tokenize/{protocol.py => serving.py} | 0 4 files changed, 3 insertions(+), 3 deletions(-) rename 
vllm/entrypoints/serve/tokenize/{protocol.py => serving.py} (100%) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 4033b4be4dff..2fa6afa2bacb 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -82,7 +82,7 @@ from vllm.entrypoints.serve.elastic_ep.middleware import ( ScalingMiddleware, ) -from vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization +from vllm.entrypoints.serve.tokenize.serving import OpenAIServingTokenization from vllm.entrypoints.tool_server import DemoToolServer, MCPToolServer, ToolServer from vllm.entrypoints.utils import ( cli_env_setup, diff --git a/vllm/entrypoints/serve/disagg/api_router.py b/vllm/entrypoints/serve/disagg/api_router.py index 2a48a764252d..c38ede30dad1 100644 --- a/vllm/entrypoints/serve/disagg/api_router.py +++ b/vllm/entrypoints/serve/disagg/api_router.py @@ -21,7 +21,7 @@ from vllm.entrypoints.serve.disagg.serving import ( ServingTokens, ) -from vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization +from vllm.entrypoints.serve.tokenize.serving import OpenAIServingTokenization from vllm.entrypoints.utils import ( load_aware_call, with_cancellation, diff --git a/vllm/entrypoints/serve/tokenize/api_router.py b/vllm/entrypoints/serve/tokenize/api_router.py index 3bdbf95e7e09..a10e78c8d28e 100644 --- a/vllm/entrypoints/serve/tokenize/api_router.py +++ b/vllm/entrypoints/serve/tokenize/api_router.py @@ -17,7 +17,7 @@ TokenizeRequest, TokenizeResponse, ) -from vllm.entrypoints.serve.tokenize.protocol import OpenAIServingTokenization +from vllm.entrypoints.serve.tokenize.serving import OpenAIServingTokenization from vllm.entrypoints.utils import ( with_cancellation, ) diff --git a/vllm/entrypoints/serve/tokenize/protocol.py b/vllm/entrypoints/serve/tokenize/serving.py similarity index 100% rename from vllm/entrypoints/serve/tokenize/protocol.py rename to vllm/entrypoints/serve/tokenize/serving.py From 4527627f71aa5641db1f37335eb67bf9845b19e8 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 15:38:11 +0000 Subject: [PATCH 11/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/serve/__init__.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/vllm/entrypoints/serve/__init__.py b/vllm/entrypoints/serve/__init__.py index 3a6f59248bee..c4fcc92db931 100644 --- a/vllm/entrypoints/serve/__init__.py +++ b/vllm/entrypoints/serve/__init__.py @@ -4,10 +4,6 @@ from fastapi import FastAPI -from vllm.logger import init_logger - -logger = init_logger("vllm.entrypoints.serve") - def register_vllm_serve_api_routers(app: FastAPI): from vllm.entrypoints.serve.lora.api_router import ( From e4e54899bf000da7604610a206ad6e05e21ed032 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 1 Dec 2025 16:09:24 +0000 Subject: [PATCH 12/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/serve/instrumentator/health.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/entrypoints/serve/instrumentator/health.py b/vllm/entrypoints/serve/instrumentator/health.py index 52fa0dbab723..029ef677aaa2 100644 --- a/vllm/entrypoints/serve/instrumentator/health.py +++ b/vllm/entrypoints/serve/instrumentator/health.py @@ -30,4 +30,4 @@ async def health(raw_request: Request) -> Response: def attach_router(app): - app.include_router(router) \ No newline at end of file + app.include_router(router) From 
d02b17e509eb26bab912c850de316c146e07f87a Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Tue, 2 Dec 2025 09:33:43 +0000 Subject: [PATCH 13/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- tests/entrypoints/openai/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/entrypoints/openai/test_basic.py b/tests/entrypoints/openai/test_basic.py index 3d581a300b6a..1ff30de31bbe 100644 --- a/tests/entrypoints/openai/test_basic.py +++ b/tests/entrypoints/openai/test_basic.py @@ -232,7 +232,7 @@ def make_long_completion_request(): @pytest.mark.asyncio async def test_health_check_engine_dead_error(): # Import the health function directly to test it in isolation - from vllm.entrypoints.openai.api_server import health + from vllm.entrypoints.serve.instrumentator.health import health # Create a mock request that simulates what FastAPI would provide mock_request = Mock(spec=Request) From 54b840babf4ce5f70a3efb8ca657897b3bdb9d91 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Wed, 3 Dec 2025 05:10:15 +0000 Subject: [PATCH 14/14] [Refactor] [1/N] to simplify the vLLM serving architecture Signed-off-by: chaunceyjiang --- vllm/entrypoints/serve/lora/api_router.py | 73 ++++++++++++----------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/vllm/entrypoints/serve/lora/api_router.py b/vllm/entrypoints/serve/lora/api_router.py index c53e4eec3584..6a57e73f334f 100644 --- a/vllm/entrypoints/serve/lora/api_router.py +++ b/vllm/entrypoints/serve/lora/api_router.py @@ -20,41 +20,6 @@ router = APIRouter() -@sagemaker_standards.register_load_adapter_handler( - request_shape={ - "lora_name": "body.name", - "lora_path": "body.src", - }, -) -@router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) -async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): - handler: OpenAIServingModels = models(raw_request) - response = await handler.load_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) - - -@sagemaker_standards.register_unload_adapter_handler( - request_shape={ - "lora_name": "path_params.adapter_name", - } -) -@router.post("/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)]) -async def unload_lora_adapter(request: UnloadLoRAAdapterRequest, raw_request: Request): - handler: OpenAIServingModels = models(raw_request) - response = await handler.unload_lora_adapter(request) - if isinstance(response, ErrorResponse): - return JSONResponse( - content=response.model_dump(), status_code=response.error.code - ) - - return Response(status_code=200, content=response) - - def attach_router(app: FastAPI): if not envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING: """If LoRA dynamic loading & unloading is not enabled, do nothing.""" @@ -63,5 +28,43 @@ def attach_router(app: FastAPI): "LoRA dynamic loading & unloading is enabled in the API server. " "This should ONLY be used for local development!" 
) + + @sagemaker_standards.register_load_adapter_handler( + request_shape={ + "lora_name": "body.name", + "lora_path": "body.src", + }, + ) + @router.post("/v1/load_lora_adapter", dependencies=[Depends(validate_json_request)]) + async def load_lora_adapter(request: LoadLoRAAdapterRequest, raw_request: Request): + handler: OpenAIServingModels = models(raw_request) + response = await handler.load_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) + + @sagemaker_standards.register_unload_adapter_handler( + request_shape={ + "lora_name": "path_params.adapter_name", + } + ) + @router.post( + "/v1/unload_lora_adapter", dependencies=[Depends(validate_json_request)] + ) + async def unload_lora_adapter( + request: UnloadLoRAAdapterRequest, raw_request: Request + ): + handler: OpenAIServingModels = models(raw_request) + response = await handler.unload_lora_adapter(request) + if isinstance(response, ErrorResponse): + return JSONResponse( + content=response.model_dump(), status_code=response.error.code + ) + + return Response(status_code=200, content=response) + # register the router app.include_router(router)
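
For reference, each per-feature module touched in this series exposes an attach_router(app) hook that registers its APIRouter only when the corresponding feature is enabled (VLLM_ALLOW_RUNTIME_LORA_UPDATING for the LoRA routes, VLLM_SERVER_DEV_MODE for sleep/wake_up, VLLM_TORCH_PROFILER_DIR or VLLM_TORCH_CUDA_PROFILE for profiling). The sketch below shows how an aggregator such as register_vllm_serve_api_routers in vllm/entrypoints/serve/__init__.py might wire the hooks together; it is illustrative only — the exact module list, import style, and ordering are assumptions, not the literal contents of the patch.

    from fastapi import FastAPI


    def register_vllm_serve_api_routers(app: FastAPI) -> None:
        # Import lazily so optional dependencies (e.g. the SageMaker
        # standards package used by the LoRA router) are only loaded when
        # the serve routers are actually registered.
        from vllm.entrypoints.serve.elastic_ep import api_router as elastic_ep
        from vllm.entrypoints.serve.instrumentator import health
        from vllm.entrypoints.serve.lora import api_router as lora
        from vllm.entrypoints.serve.profile import api_router as profile
        from vllm.entrypoints.serve.sleep import api_router as sleep
        from vllm.entrypoints.serve.tokenize import api_router as tokenize

        # Each attach_router decides for itself whether its endpoints should
        # be exposed (env flags, dev mode, CLI args), so the aggregator can
        # call every hook unconditionally.
        for module in (health, elastic_ep, lora, profile, sleep, tokenize):
            module.attach_router(app)

Usage is a single call during app construction, e.g. app = FastAPI(); register_vllm_serve_api_routers(app); routers whose feature flags are unset simply register nothing.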