diff --git a/.gitignore b/.gitignore index dc17cc5..f4fe1e6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ nimcache/ nimblecache/ htmldocs/ -tests/tsmartptrsleak -tests/tchannels_simple +* +!*/ +!*.* +*.dSYM/ \ No newline at end of file diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim new file mode 100644 index 0000000..9842b09 --- /dev/null +++ b/tests/tchannels_ring.nim @@ -0,0 +1,43 @@ +import threading/channels +import std/unittest + +const + BufferSize = 3 + +suite "Ring Buffer Channel Tests": + test "Non-blocking ring buffer behavior": + + proc fillBuffer(n: int): seq[int] = + var chan = newChan[int](BufferSize) + # Fill the buffer + for i in 0..= 2 + block example_non_blocking_overwrite: + var chanRingBuffer = newChan[string](elements = 1) + chanRingBuffer.push("Hello") + chanRingBuffer.push("World") + var msg = "" + assert chanRingBuffer.tryRecv(msg) + assert msg == "World" + when not (defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc) or defined(nimdoc)): {.error: "This module requires one of --mm:arc / --mm:atomicArc / --mm:orc compilation flags".} @@ -113,8 +123,8 @@ type slots: int ## Number of item slots in the buffer head: Atomic[int] ## Write/enqueue/send index tail: Atomic[int] ## Read/dequeue/receive index - buffer: ptr UncheckedArray[byte] atomicCounter: Atomic[int] + buffer: ptr UncheckedArray[byte] # ------------------------------------------------------------------------------ @@ -180,19 +190,33 @@ proc freeChannel(chan: ChannelRaw) = # MPMC Channels (Multi-Producer Multi-Consumer) # ------------------------------------------------------------------------------ -proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool): bool = +template incrWriteIndex(chan: ChannelRaw) = + atomicInc(chan.head) + if chan.getHead() == 2 * chan.slots: + chan.setHead(0) + +template incrReadIndex(chan: ChannelRaw) = + atomicInc(chan.tail) + if chan.getTail() == 2 * chan.slots: + chan.setTail(0) + +proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool, overwrite: bool): bool = assert not chan.isNil assert not data.isNil when not blocking: - if chan.isFull(): return false + if chan.isFull() and not overwrite: return false acquire(chan.lock) # check for when another thread was faster to fill when blocking: - while chan.isFull(): - wait(chan.spaceAvailableCV, chan.lock) + if chan.isFull(): + if overwrite: + incrReadIndex(chan) + else: + while chan.isFull(): + wait(chan.spaceAvailableCV, chan.lock) else: if chan.isFull(): release(chan.lock) @@ -200,15 +224,15 @@ proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bo assert not chan.isFull() - let writeIdx = if chan.getHead() < chan.slots: + let writeIdx = + if chan.getHead() < chan.slots: chan.getHead() else: chan.getHead() - chan.slots copyMem(chan.buffer[writeIdx * size].addr, data, size) - atomicInc(chan.head) - if chan.getHead() == 2 * chan.slots: - chan.setHead(0) + + incrWriteIndex(chan) signal(chan.dataAvailableCV) release(chan.lock) @@ -234,16 +258,15 @@ proc channelReceive(chan: ChannelRaw, data: pointer, size: int, blocking: static assert not chan.isEmpty() - let readIdx = if chan.getTail() < chan.slots: + let readIdx = + if chan.getTail() < chan.slots: chan.getTail() else: chan.getTail() - chan.slots copyMem(data, chan.buffer[readIdx * size].addr, size) - atomicInc(chan.tail) - if chan.getTail() == 2 * chan.slots: - chan.setTail(0) + incrReadIndex(chan) signal(chan.spaceAvailableCV) release(chan.lock) @@ -298,7 +321,7 @@ proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} = ## ## Returns `false` if the message was not sent because the number of pending ## messages in the channel exceeded its capacity. - result = channelSend(c.d, src.addr, sizeof(T), false) + result = channelSend(c.d, src.addr, sizeof(T), false, false) if result: wasMoved(src) @@ -326,7 +349,7 @@ proc tryTake*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} = ## ## Returns `false` if the message was not sent because the number of pending ## messages in the channel exceeded its capacity. - result = channelSend(c.d, src.addr, sizeof(T), false) + result = channelSend(c.d, src.addr, sizeof(T), false, false) if result: wasMoved(src) @@ -353,7 +376,7 @@ proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = ## messages from the channel are removed. when defined(gcOrc) and defined(nimSafeOrcSend): GC_runOrc() - discard channelSend(c.d, src.addr, sizeof(T), true) + discard channelSend(c.d, src.addr, sizeof(T), true, false) wasMoved(src) template send*[T](c: Chan[T]; src: T) = @@ -361,6 +384,21 @@ template send*[T](c: Chan[T]; src: T) = mixin isolate send(c, isolate(src)) +proc push*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = + ## Pushes the message `src` to the channel `c`. + ## This is a non-blocking operation that overwrites the oldest message if the channel is full. + ## + ## The memory of `src` is moved, not copied. + when defined(gcOrc) and defined(nimSafeOrcSend): + GC_runOrc() + discard channelSend(c.d, src.addr, sizeof(T), true, overwrite=true) + wasMoved(src) + +template push*[T](c: Chan[T]; src: T) = + ## Helper template for `push`. + mixin isolate + push(c, isolate(src)) + proc recv*[T](c: Chan[T], dst: var T) {.inline.} = ## Receives a message from the channel `c` and fill `dst` with its value. ##