Skip to content

Commit 5dfdc7c

Browse files
author
Kyle McClellan
committed
Extend source completion
* Wait until safe to dispose consumer
1 parent aab87dd commit 5dfdc7c

File tree

1 file changed

+39
-1
lines changed

1 file changed

+39
-1
lines changed

Confluent.Kafka.Dataflow/Internal/DataflowBlockExtensions.cs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ public static CancellationTokenSource GetCompletionToken(this IDataflowBlock blo
2424
}
2525

2626
public static IReceivableSourceBlock<T> BeginWith<T>(this IReceivableSourceBlock<T> source, Lazy<Task> action) =>
27-
new LazySource<T>(GetTrigger(source, action));
27+
new LazySource<T>(GetTrigger(source, action)).ContinueWith(action);
2828

2929
public static ITargetBlock<T> BeginWith<T>(this ITargetBlock<T> target, Lazy<Task> action) =>
3030
new LazyTarget<T>(GetTrigger(target, action)).ContinueWith(action);
3131

32+
public static IReceivableSourceBlock<T> ContinueWith<T>(this IReceivableSourceBlock<T> source, Lazy<Task> action) =>
33+
new ExtendedSource<T>(source, ContinueWithAsync(source.Completion, action));
34+
3235
public static ITargetBlock<T> ContinueWith<T>(this ITargetBlock<T> target, Lazy<Task> action) =>
3336
new ExtendedTarget<T>(target, ContinueWithAsync(target.Completion, action));
3437

@@ -153,6 +156,41 @@ protected T Value
153156
public void Fault(Exception exception) => this.Value.Fault(exception);
154157
}
155158

159+
class ExtendedSource<T> : IReceivableSourceBlock<T>
160+
{
161+
private readonly IReceivableSourceBlock<T> source;
162+
163+
public ExtendedSource(IReceivableSourceBlock<T> source, Task completion)
164+
{
165+
this.source = source;
166+
this.Completion = completion;
167+
}
168+
169+
public Task Completion { get; }
170+
171+
public bool TryReceive(Predicate<T>? filter, [MaybeNullWhen(false)] out T item) =>
172+
this.source.TryReceive(out item);
173+
174+
public bool TryReceiveAll([NotNullWhen(true)] out IList<T>? items) =>
175+
this.source.TryReceiveAll(out items);
176+
177+
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) =>
178+
this.source.LinkTo(target, linkOptions);
179+
180+
public T? ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed) =>
181+
this.source.ConsumeMessage(messageHeader, target, out messageConsumed);
182+
183+
public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target) =>
184+
this.source.ReserveMessage(messageHeader, target);
185+
186+
public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target) =>
187+
this.source.ReleaseReservation(messageHeader, target);
188+
189+
public void Complete() => this.source.Complete();
190+
191+
public void Fault(Exception exception) => this.source.Fault(exception);
192+
}
193+
156194
class ExtendedTarget<T> : ITargetBlock<T>
157195
{
158196
private readonly ITargetBlock<T> target;

0 commit comments

Comments
 (0)