rjpower (Collaborator) commented Nov 15, 2025

No description provided.

@rjpower rjpower marked this pull request as ready for review November 18, 2025 19:24
Copilot AI review requested due to automatic review settings November 18, 2025 19:24
Copilot finished reviewing on behalf of rjpower November 18, 2025 19:26
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR replaces the Ray actor-based worker pool implementation from Zephyr with a new cluster-agnostic worker pool system for Fray. The new design uses job-based workers and distributed queues to enable LLM inference scaling across different cluster backends.

Key changes:

  • Introduced a Queue abstraction with lease semantics for reliable distributed task processing
  • Implemented multiple queue backends (Ray actor-based, multiprocessing-based, file-based)
  • Created a new WorkerPool that uses cluster jobs instead of Ray actors for better backend flexibility
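For readers unfamiliar with lease semantics, here is a minimal single-process sketch of the idea: a popped item is not deleted but leased, and it returns to the queue if the worker does not acknowledge it before the lease expires. The names `Lease`, `InMemoryLeaseQueue`, `done`, `release`, and the `lease_timeout`/`now` parameters are assumptions made for illustration; they are not taken from the Fray code in this PR.

```python
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Lease:
    """Hypothetical lease handle: the item plus the ID and deadline of its claim."""
    item: Any
    lease_id: str
    deadline: float


class InMemoryLeaseQueue:
    """Illustrative in-memory queue with leases; not the Fray implementation."""

    def __init__(self) -> None:
        self._pending: list[Any] = []
        self._leased: dict[str, Lease] = {}

    def push(self, item: Any) -> None:
        self._pending.append(item)

    def pop(self, lease_timeout: float = 60.0, now: Optional[float] = None) -> Optional[Lease]:
        """Lease the next item, or return None if nothing is available."""
        now = time.monotonic() if now is None else now
        self._requeue_expired(now)
        if not self._pending:
            return None
        lease = Lease(self._pending.pop(0), uuid.uuid4().hex, now + lease_timeout)
        self._leased[lease.lease_id] = lease
        return lease

    def done(self, lease: Lease) -> None:
        """Acknowledge successful processing; the item is gone for good."""
        self._leased.pop(lease.lease_id, None)

    def release(self, lease: Lease) -> None:
        """Give the item back early so another worker can take it."""
        if self._leased.pop(lease.lease_id, None) is not None:
            self._pending.insert(0, lease.item)

    def _requeue_expired(self, now: float) -> None:
        # Items whose worker went silent past the deadline become available again.
        expired = [l for l in self._leased.values() if l.deadline <= now]
        for lease in expired:
            del self._leased[lease.lease_id]
            self._pending.append(lease.item)
```

A worker that crashes mid-task never calls `done`, so its item reappears once the deadline passes. That is the reliability property the overview refers to; a distributed backend would need to provide the same guarantee across processes.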

Reviewed Changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| lib/zephyr/src/zephyr/worker_pool.py | Removed Ray actor-based worker pool implementation |
| lib/fray/src/fray/worker_pool.py | Added new cluster-agnostic worker pool with autoscaling support |
| lib/fray/src/fray/cluster/queue.py | Introduced Queue protocol and Lease dataclass for distributed task management |
| lib/fray/src/fray/cluster/base.py | Added create_queue abstract method to Cluster interface |
| lib/fray/src/fray/cluster/ray/queue.py | Implemented Ray actor-based queue backend |
| lib/fray/src/fray/cluster/local/queue.py | Implemented multiprocessing-based queue for local clusters |
| lib/fray/src/fray/cluster/local/file_queue.py | Implemented file-based queue for subprocess communication |
| lib/fray/src/fray/examples/fake_llm_worker.py | Added example worker simulating LLM inference for testing |
| lib/fray/tests/test_worker_pool.py | Added comprehensive tests for worker pool functionality |
| lib/fray/tests/cluster/test_*.py | Added tests for queue implementations |
| lib/fray/examples/inference_pool.py | Added demo script showcasing worker pool usage |


def test_push_and_pop(queue: SimpleQueue):
    """Test pushing and popping items."""
    assert queue.pop() is None

Copilot AI Nov 18, 2025

This 'assert' statement contains an expression which may have side effects.

Suggested change
-    assert queue.pop() is None
+    result = queue.pop()
+    assert result is None
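The reason behind this warning is that Python strips `assert` statements when it runs with optimizations enabled (`-O`), so a call placed inside the assertion silently stops running. The sketch below reproduces that with `compile(..., optimize=...)`; the helper `remaining_after` and the two source strings are illustrative names, not code from this PR. Test suites normally run without `-O`, so this is mostly a hygiene point.

```python
def remaining_after(source: str, optimize: int) -> list[int]:
    """Run `source` against a one-element list and return what is left in it."""
    items = [1]
    # optimize=0 keeps asserts; optimize=1 removes them, as `python -O` does.
    code = compile(source, "<demo>", "exec", optimize=optimize)
    exec(code, {"items": items})
    return items


INLINE = "assert items.pop() == 1"                # side effect inside the assert
SPLIT = "result = items.pop()\nassert result == 1"  # side effect hoisted out

print(remaining_after(INLINE, optimize=0))  # [] : pop ran
print(remaining_after(INLINE, optimize=1))  # [1] : pop was stripped with the assert
print(remaining_after(SPLIT, optimize=1))   # [] : pop still runs
```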

Copilot uses AI. Check for mistakes.
def test_empty_queue_operations(queue: SimpleQueue):
    """Test operations on an empty queue."""
    assert queue.peek() is None
    assert queue.pop() is None

Copilot AI Nov 18, 2025

This 'assert' statement contains an expression which may have side effects.

Suggested change
-    assert queue.pop() is None
+    result = queue.pop()
+    assert result is None


def test_push_and_pop(queue: RayQueue):
    """Test pushing and popping items."""
    assert queue.pop() is None

Copilot AI Nov 18, 2025

This 'assert' statement contains an expression which may have side effects.

Suggested change
-    assert queue.pop() is None
+    result = queue.pop()
+    assert result is None

def test_empty_queue_operations(queue: RayQueue):
    """Test operations on an empty queue."""
    assert queue.peek() is None
    assert queue.pop() is None

Copilot AI Nov 18, 2025

This 'assert' statement contains an expression which may have side effects.

Suggested change
-    assert queue.pop() is None
+    result = queue.pop()
+    assert result is None

@rjpower rjpower force-pushed the rjpower/20251114-llm-pool branch from 26dd5bb to c4a24a9 Compare November 18, 2025 19:43
@rjpower rjpower changed the base branch from main to rjpower/20251114-fray-tpu November 18, 2025 19:45
@rjpower rjpower force-pushed the rjpower/20251114-fray-tpu branch from 6b88bb6 to 9b45fd5 Compare November 20, 2025 17:49
Base automatically changed from rjpower/20251114-fray-tpu to main November 20, 2025 20:06
@rjpower rjpower force-pushed the rjpower/20251114-llm-pool branch 2 times, most recently from 3eaae02 to 473cc12 Compare November 21, 2025 18:01
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(("", 0))

Check warning

Code scanning / CodeQL

Binding a socket to all network interfaces (severity: Medium)

The empty host string '' binds the socket to all interfaces.

Copilot Autofix

AI 2 days ago

To fix the problem, change the bind call in find_free_port() from s.bind(("", 0)) to s.bind(("127.0.0.1", 0)). This restricts the port-discovery socket to the loopback interface, so it is not reachable from the network during its short lifetime. Only find_free_port() needs to change; no new imports or helper definitions are required.


Suggested changeset 1
lib/marin/src/marin/evaluation/backends/vllm.py

Autofix patch

Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/lib/marin/src/marin/evaluation/backends/vllm.py b/lib/marin/src/marin/evaluation/backends/vllm.py
--- a/lib/marin/src/marin/evaluation/backends/vllm.py
+++ b/lib/marin/src/marin/evaluation/backends/vllm.py
@@ -34,7 +34,7 @@
     import socket
 
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-        s.bind(("", 0))
+        s.bind(("127.0.0.1", 0))
         s.listen(1)
         port = s.getsockname()[1]
     return port
EOF
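
For reference, the patched helper written out as a standalone function might look like the sketch below. The patch shows only part of the function, so the signature and the `host` parameter here are assumptions. The `listen(1)` call from the original is omitted because the port is already assigned at bind time.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port, binding only to the loopback interface."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Port 0 tells the OS to pick any free port; binding to 127.0.0.1
        # (instead of "") keeps the socket off externally reachable interfaces.
        s.bind((host, 0))
        return s.getsockname()[1]


port = find_free_port()
print(f"free port on loopback: {port}")
```

The port is released when the `with` block exits, so another process could claim it before the caller binds it again. Whatever consumes the returned port should be prepared for a bind failure.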
Copilot is powered by AI and may make mistakes. Always verify output.
Unable to commit as this autofix suggestion is now outdated
@rjpower rjpower force-pushed the rjpower/20251114-llm-pool branch 3 times, most recently from 1290c74 to 5a12d3c Compare November 25, 2025 04:02
@rjpower rjpower force-pushed the rjpower/20251114-llm-pool branch from 5a12d3c to 33a9b8d Compare November 25, 2025 18:18