@@ -5,7 +5,7 @@
import pytest
import pytest_asyncio

-from ...utils import RemoteAnthropicServer
+from ...utils import RemoteOpenAIServer

MODEL_NAME = "Qwen/Qwen3-0.6B"

@@ -23,13 +23,13 @@ def server(): # noqa: F811
        "claude-3-7-sonnet-latest",
    ]

-    with RemoteAnthropicServer(MODEL_NAME, args) as remote_server:
+    with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
        yield remote_server


@pytest_asyncio.fixture
async def client(server):
-    async with server.get_async_client() as async_client:
+    async with server.get_async_client_anthropic() as async_client:
        yield async_client


@@ -105,37 +105,37 @@ async def test_anthropic_tool_call(client: anthropic.AsyncAnthropic):

    print(f"Anthropic response: {resp.model_dump_json()}")

-@pytest.mark.asyncio
-async def test_anthropic_tool_call_streaming(client: anthropic.AsyncAnthropic):
-    resp = await client.messages.create(
-        model="claude-3-7-sonnet-latest",
-        max_tokens=1024,
-        messages=[
-            {
-                "role": "user",
-                "content": "What's the weather like in New York today?",
-            }
-        ],
-        tools=[
-            {
-                "name": "get_current_weather",
-                "description": "Useful for querying the weather "
-                "in a specified city.",
-                "input_schema": {
-                    "type": "object",
-                    "properties": {
-                        "location": {
-                            "type": "string",
-                            "description": "City or region, for example: "
-                            "New York, London, Tokyo, etc.",
-                        }
-                    },
-                    "required": ["location"],
-                },
-            }
-        ],
-        stream=True,
-    )
-
-    async for chunk in resp:
-        print(chunk.model_dump_json())
+@pytest.mark.asyncio
+async def test_anthropic_tool_call_streaming(client: anthropic.AsyncAnthropic):
+    resp = await client.messages.create(
+        model="claude-3-7-sonnet-latest",
+        max_tokens=1024,
+        messages=[
+            {
+                "role": "user",
+                "content": "What's the weather like in New York today?",
+            }
+        ],
+        tools=[
+            {
+                "name": "get_current_weather",
+                "description": "Useful for querying the weather in a specified city.",
+                "input_schema": {
+                    "type": "object",
+                    "properties": {
+                        "location": {
+                            "type": "string",
+                            "description": "City or region, for example: "
+                            "New York, London, Tokyo, etc.",
+                        }
+                    },
+                    "required": ["location"],
+                },
+            }
+        ],
+        stream=True,
+    )
+
+    async for chunk in resp:
+        print(chunk.model_dump_json())

Review comment (Member), on the re-added test_anthropic_tool_call_streaming:
In that case let's not copy the tests like this, to avoid duplication in CI. We just need to check that the endpoint exists for the OpenAI server, since the same code is used to process the endpoint.
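
If the duplicated streaming test is dropped per the comment above, a minimal endpoint-existence check could look like the sketch below. This is a hedged sketch, not code from this PR: it assumes the Anthropic-style route is served at /v1/messages and reuses the server fixture defined above.

import requests


def test_messages_endpoint_exists(server):
    resp = requests.post(
        server.url_for("v1", "messages"),
        json={
            "model": "claude-3-7-sonnet-latest",
            "max_tokens": 8,
            "messages": [{"role": "user", "content": "ping"}],
        },
        timeout=60,
    )
    # A 404 would mean the route is not wired up; any other status shows it exists.
    assert resp.status_code != 404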
tests/utils.py (142 changes: 17 additions & 125 deletions)
@@ -247,6 +247,23 @@ def get_async_client(self, **kwargs):
            **kwargs,
        )

+    def get_client_anthropic(self, **kwargs):
+        if "timeout" not in kwargs:
+            kwargs["timeout"] = 600
+        return anthropic.Anthropic(
+            base_url=self.url_for(),
+            api_key=self.DUMMY_API_KEY,
+            max_retries=0,
+            **kwargs,
+        )
+
+    def get_async_client_anthropic(self, **kwargs):
+        if "timeout" not in kwargs:
+            kwargs["timeout"] = 600
+        return anthropic.AsyncAnthropic(
+            base_url=self.url_for(), api_key=self.DUMMY_API_KEY, max_retries=0, **kwargs
+        )
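
A hedged usage sketch for the new sync helper, with MODEL_NAME and args as in the test file above (the model name is illustrative):

# Sketch: drive the OpenAI-compatible server through the Anthropic SDK.
with RemoteOpenAIServer(MODEL_NAME, args) as server:
    client = server.get_client_anthropic()
    msg = client.messages.create(
        model="claude-3-7-sonnet-latest",
        max_tokens=64,
        messages=[{"role": "user", "content": "Hello!"}],
    )
    print(msg.model_dump_json())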


class RemoteOpenAIServerCustom(RemoteOpenAIServer):
"""Launch test server with custom child process"""
Expand Down Expand Up @@ -293,131 +310,6 @@ def __exit__(self, exc_type, exc_value, traceback):
            self.proc.kill()


class RemoteAnthropicServer:
DUMMY_API_KEY = "token-abc123" # vLLM's Anthropic server does not need API key

def __init__(
self,
model: str,
vllm_serve_args: list[str],
*,
env_dict: dict[str, str] | None = None,
seed: int | None = 0,
auto_port: bool = True,
max_wait_seconds: float | None = None,
) -> None:
if auto_port:
if "-p" in vllm_serve_args or "--port" in vllm_serve_args:
raise ValueError(
"You have manually specified the port when `auto_port=True`."
)

# Don't mutate the input args
vllm_serve_args = vllm_serve_args + ["--port", str(get_open_port())]
if seed is not None:
if "--seed" in vllm_serve_args:
raise ValueError(
f"You have manually specified the seed when `seed={seed}`."
)

vllm_serve_args = vllm_serve_args + ["--seed", str(seed)]

parser = FlexibleArgumentParser(description="vLLM's remote Anthropic server.")
subparsers = parser.add_subparsers(required=False, dest="subparser")
parser = ServeSubcommand().subparser_init(subparsers)
args = parser.parse_args(["--model", model, *vllm_serve_args])
self.host = str(args.host or "localhost")
self.port = int(args.port)

self.show_hidden_metrics = args.show_hidden_metrics_for_version is not None

# download the model before starting the server to avoid timeout
is_local = os.path.isdir(model)
if not is_local:
engine_args = AsyncEngineArgs.from_cli_args(args)
model_config = engine_args.create_model_config()
load_config = engine_args.create_load_config()

model_loader = get_model_loader(load_config)
model_loader.download_model(model_config)

env = os.environ.copy()
# the current process might initialize cuda,
# to be safe, we should use spawn method
env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
if env_dict is not None:
env.update(env_dict)
self.proc = subprocess.Popen(
[
sys.executable,
"-m",
"vllm.entrypoints.anthropic.api_server",
model,
*vllm_serve_args,
],
env=env,
stdout=sys.stdout,
stderr=sys.stderr,
)
max_wait_seconds = max_wait_seconds or 240
self._wait_for_server(url=self.url_for("health"), timeout=max_wait_seconds)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.proc.terminate()
try:
self.proc.wait(8)
except subprocess.TimeoutExpired:
# force kill if needed
self.proc.kill()

def _wait_for_server(self, *, url: str, timeout: float):
# run health check
start = time.time()
while True:
try:
if requests.get(url).status_code == 200:
break
except Exception:
# this exception can only be raised by requests.get,
# which means the server is not ready yet.
# the stack trace is not useful, so we suppress it
# by using `raise from None`.
result = self.proc.poll()
if result is not None and result != 0:
raise RuntimeError("Server exited unexpectedly.") from None

time.sleep(0.5)
if time.time() - start > timeout:
raise RuntimeError("Server failed to start in time.") from None

@property
def url_root(self) -> str:
return f"http://{self.host}:{self.port}"

def url_for(self, *parts: str) -> str:
return self.url_root + "/" + "/".join(parts)

def get_client(self, **kwargs):
if "timeout" not in kwargs:
kwargs["timeout"] = 600
return anthropic.Anthropic(
base_url=self.url_for(),
api_key=self.DUMMY_API_KEY,
max_retries=0,
**kwargs,
)

def get_async_client(self, **kwargs):
if "timeout" not in kwargs:
kwargs["timeout"] = 600
return anthropic.AsyncAnthropic(
base_url=self.url_for(), api_key=self.DUMMY_API_KEY, max_retries=0, **kwargs
)


def _test_completion(
    client: openai.OpenAI,
    model: str,