Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bake/async/container/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def memory_sample(duration: 10, connection_id:)
operation = {do: :memory_sample, duration: duration}

# Use the forward operation to proxy the request to a worker:
return connection.call(do: :forward, operation: operation, connection_id: connection_id)
connection.call(do: :forward, operation: operation, connection_id: connection_id)
end
end

Expand Down
2 changes: 1 addition & 1 deletion examples/simple/simple.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def setup(container)
Console.info(self, "Exiting...")
end
end
end
end
end

service "sleep" do
Expand Down
20 changes: 6 additions & 14 deletions lib/async/container/supervisor/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

require "json"
require "async"
require_relative "message_wrapper"

module Async
module Container
Expand Down Expand Up @@ -37,13 +37,6 @@ def as_json(...)
@message
end

# Convert the call to a JSON string.
#
# @returns [String] The JSON representation.
def to_json(...)
as_json.to_json(...)
end

# @attribute [Connection] The connection that initiated the call.
attr :connection

Expand Down Expand Up @@ -231,7 +224,7 @@ def initialize(stream, id = 0, **state)
@stream = stream
@id = id
@state = state

@message_wrapper = MessageWrapper.new(@stream)
@reader = nil
@calls = {}
end
Expand All @@ -253,17 +246,16 @@ def next_id
#
# @parameter message [Hash] The message to write.
def write(**message)
@stream.write(JSON.dump(message) << "\n")
@stream.flush
@message_wrapper.write(message)
end

# Read a message from the connection stream.
#
# @returns [Hash, nil] The parsed message or nil if stream is closed.
def read
if line = @stream&.gets
JSON.parse(line, symbolize_names: true)
end
@message_wrapper.read
rescue EOFError, IOError
nil
end

# Iterate over all messages from the connection.
Expand Down
114 changes: 114 additions & 0 deletions lib/async/container/supervisor/message_wrapper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

require "msgpack"
require "set"

module Async
module Container
module Supervisor
class MessageWrapper
def initialize(stream)
@factory = MessagePack::Factory.new

register_types

@packer = @factory.packer(stream)
@unpacker = @factory.unpacker(stream)
end

def write(message)
data = pack(message)
@packer.write(data)
end

def read
@unpacker.read
end

def pack(message)
@packer.clear
normalized_message = normalize(message, Set.new)
@packer.pack(normalized_message)
@packer.full_pack
end

def unpack(data)
@factory.unpack(data)
end

private

def normalize(obj, visited = Set.new.compare_by_identity)
# Check for circular references
return "..." if visited.include?(obj)

case obj
when Hash
visited.add(obj)
result = obj.transform_values{|v| normalize(v, visited)}
visited.delete(obj)
result
when Array
visited.add(obj)
result = obj.map{|v| normalize(v, visited)}
visited.delete(obj)
result
else
if obj.respond_to?(:as_json) && (as_json = obj.as_json) && as_json != obj
visited.add(obj)
result = normalize(as_json, visited)
visited.delete(obj)
result
else
obj
end
end
end

def register_types
@factory.register_type(0x00, Symbol)

@factory.register_type(
0x01,
Exception,
packer: self.method(:pack_exception),
unpacker: self.method(:unpack_exception),
recursive: true,
)

@factory.register_type(
0x02,
Class,
packer: ->(klass) {klass.name},
unpacker: ->(name) {name},
)

@factory.register_type(
MessagePack::Timestamp::TYPE,
Time,
packer: MessagePack::Time::Packer,
unpacker: MessagePack::Time::Unpacker
)
end

def pack_exception(exception, packer)
message = [exception.class.name, exception.message, exception.backtrace]
packer.write(message)
end

def unpack_exception(unpacker)
klass, message, backtrace = unpacker.read
klass = Object.const_get(klass)

exception = klass.new(message)
exception.set_backtrace(backtrace)

return exception
end
end
end
end
end
67 changes: 16 additions & 51 deletions test/async/container/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ def dispatch(call)
describe Async::Container::Supervisor::Connection do
let(:stream) {StringIO.new}
let(:connection) {Async::Container::Supervisor::Connection.new(stream)}
let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new(stream)}

def write_message(message)
message_wrapper.write(message)
stream.rewind
end

with "dispatch" do
it "handles failed writes when dispatching a call" do
stream.write(JSON.dump({id: 1, do: :test}) << "\n")
stream.rewind
write_message({id: 1, do: :test})

expect(stream).to receive(:write).and_raise(IOError, "Write error")

Expand All @@ -42,8 +47,7 @@ def dispatch(call)
end

it "closes the queue when the connection fails" do
stream.write(JSON.dump({id: 1, do: :test}) << "\n")
stream.rewind
write_message({id: 1, do: :test})

expect(stream).to receive(:write).and_raise(IOError, "Write error")

Expand Down Expand Up @@ -83,21 +87,11 @@ def dispatch(call)
with subject::Call do
let(:test_call) {Async::Container::Supervisor::Connection::Call.new(connection, 1, {do: :test, data: "value"})}

it "can serialize call to JSON" do
json = test_call.to_json
parsed = JSON.parse(json)

expect(parsed).to have_keys(
"do" => be == "test",
"data" => be == "value"
)
end

it "can get call message via as_json" do
expect(test_call.as_json).to have_keys(
do: be == :test,
data: be == "value"
)
do: be == :test,
data: be == "value"
)
end

it "can iterate over call responses with each" do
Expand All @@ -121,11 +115,11 @@ def dispatch(call)

response = test_call.pop
expect(response).to have_keys(
id: be == 1,
finished: be == true,
failed: be == true,
error: be == "Something went wrong"
)
id: be == 1,
finished: be == true,
failed: be == true,
error: be == "Something went wrong"
)

expect(test_call.closed?).to be == true
end
Expand All @@ -144,35 +138,6 @@ def dispatch(call)
end
end

it "writes JSON with newline" do
connection.write(id: 1, do: :test)

stream.rewind
output = stream.read

# Check it's valid JSON with a newline
expect(output[-1]).to be == "\n"

parsed = JSON.parse(output.chomp)
expect(parsed).to have_keys(
"id" => be == 1,
"do" => be == "test"
)
end

it "parses JSON lines" do
stream.string = JSON.dump({id: 1, do: "test"}) << "\n"
stream.rewind

message = connection.read

# Connection.read uses symbolize_names: true (keys are symbols, values are as-is)
expect(message).to have_keys(
id: be == 1,
do: be == "test"
)
end

it "returns nil when stream is closed" do
stream.string = ""
stream.rewind
Expand Down
Loading