Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
nimcache/
nimblecache/
htmldocs/
tests/tsmartptrsleak
tests/tchannels_simple
*
!*/
!*.*
*.dSYM/
43 changes: 43 additions & 0 deletions tests/tchannels_ring.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import threading/channels
import std/unittest

const
BufferSize = 3

suite "Ring Buffer Channel Tests":
test "Non-blocking ring buffer behavior":

proc fillBuffer(n: int): seq[int] =
var chan = newChan[int](BufferSize)
# Fill the buffer
for i in 0..<BufferSize+n:
chan.push(i)

# Receive values - should get BufferSize as first value
var values: seq[int]
for i in 0..<BufferSize:
var x: int
if not chan.tryRecv(x):
break
values.add(x)

# Verify we got the most recent values
result = values

check fillBuffer(0) == @[0, 1, 2]
check fillBuffer(1) == @[1, 2, 3]
check fillBuffer(2) == @[2, 3, 4]
check fillBuffer(3) == @[3, 4, 5]
check fillBuffer(4) == @[4, 5, 6]
check fillBuffer(5) == @[5, 6, 7]
check fillBuffer(6) == @[6, 7, 8]
check fillBuffer(7) == @[7, 8, 9]
check fillBuffer(8) == @[8, 9, 10]

test "Non-blocking ring buffer behavior with size 1":
var chan = newChan[int](1)
chan.push(1)
chan.push(2)
var x: int
check chan.tryRecv(x)
check x == 2
74 changes: 56 additions & 18 deletions threading/channels.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
## the underlying resources and synchronization. It has to be initialized using
## the `newChan` proc. Sending and receiving operations are provided by the
## blocking `send` and `recv` procs, and non-blocking `trySend` and `tryRecv`
## procs. Send operations add messages to the channel, receiving operations
## remove them.
## procs. For ring buffer behavior, use the `push` proc rather than `send`.
## Send operations add messages to the channel, receiving operations remove them,
## while `push` adds a message or overwrites the oldest message if the channel is full.
##
##
## See also:
## * [std/isolation](https://nim-lang.org/docs/isolation.html)
Expand Down Expand Up @@ -97,6 +99,14 @@ runnableExamples("--threads:on --gc:orc"):
# At least one non-successful attempt to receive the message had to occur.
assert messages.len >= 2

block example_non_blocking_overwrite:
var chanRingBuffer = newChan[string](elements = 1)
chanRingBuffer.push("Hello")
chanRingBuffer.push("World")
var msg = ""
assert chanRingBuffer.tryRecv(msg)
assert msg == "World"

when not (defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc) or defined(nimdoc)):
{.error: "This module requires one of --mm:arc / --mm:atomicArc / --mm:orc compilation flags".}

Expand All @@ -113,8 +123,8 @@ type
slots: int ## Number of item slots in the buffer
head: Atomic[int] ## Write/enqueue/send index
tail: Atomic[int] ## Read/dequeue/receive index
buffer: ptr UncheckedArray[byte]
atomicCounter: Atomic[int]
buffer: ptr UncheckedArray[byte]

# ------------------------------------------------------------------------------

Expand Down Expand Up @@ -180,35 +190,49 @@ proc freeChannel(chan: ChannelRaw) =
# MPMC Channels (Multi-Producer Multi-Consumer)
# ------------------------------------------------------------------------------

proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool): bool =
template incrWriteIndex(chan: ChannelRaw) =
atomicInc(chan.head)
if chan.getHead() == 2 * chan.slots:
chan.setHead(0)

template incrReadIndex(chan: ChannelRaw) =
atomicInc(chan.tail)
if chan.getTail() == 2 * chan.slots:
chan.setTail(0)

proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool, overwrite: bool): bool =
assert not chan.isNil
assert not data.isNil

when not blocking:
if chan.isFull(): return false
if chan.isFull() and not overwrite: return false

acquire(chan.lock)

# check for when another thread was faster to fill
when blocking:
while chan.isFull():
wait(chan.spaceAvailableCV, chan.lock)
if chan.isFull():
if overwrite:
incrReadIndex(chan)
else:
while chan.isFull():
wait(chan.spaceAvailableCV, chan.lock)
else:
if chan.isFull():
release(chan.lock)
return false

assert not chan.isFull()

let writeIdx = if chan.getHead() < chan.slots:
let writeIdx =
if chan.getHead() < chan.slots:
chan.getHead()
else:
chan.getHead() - chan.slots

copyMem(chan.buffer[writeIdx * size].addr, data, size)
atomicInc(chan.head)
if chan.getHead() == 2 * chan.slots:
chan.setHead(0)

incrWriteIndex(chan)

signal(chan.dataAvailableCV)
release(chan.lock)
Expand All @@ -234,16 +258,15 @@ proc channelReceive(chan: ChannelRaw, data: pointer, size: int, blocking: static

assert not chan.isEmpty()

let readIdx = if chan.getTail() < chan.slots:
let readIdx =
if chan.getTail() < chan.slots:
chan.getTail()
else:
chan.getTail() - chan.slots

copyMem(data, chan.buffer[readIdx * size].addr, size)

atomicInc(chan.tail)
if chan.getTail() == 2 * chan.slots:
chan.setTail(0)
incrReadIndex(chan)

signal(chan.spaceAvailableCV)
release(chan.lock)
Expand Down Expand Up @@ -298,7 +321,7 @@ proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
##
## Returns `false` if the message was not sent because the number of pending
## messages in the channel exceeded its capacity.
result = channelSend(c.d, src.addr, sizeof(T), false)
result = channelSend(c.d, src.addr, sizeof(T), false, false)
if result:
wasMoved(src)

Expand Down Expand Up @@ -326,7 +349,7 @@ proc tryTake*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} =
##
## Returns `false` if the message was not sent because the number of pending
## messages in the channel exceeded its capacity.
result = channelSend(c.d, src.addr, sizeof(T), false)
result = channelSend(c.d, src.addr, sizeof(T), false, false)
if result:
wasMoved(src)

Expand All @@ -353,14 +376,29 @@ proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
## messages from the channel are removed.
when defined(gcOrc) and defined(nimSafeOrcSend):
GC_runOrc()
discard channelSend(c.d, src.addr, sizeof(T), true)
discard channelSend(c.d, src.addr, sizeof(T), true, false)
wasMoved(src)

template send*[T](c: Chan[T]; src: T) =
## Helper template for `send`.
mixin isolate
send(c, isolate(src))

proc push*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
## Pushes the message `src` to the channel `c`.
## This is a non-blocking operation that overwrites the oldest message if the channel is full.
##
## The memory of `src` is moved, not copied.
when defined(gcOrc) and defined(nimSafeOrcSend):
GC_runOrc()
discard channelSend(c.d, src.addr, sizeof(T), true, overwrite=true)
wasMoved(src)

template push*[T](c: Chan[T]; src: T) =
## Helper template for `push`.
mixin isolate
push(c, isolate(src))

proc recv*[T](c: Chan[T], dst: var T) {.inline.} =
## Receives a message from the channel `c` and fill `dst` with its value.
##
Expand Down
Loading