Skip to content

Commit 26dd5bb

Browse files
committed
LLM pools.
1 parent 739a4ce commit 26dd5bb

File tree

21 files changed

+2524
-326
lines changed

21 files changed

+2524
-326
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2025 The Marin Authors
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# https://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""Demo: Math inference with worker pool and autoscaling.
17+
18+
This example demonstrates how to use WorkerPool to scale LLM inference across
19+
multiple workers. It uses fake LLM workers to simulate solving math problems,
20+
showcasing the autoscaling and queue-based task distribution.
21+
22+
Example:
23+
$ python -m fray.examples.inference_pool
24+
"""
25+
26+
import logging
27+
import time
28+
29+
from fray.cluster.local_cluster import LocalCluster
30+
from fray.worker_pool import WorkerPool, WorkerPoolConfig
31+
32+
logging.basicConfig(
33+
level=logging.INFO,
34+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
35+
)
36+
logger = logging.getLogger(__name__)
37+
38+
39+
def main():
    """Run the math inference demo with an autoscaling worker pool.

    Creates a :class:`LocalCluster`, builds a :class:`WorkerPool` backed by
    the fake LLM worker module, submits a batch of math problems through
    ``pool.map``, then logs per-problem answers and aggregate pool metrics.
    The pool is always shut down, even if mapping raises.
    """
    logger.info("Starting math inference demo")

    # Sample math problems to solve.
    problems = [
        {"id": 1, "problem": "What is 2 + 2?"},
        {"id": 2, "problem": "What is 15 * 3?"},
        {"id": 3, "problem": "What is 100 - 47?"},
        {"id": 4, "problem": "What is 144 / 12?"},
        {"id": 5, "problem": "What is 8^2?"},
        {"id": 6, "problem": "What is sqrt(256)?"},
        {"id": 7, "problem": "What is 7! (factorial)?"},
        {"id": 8, "problem": "What is the sum of integers from 1 to 100?"},
        {"id": 9, "problem": "What is 2^10?"},
        {"id": 10, "problem": "What is the greatest common divisor of 48 and 18?"},
    ]

    # Create local cluster.
    logger.info("Creating local cluster")
    cluster = LocalCluster()

    # Configure worker pool with autoscaling.
    config = WorkerPoolConfig(
        min_workers=2,  # Start with 2 workers
        max_workers=5,  # Scale up to 5 workers
        scale_up_threshold=0.8,  # Scale up when >0.8 tasks per worker
        scale_down_threshold=0.2,  # Scale down when <0.2 tasks per worker
        scale_check_interval=2.0,  # Check every 2 seconds
    )

    # Lazy %-style args avoid formatting when the log level is disabled.
    logger.info("Creating worker pool: min=%d, max=%d", config.min_workers, config.max_workers)
    pool = WorkerPool(
        cluster=cluster,
        worker_module="fray.examples.fake_llm_worker",
        config=config,
    )

    try:
        # Process problems in batches to demonstrate autoscaling.
        logger.info("Processing %d math problems", len(problems))

        # Submit first batch (should trigger scale up).
        start_time = time.time()
        results = pool.map(problems, timeout=60.0)
        duration = time.time() - start_time

        # Display results, ordered by problem id for readability.
        logger.info("\nResults (processed in %.2fs):", duration)
        for result in sorted(results, key=lambda r: r["problem_id"]):
            problem = next(p for p in problems if p["id"] == result["problem_id"])
            logger.info("  Problem %s: %s", result["problem_id"], problem["problem"])
            logger.info("    Answer: %s", result["answer"])

        # Show pool metrics.
        logger.info("\nPool metrics:")
        logger.info("  Total problems processed: %d", len(results))
        # Guard the average: an empty result set (e.g. all tasks timed out)
        # would otherwise raise ZeroDivisionError.
        if results:
            logger.info("  Average time per problem: %.2fs", duration / len(results))
        logger.info("  Final worker count: %s", pool.num_workers())

    finally:
        logger.info("\nShutting down worker pool")
        pool.shutdown(timeout=10.0)
        logger.info("Demo complete")


if __name__ == "__main__":
    main()

lib/fray/src/fray/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
ThreadContext,
2323
create_context,
2424
)
25+
from fray.worker_pool import WorkerPool, WorkerPoolConfig
2526

2627
__all__ = [
2728
"ContextConfig",
2829
"ExecutionContext",
2930
"RayContext",
3031
"SyncContext",
3132
"ThreadContext",
33+
"WorkerPool",
34+
"WorkerPoolConfig",
3235
"create_context",
3336
]

lib/fray/src/fray/cluster/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
"""
3333

