Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ tests_install:
${MKDIR} ${SCRIPTS_INSTALL_PATH}/tests
${MKDIR} ${SCRIPTS_INSTALL_PATH}/tests/rcumap_sync
${INSTALL} -m 0644 tests/rcumap_sync/*.lua ${SCRIPTS_INSTALL_PATH}/tests/rcumap_sync
${MKDIR} ${SCRIPTS_INSTALL_PATH}/tests/mailbox
${INSTALL} -m 0644 tests/mailbox/*.lua ${SCRIPTS_INSTALL_PATH}/tests/mailbox

tests_uninstall:
${RM} -r ${SCRIPTS_INSTALL_PATH}/tests
Expand Down
159 changes: 133 additions & 26 deletions lib/mailbox.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
--
-- SPDX-FileCopyrightText: (c) 2024 Ring Zero Desenvolvimento de Software LTDA
-- SPDX-License-Identifier: MIT OR GPL-2.0-only
-- SPDX-License-Identifier: MIT OR GPL-2.0-only
--

---
Expand All @@ -20,10 +20,12 @@
local fifo = require("fifo")
local completion = require("completion")

local lunatik = require("lunatik")

---
-- The main mailbox table.
-- @table mailbox
local mailbox = {}
local mailbox = {}

---
-- Metatable for MailBox objects.
Expand All @@ -43,44 +45,127 @@ MailBox.__index = MailBox
-- @return (MailBox) The new mailbox object.
-- @local
local function new(q, e, allowed, forbidden)
-- Setting e to false makes this Mailbox non-blocking,
-- hence suitable for non-sleepable runtimes.
local mbox = {}
if type(q) == 'userdata' then
mbox.queue, mbox.event = q, e
else
mbox.queue, mbox.event = fifo.new(q), completion.new()
mbox.queue = fifo.new(q)
if e ~= false then
mbox.event = completion.new()
end
end
mbox[forbidden] = function () error(allowed .. "-only mailbox") end
return setmetatable(mbox, MailBox)
end

---
-- Creates a new inbox (receive-only mailbox).
-- @param q (fifo|number) Either an existing FIFO object or a capacity for a new FIFO.
-- If a number, a new FIFO with this capacity will be created.
-- @param e (completion) [optional] An existing completion object. If nil and `q` is a number,
-- a new completion object will be created.
-- @return (MailBox) A new inbox object.
-- Internal constructor for mailbox objects using RCU (Read-Copy-Update) mechanism.
-- This function is used to create mailboxes that are shared across runtimes
-- using the `lunatik._ENV` table for synchronization.
-- @tparam string name The name of the mailbox in the `lunatik._ENV` table.
-- @tparam (fifo|number|boolean) q Either an existing FIFO object, a capacity for a new FIFO, or `false` for a non-blocking mailbox.
-- @tparam[opt] completion e An existing completion object. If `nil` and `q` is a number, a new completion is created. If `false`, the mailbox is non-blocking.
-- @tparam string allowed The allowed operation ("send" or "receive").
-- @tparam string forbidden The forbidden operation ("send" or "receive").
-- @treturn MailBox The new mailbox object.
-- @local
-- @usage
-- local my_inbox = mailbox.inbox(10) -- Inbox with capacity for 10 messages
-- local msg = my_inbox:receive()
function mailbox.inbox(q, e)
return new(q, e, 'receive', 'send')
-- local my_inbox = mailbox.inbox("my_shared_mailbox")
-- local my_outbox = mailbox.outbox("my_shared_mailbox")
-- my_outbox:send("hello from outbox")
-- local msg = my_inbox:receive() -- msg will be "hello from outbox"
local env = lunatik._ENV
local function new_rcu(name, q, e, allowed, forbidden)
if q == false then -- Makes it possible to create a non-blocking mailbox as follows: mailbox.rcu "my_mbox", false
e = q
end
q = q or 10240
local mbox
local queue = env[name]
if queue then
mbox = new(queue, e, allowed, forbidden)
-- The following functions are a guard against race conditions
function mbox:receive(message)
self.queue, self.receive, self.send = env[name], nil, nil
MailBox.receive(self, message)
end
function mbox:send(message)
self.queue, self.receive, self.send = env[name], nil, nil
MailBox.send(self, message)
end
else
mbox = new(q, e, allowed, forbidden)
env[name] = mbox.queue
end
return mbox
end

---
-- Creates a new outbox (send-only mailbox).
-- @param q (fifo|number) Either an existing FIFO object or a capacity for a new FIFO.
-- If a number, a new FIFO with this capacity will be created.
-- @param e (completion) [optional] An existing completion object. If nil and `q` is a number,
-- a new completion object will be created.
-- @return (MailBox) A new outbox object.
-- Creates a new inbox (receive-only) mailbox.
-- An inbox allows messages to be received. Sending messages to an inbox will result in an error.
-- @function mailbox.inbox
-- @tparam[opt] string name The name of the mailbox for RCU sharing across runtimes. If omitted, a local mailbox is created.
-- @tparam[opt] (fifo|number|boolean) q Either an existing FIFO object, a capacity for a new FIFO, or `false` for a non-blocking mailbox. Defaults to 10240 if `name` is provided.
-- @tparam[opt] completion e An existing completion object. If `nil` and `q` is a number, a new completion is created. If `false`, the mailbox is non-blocking.
-- @treturn MailBox The new inbox mailbox object.
-- @usage
-- local my_outbox = mailbox.outbox(10) -- Outbox with capacity for 10 messages
-- my_outbox:send("hello")
function mailbox.outbox(q, e)
return new(q, e, 'send', 'receive')
-- -- Create a local inbox with a capacity of 1024 bytes
-- local my_inbox = mailbox.inbox(1024)
--
-- -- Non-blocking receive (returns immediately if no message)
-- local msg = my_inbox:receive(0)
-- if not msg then
-- print("No message available")
-- end
--
-- -- Create an inbox from an existing outbox's queue and event
-- local existing_outbox = mailbox.outbox(2048)
-- local paired_inbox = mailbox.inbox(existing_outbox.queue, existing_outbox.event)
--
-- -- Shared inbox example:
-- local shared_inbox = mailbox.inbox("global_messages")
-- local msg = shared_inbox:receive()
-- @within mailbox
function mailbox.inbox(name, q, e)
if type(name) ~= "string" then
q, e = name, q
return new(q, e, 'receive', 'send')
end
return new_rcu(name, q, e, 'receive', 'send')
end

---
-- Creates a new outbox (send-only) mailbox.
-- An outbox allows messages to be sent. Receiving messages from an outbox will result in an error.
-- @function mailbox.outbox
-- @tparam[opt] string name The name of the mailbox for RCU sharing across runtimes. If omitted, a local mailbox is created.
-- @tparam[opt] (fifo|number|boolean) q Either an existing FIFO object, a capacity for a new FIFO, or `false` for a non-blocking mailbox. Defaults to 10240 if `name` is provided.
-- @tparam[opt] completion e An existing completion object. If `nil` and `q` is a number, a new completion is created. If `false`, the mailbox is non-blocking.
-- @treturn MailBox The new outbox mailbox object.
-- @usage
-- -- Create a local outbox with a capacity of 2048 bytes
-- local my_outbox = mailbox.outbox(2048)
-- my_outbox:send("hello")
--
-- -- Create an outbox from an existing inbox's queue and event
-- local existing_inbox = mailbox.inbox(1024)
-- local paired_outbox = mailbox.outbox(existing_inbox.queue, existing_inbox.event)
--
-- -- Shared outbox example:
-- local shared_outbox = mailbox.outbox("global_messages")
-- shared_outbox:send("another message")
-- @within mailbox
function mailbox.outbox(name, q, e)
if type(name) ~= "string" then
q, e = name, q
return new(q, e, 'send', 'receive')
end
return new_rcu(name, q, e, 'send', 'receive')
end


local sizeoft = string.packsize("T")

---
Expand All @@ -95,9 +180,27 @@ local sizeoft = string.packsize("T")
-- @treturn[2] string Error message if the wait times out or another error occurs.
-- @raise Error if called on an outbox, or if the underlying event wait fails,
-- or if a malformed message is encountered.
-- @usage
-- -- Blocking receive (waits indefinitely)
-- local msg = my_inbox:receive()
--
-- -- Non-blocking receive (returns immediately if no message)
-- local msg = my_inbox:receive(0)
-- if not msg then
-- print("No message available")
-- end
--
-- -- Blocking receive with timeout (waits for 100 jiffies)
-- local msg = my_inbox:receive(100)
-- if not msg then
-- print("Timeout: No message received within 100 jiffies")
-- end
function MailBox:receive(timeout)
local ok, err = self.event:wait(timeout)
if not ok then error(err) end
-- Setting timeout = 0 makes this function non-blocking.
if self.event and timeout ~= 0 then
local ok, err = self.event:wait(timeout)
if not ok then error(err) end
end

local queue = self.queue
local header, header_size = queue:pop(sizeoft)
Expand All @@ -117,10 +220,14 @@ end
-- @function MailBox:send
-- @tparam string message The message to send.
-- @raise Error if called on an inbox.
-- @usage
-- local my_outbox = mailbox.outbox(1)
-- my_outbox:send("important message") -- This will complete the event, potentially waking up a receiver.
function MailBox:send(message)
self.queue:push(string.pack("s", message))
self.event:complete()
if self.event then self.event:complete() end
end


return mailbox

Loading