diff --git a/Makefile b/Makefile index 6c1eaa448..97b37af90 100644 --- a/Makefile +++ b/Makefile @@ -86,6 +86,8 @@ tests_install: ${MKDIR} ${SCRIPTS_INSTALL_PATH}/tests ${MKDIR} ${SCRIPTS_INSTALL_PATH}/tests/rcumap_sync ${INSTALL} -m 0644 tests/rcumap_sync/*.lua ${SCRIPTS_INSTALL_PATH}/tests/rcumap_sync + ${MKDIR} ${SCRIPTS_INSTALL_PATH}/tests/mailbox + ${INSTALL} -m 0644 tests/mailbox/*.lua ${SCRIPTS_INSTALL_PATH}/tests/mailbox tests_uninstall: ${RM} -r ${SCRIPTS_INSTALL_PATH}/tests diff --git a/lib/mailbox.lua b/lib/mailbox.lua index 3f9d324c2..4182e0912 100644 --- a/lib/mailbox.lua +++ b/lib/mailbox.lua @@ -1,6 +1,6 @@ -- -- SPDX-FileCopyrightText: (c) 2024 Ring Zero Desenvolvimento de Software LTDA --- SPDX-License-Identifier: MIT OR GPL-2.0-only +-- SPDX-License-Identifier: MIT OR GPL-2.0-only -- --- @@ -20,10 +20,12 @@ local fifo = require("fifo") local completion = require("completion") +local lunatik = require("lunatik") + --- -- The main mailbox table. -- @table mailbox -local mailbox = {} +local mailbox = {} --- -- Metatable for MailBox objects. @@ -43,44 +45,127 @@ MailBox.__index = MailBox -- @return (MailBox) The new mailbox object. -- @local local function new(q, e, allowed, forbidden) + -- Setting e to false makes this Mailbox non-blocking, + -- hence suitable for non-sleepable runtimes. local mbox = {} if type(q) == 'userdata' then mbox.queue, mbox.event = q, e else - mbox.queue, mbox.event = fifo.new(q), completion.new() + mbox.queue = fifo.new(q) + if e ~= false then + mbox.event = completion.new() + end end mbox[forbidden] = function () error(allowed .. "-only mailbox") end return setmetatable(mbox, MailBox) end --- --- Creates a new inbox (receive-only mailbox). --- @param q (fifo|number) Either an existing FIFO object or a capacity for a new FIFO. --- If a number, a new FIFO with this capacity will be created. --- @param e (completion) [optional] An existing completion object. If nil and `q` is a number, --- a new completion object will be created. --- @return (MailBox) A new inbox object. +-- Internal constructor for mailbox objects using RCU (Read-Copy-Update) mechanism. +-- This function is used to create mailboxes that are shared across runtimes +-- using the `lunatik._ENV` table for synchronization. +-- @tparam string name The name of the mailbox in the `lunatik._ENV` table. +-- @tparam (fifo|number|boolean) q Either an existing FIFO object, a capacity for a new FIFO, or `false` for a non-blocking mailbox. +-- @tparam[opt] completion e An existing completion object. If `nil` and `q` is a number, a new completion is created. If `false`, the mailbox is non-blocking. +-- @tparam string allowed The allowed operation ("send" or "receive"). +-- @tparam string forbidden The forbidden operation ("send" or "receive"). +-- @treturn MailBox The new mailbox object. +-- @local -- @usage --- local my_inbox = mailbox.inbox(10) -- Inbox with capacity for 10 messages --- local msg = my_inbox:receive() -function mailbox.inbox(q, e) - return new(q, e, 'receive', 'send') +-- local my_inbox = mailbox.inbox("my_shared_mailbox") +-- local my_outbox = mailbox.outbox("my_shared_mailbox") +-- my_outbox:send("hello from outbox") +-- local msg = my_inbox:receive() -- msg will be "hello from outbox" +local env = lunatik._ENV +local function new_rcu(name, q, e, allowed, forbidden) + if q == false then -- Makes it possible to create a non-blocking mailbox as follows: mailbox.rcu "my_mbox", false + e = q + end + q = q or 10240 + local mbox + local queue = env[name] + if queue then + mbox = new(queue, e, allowed, forbidden) + -- The following functions are a guard against race conditions + function mbox:receive(message) + self.queue, self.receive, self.send = env[name], nil, nil + MailBox.receive(self, message) + end + function mbox:send(message) + self.queue, self.receive, self.send = env[name], nil, nil + MailBox.send(self, message) + end + else + mbox = new(q, e, allowed, forbidden) + env[name] = mbox.queue + end + return mbox end --- --- Creates a new outbox (send-only mailbox). --- @param q (fifo|number) Either an existing FIFO object or a capacity for a new FIFO. --- If a number, a new FIFO with this capacity will be created. --- @param e (completion) [optional] An existing completion object. If nil and `q` is a number, --- a new completion object will be created. --- @return (MailBox) A new outbox object. +-- Creates a new inbox (receive-only) mailbox. +-- An inbox allows messages to be received. Sending messages to an inbox will result in an error. +-- @function mailbox.inbox +-- @tparam[opt] string name The name of the mailbox for RCU sharing across runtimes. If omitted, a local mailbox is created. +-- @tparam[opt] (fifo|number|boolean) q Either an existing FIFO object, a capacity for a new FIFO, or `false` for a non-blocking mailbox. Defaults to 10240 if `name` is provided. +-- @tparam[opt] completion e An existing completion object. If `nil` and `q` is a number, a new completion is created. If `false`, the mailbox is non-blocking. +-- @treturn MailBox The new inbox mailbox object. -- @usage --- local my_outbox = mailbox.outbox(10) -- Outbox with capacity for 10 messages --- my_outbox:send("hello") -function mailbox.outbox(q, e) - return new(q, e, 'send', 'receive') +-- -- Create a local inbox with a capacity of 1024 bytes +-- local my_inbox = mailbox.inbox(1024) +-- +-- -- Non-blocking receive (returns immediately if no message) +-- local msg = my_inbox:receive(0) +-- if not msg then +-- print("No message available") +-- end +-- +-- -- Create an inbox from an existing outbox's queue and event +-- local existing_outbox = mailbox.outbox(2048) +-- local paired_inbox = mailbox.inbox(existing_outbox.queue, existing_outbox.event) +-- +-- -- Shared inbox example: +-- local shared_inbox = mailbox.inbox("global_messages") +-- local msg = shared_inbox:receive() +-- @within mailbox +function mailbox.inbox(name, q, e) + if type(name) ~= "string" then + q, e = name, q + return new(q, e, 'receive', 'send') + end + return new_rcu(name, q, e, 'receive', 'send') end +--- +-- Creates a new outbox (send-only) mailbox. +-- An outbox allows messages to be sent. Receiving messages from an outbox will result in an error. +-- @function mailbox.outbox +-- @tparam[opt] string name The name of the mailbox for RCU sharing across runtimes. If omitted, a local mailbox is created. +-- @tparam[opt] (fifo|number|boolean) q Either an existing FIFO object, a capacity for a new FIFO, or `false` for a non-blocking mailbox. Defaults to 10240 if `name` is provided. +-- @tparam[opt] completion e An existing completion object. If `nil` and `q` is a number, a new completion is created. If `false`, the mailbox is non-blocking. +-- @treturn MailBox The new outbox mailbox object. +-- @usage +-- -- Create a local outbox with a capacity of 2048 bytes +-- local my_outbox = mailbox.outbox(2048) +-- my_outbox:send("hello") +-- +-- -- Create an outbox from an existing inbox's queue and event +-- local existing_inbox = mailbox.inbox(1024) +-- local paired_outbox = mailbox.outbox(existing_inbox.queue, existing_inbox.event) +-- +-- -- Shared outbox example: +-- local shared_outbox = mailbox.outbox("global_messages") +-- shared_outbox:send("another message") +-- @within mailbox +function mailbox.outbox(name, q, e) + if type(name) ~= "string" then + q, e = name, q + return new(q, e, 'send', 'receive') + end + return new_rcu(name, q, e, 'send', 'receive') +end + + local sizeoft = string.packsize("T") --- @@ -95,9 +180,27 @@ local sizeoft = string.packsize("T") -- @treturn[2] string Error message if the wait times out or another error occurs. -- @raise Error if called on an outbox, or if the underlying event wait fails, -- or if a malformed message is encountered. +-- @usage +-- -- Blocking receive (waits indefinitely) +-- local msg = my_inbox:receive() +-- +-- -- Non-blocking receive (returns immediately if no message) +-- local msg = my_inbox:receive(0) +-- if not msg then +-- print("No message available") +-- end +-- +-- -- Blocking receive with timeout (waits for 100 jiffies) +-- local msg = my_inbox:receive(100) +-- if not msg then +-- print("Timeout: No message received within 100 jiffies") +-- end function MailBox:receive(timeout) - local ok, err = self.event:wait(timeout) - if not ok then error(err) end + -- Setting timeout = 0 makes this function non-blocking. + if self.event and timeout ~= 0 then + local ok, err = self.event:wait(timeout) + if not ok then error(err) end + end local queue = self.queue local header, header_size = queue:pop(sizeoft) @@ -117,10 +220,14 @@ end -- @function MailBox:send -- @tparam string message The message to send. -- @raise Error if called on an inbox. +-- @usage +-- local my_outbox = mailbox.outbox(1) +-- my_outbox:send("important message") -- This will complete the event, potentially waking up a receiver. function MailBox:send(message) self.queue:push(string.pack("s", message)) - self.event:complete() + if self.event then self.event:complete() end end + return mailbox diff --git a/tests/mailbox/main.lua b/tests/mailbox/main.lua new file mode 100644 index 000000000..71dff7644 --- /dev/null +++ b/tests/mailbox/main.lua @@ -0,0 +1,184 @@ +-- +-- SPDX-FileCopyrightText: (c) 2024 Ring Zero Desenvolvimento de Software LTDA +-- SPDX-License-Identifier: MIT OR GPL-2.0-only +-- + +local lunatik = require("lunatik") +local mailbox = require("mailbox") +local fifo = require("fifo") +local completion = require("completion") + +local function assert_true(condition, message) + if not condition then + error("Assertion failed: " .. (message or "Expected true")) + end +end + +local function assert_equal(expected, actual, message) + if expected ~= actual then + error("Assertion failed: " .. (message or "") .. " Expected '" .. tostring(expected) .. "', got '" .. tostring(actual) .. "'") + end +end + +local function assert_is_nil(value, message) + if value ~= nil then + error("Assertion failed: " .. (message or "") .. " Expected nil, got '" .. tostring(value) .. "'") + end +end + +local function assert_has_error(func, expected_error_msg) + local ok, err = pcall(func) + assert_true(not ok, "Assertion failed: Expected an error, but no error occurred.") + if expected_error_msg then + assert_true(string.find(err, expected_error_msg), "Assertion failed: Expected error message containing '" .. expected_error_msg .. "', but got '" .. err .. "'") + end +end + +local MailboxTests = {} + +function MailboxTests.test_local_inbox_outbox() + local outbox = mailbox.outbox(1024) + local inbox = mailbox.inbox(outbox.queue, outbox.event) + + assert_true(pcall(outbox.send, outbox, "hello")) + assert_equal("hello", inbox:receive(0)) + + assert_is_nil(inbox:receive(0)) -- Should be empty now + + -- Test send on inbox (should error) + assert_has_error(function() inbox:send("should fail") end, "receive%-only mailbox") + + -- Test receive on outbox (should error) + assert_has_error(function() outbox:receive(0) end, "send%-only mailbox") +end + +function MailboxTests.test_non_blocking_mailbox() + local outbox = mailbox.outbox(false) -- Non-blocking outbox + local inbox = mailbox.inbox(outbox.queue, false) -- Non-blocking inbox + + assert_true(pcall(outbox.send, outbox, "non-blocking message")) + assert_equal("non-blocking message", inbox:receive(0)) + assert_is_nil(inbox:receive(0)) + + -- Test non-blocking receive on empty mailbox + local empty_inbox = mailbox.inbox(1, false) + assert_is_nil(empty_inbox:receive(0)) +end + +function MailboxTests.test_rcu_mailbox() + local mbox_name = "test_rcu_mailbox_name" + + -- Ensure _ENV is clean before test + lunatik._ENV[mbox_name] = nil + + local outbox1 = mailbox.outbox(mbox_name) + local inbox1 = mailbox.inbox(mbox_name) + + assert_true(pcall(outbox1.send, outbox1, "rcu message 1")) + assert_equal("rcu message 1", inbox1:receive(0)) + + -- Create new instances, they should share the same underlying queue + local outbox2 = mailbox.outbox(mbox_name) + local inbox2 = mailbox.inbox(mbox_name) + + assert_true(pcall(outbox2.send, outbox2, "rcu message 2")) + assert_equal("rcu message 2", inbox2:receive(0)) + + -- Test that the queue is indeed shared + assert_equal(outbox1.queue, outbox2.queue) + assert_equal(inbox1.queue, inbox2.queue) + assert_equal(outbox1.queue, inbox1.queue) + + -- Test RCU update mechanism (simulated by re-assigning _ENV[mbox_name]) + local new_fifo = fifo.new(10) + lunatik._ENV[mbox_name] = new_fifo + + local outbox3 = mailbox.outbox(mbox_name) + local inbox3 = mailbox.inbox(mbox_name) + + assert_true(pcall(outbox3.send, outbox3, "rcu message 3")) + assert_equal("rcu message 3", inbox3:receive(0)) + + assert_equal(outbox3.queue, new_fifo) + assert_equal(inbox3.queue, new_fifo) + + -- Old mailboxes should now point to the new queue after their next operation + outbox1:send("rcu message 4") + assert_equal(outbox1.queue, new_fifo) + assert_equal("rcu message 4", inbox3:receive(0)) + + inbox1:receive(0) -- This will update inbox1.queue + assert_equal(inbox1.queue, new_fifo) + + -- Clean up _ENV + lunatik._ENV[mbox_name] = nil +end + +function MailboxTests.test_mailbox_capacity() + local outbox = mailbox.outbox(1) -- Capacity of 1 message + local inbox = mailbox.inbox(outbox.queue, outbox.event) + + assert_true(pcall(outbox.send, outbox, "message 1")) + assert_has_error(function() outbox:send("message 2") end, "fifo is full") + + assert_equal("message 1", inbox:receive(0)) + assert_is_nil(inbox:receive(0)) + + assert_true(pcall(outbox.send, outbox, "message 3")) + assert_equal("message 3", inbox:receive(0)) +end + +function MailboxTests.test_receive_timeout() + local outbox = mailbox.outbox(1) + local inbox = mailbox.inbox(outbox.queue, outbox.event) + + -- Test immediate return (timeout 0) + local msg = inbox:receive(0) + assert_is_nil(msg) + + -- Test blocking receive with timeout (short timeout, should return nil) + msg = inbox:receive(1) -- 1 jiffy timeout + assert_is_nil(msg) + + -- Test blocking receive with message (should return message) + outbox:send("timed message") + msg = inbox:receive(100) -- Long timeout, but message is there + assert_equal("timed message", msg) +end + +function MailboxTests.test_malformed_message() + local outbox = mailbox.outbox(1) + local inbox = mailbox.inbox(outbox.queue, outbox.event) + + -- Manually push a malformed header to the queue + outbox.queue:push("short") -- Not enough bytes for string.unpack("T") + + assert_has_error(function() inbox:receive() end, "malformed message") +end + +local function run_tests() + local num_passed = 0 + local num_failed = 0 + + print("\nRunning Mailbox Tests...") + + for name, func in pairs(MailboxTests) do + print(" " .. name .. ": ") + local ok, err = pcall(func) + if ok then + print("PASS") + num_passed = num_passed + 1 + else + print("FAIL - " .. err) + num_failed = num_failed + 1 + end + end + + print(string.format("\nTests finished: %d passed, %d failed", num_passed, num_failed)) + if num_failed > 0 then + error("Some tests failed.") + end +end + +run_tests() +