3434
from fray.cluster.base import Cluster
35-
from fray.cluster.local import LocalCluster
35+
from fray.cluster.queue import Lease, Queue
3636
from fray.cluster.types import (
3737
CpuConfig,
3838
DeviceConfig,
@@ -60,13 +60,22 @@
6060
"JobInfo",
6161
"JobRequest",
6262
"JobStatus",
63-
"LocalCluster",
63+
"Lease",
64+
"Queue",
6465
"ResourceConfig",
6566
"TpuConfig",
6667
"TpuType",
6768
"create_environment",
6869
]
6970

71+
# LocalCluster is optional
72+
try:
73+
from fray.cluster.local import LocalCluster
74+
75+
__all__.append("LocalCluster")
76+
except ImportError:
77+
pass
78+
7079
# Ray cluster is optional
7180
try:
7281
from fray.cluster.ray.cluster import RayCluster

lib/fray/src/fray/cluster/base.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from abc import ABC, abstractmethod
2020
from collections.abc import Iterator
2121

22+
from fray.cluster.queue import Queue
2223
from fray.cluster.types import JobId, JobInfo, JobRequest
2324

2425
logger = logging.getLogger(__name__)
@@ -110,6 +111,21 @@ def list_jobs(self) -> list[JobInfo]:
110111
"""
111112
...
112113

114+
@abstractmethod
115+
def create_queue(self, name: str) -> Queue:
116+
"""Create a distributed queue for task distribution.
117+
118+
Args:
119+
name: Unique name for this queue
120+
121+
Returns:
122+
Backend-specific Queue implementation
123+
124+
Raises:
125+
ValueError: If queue name is invalid or already exists
126+
"""
127+
...
128+
113129
def wait(self, job_id: JobId) -> JobInfo:
114130
"""Block until the specified job completes, returning its final status."""
115131
# default implementation polls until job is no longer running
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright 2025 The Marin Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Local implementations for cluster components."""
16+
17+
from fray.cluster.local_cluster import LocalCluster
18+
from fray.cluster.local.queue import LocalQueue
19+
20+
__all__ = ["LocalCluster", "LocalQueue"]
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# Copyright 2025 The Marin Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""File-based queue implementation for cross-subprocess communication.
16+
17+
This queue implementation uses the filesystem for state management, enabling
18+
reliable cross-process communication even when using subprocess.Popen (which
19+
doesn't share memory like multiprocessing.Process does).
20+
"""
21+
22+
import json
23+
import time
24+
import uuid
25+
from pathlib import Path
26+
from typing import Any
27+
28+
from fray.cluster.queue import Lease
29+
30+
31+
class FileQueue:
    """File-based queue implementation for cross-subprocess communication.

    Uses the filesystem to store queue state, allowing multiple independent
    processes (even those started via subprocess.Popen, which shares no
    memory) to share a queue.

    Queue structure:
        - {queue_dir}/{name}/available/{timestamp}-{uuid}.json - available tasks
        - {queue_dir}/{name}/leased/{timestamp}-{uuid}.json - currently leased tasks

    Filenames are prefixed with a zero-padded, fixed-width push timestamp so
    a lexicographic sort of the directory yields true FIFO order (plain UUID
    names would sort in random order). Cross-process safety relies on
    ``rename`` being atomic within one filesystem: whichever process renames
    a file out of ``available/`` first wins the lease.
    """

    def __init__(self, name: str, queue_dir: Path | None = None, lease_timeout: float = 60.0):
        """Initialize a file-based queue.

        Args:
            name: Unique queue name
            queue_dir: Directory to store queue files (default: /tmp/fray_queues)
            lease_timeout: Seconds after which an unacknowledged lease is
                considered abandoned and its item is requeued
        """
        if queue_dir is None:
            queue_dir = Path("/tmp/fray_queues")

        self._name = name
        self._base_dir = queue_dir / name
        self._available_dir = self._base_dir / "available"
        self._leased_dir = self._base_dir / "leased"
        self._lease_timeout = lease_timeout

        # Create directories (idempotent, safe if several processes race here)
        self._available_dir.mkdir(parents=True, exist_ok=True)
        self._leased_dir.mkdir(parents=True, exist_ok=True)

    def _new_item_name(self) -> str:
        """Build a filename whose lexicographic order matches push order.

        20 integer digits + 6 fractional digits keeps timestamps fixed-width
        (so they sort correctly as strings); the UUID breaks ties between
        pushes within the same microsecond.
        """
        return f"{time.time():027.6f}-{uuid.uuid4()}.json"

    def push(self, item: Any) -> None:
        """Add an item to the back of the queue."""
        file_path = self._available_dir / self._new_item_name()

        # Write to a temp name and rename so concurrent readers never see a
        # partially-written JSON file (.tmp files don't match the *.json glob).
        temp_path = file_path.with_suffix(".tmp")
        with open(temp_path, "w") as f:
            json.dump({"item": item, "timestamp": time.time()}, f)
        temp_path.rename(file_path)

    def peek(self) -> Any | None:
        """View the next available item without acquiring a lease.

        Returns None when the queue is empty. Entries that another process
        leases between listing and reading, or that are corrupted, are
        skipped rather than raising.
        """
        for file_path in sorted(self._available_dir.glob("*.json")):
            try:
                with open(file_path) as f:
                    return json.load(f)["item"]
            except (FileNotFoundError, json.JSONDecodeError):
                continue
        return None

    def pop(self) -> Lease[Any] | None:
        """Acquire a lease on the oldest available item.

        Also requeues leases older than ``lease_timeout`` so items held by
        crashed workers are not lost. Returns None if nothing is available.
        """
        self._requeue_expired_leases()

        # Filenames are timestamp-prefixed, so a lexicographic sort is FIFO.
        for file_path in sorted(self._available_dir.glob("*.json")):
            lease_id = file_path.stem
            leased_path = self._leased_dir / file_path.name

            # Claim first: rename is atomic, so exactly one process can move
            # the file out of available/. Claiming before reading prevents
            # two processes from ever returning the same lease.
            try:
                file_path.rename(leased_path)
            except FileNotFoundError:
                # Another process claimed it first; try the next file.
                continue

            try:
                with open(leased_path) as f:
                    data = json.load(f)

                # Record when the lease was taken so expiry can be detected,
                # rewriting via a temp file to keep the entry valid JSON at
                # all times.
                data["lease_time"] = time.time()
                temp_path = leased_path.with_suffix(".tmp")
                with open(temp_path, "w") as f:
                    json.dump(data, f)
                temp_path.rename(leased_path)

                return Lease(
                    item=data["item"],
                    lease_id=lease_id,
                    timestamp=data["lease_time"],
                )
            except (FileNotFoundError, json.JSONDecodeError):
                # Entry vanished or is corrupted; skip rather than crash.
                continue

        return None

    def _requeue_expired_leases(self) -> None:
        """Move leased items whose lease has expired back to available."""
        current_time = time.time()
        for leased_file in self._leased_dir.glob("*.json"):
            try:
                with open(leased_file) as f:
                    data = json.load(f)

                # Requeue if the lease is older than the timeout. The file
                # name (and thus FIFO position) is preserved.
                if current_time - data.get("lease_time", 0) > self._lease_timeout:
                    leased_file.rename(self._available_dir / leased_file.name)
            except (FileNotFoundError, json.JSONDecodeError):
                # File was removed or corrupted, skip
                continue

    def done(self, lease: Lease[Any]) -> None:
        """Mark a leased task as successfully completed."""
        leased_path = self._leased_dir / f"{lease.lease_id}.json"

        # EAFP: exists()-then-unlink is racy across processes; check and act
        # in one step instead.
        try:
            leased_path.unlink()
        except FileNotFoundError:
            raise ValueError(f"Invalid lease: {lease.lease_id}") from None

    def release(self, lease: Lease[Any]) -> None:
        """Release a lease and requeue the item."""
        leased_path = self._leased_dir / f"{lease.lease_id}.json"

        # Move back to available; the atomic rename both validates the lease
        # and requeues the item in one step.
        try:
            leased_path.rename(self._available_dir / leased_path.name)
        except FileNotFoundError:
            raise ValueError(f"Invalid lease: {lease.lease_id}") from None

    def size(self) -> int:
        """Return the total number of items in the queue (available + leased)."""
        available = len(list(self._available_dir.glob("*.json")))
        leased = len(list(self._leased_dir.glob("*.json")))
        return available + leased

    def pending(self) -> int:
        """Return the number of items available for leasing."""
        return len(list(self._available_dir.glob("*.json")))

0 commit comments

Comments
 (0)