Skip to content

Commit 400ce6d

Browse files
committed
Producer target block
1 parent f475614 commit 400ce6d

File tree

4 files changed

+145
-0
lines changed

4 files changed

+145
-0
lines changed

Confluent.Kafka.Dataflow/BlockOptions.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace Confluent.Kafka.Dataflow
22
{
3+
using System;
34
using System.Threading.Tasks.Dataflow;
45

56
/// <summary>
@@ -19,4 +20,15 @@ public class ConsumeBlockOptions : DataflowBlockOptions
1920
public class OffsetBlockOptions : ExecutionDataflowBlockOptions
2021
{
2122
}
23+
24+
/// <summary>
25+
/// Dataflow block options for producing to Kafka.
26+
/// </summary>
27+
public class ProduceBlockOptions : ExecutionDataflowBlockOptions
28+
{
29+
/// <summary>
30+
/// Gets or sets the handler for produced message offsets.
31+
/// </summary>
32+
public Action<TopicPartitionOffset>? OffsetHandler { get; set; }
33+
}
2234
}

Confluent.Kafka.Dataflow/ClientExtensions.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,28 @@ public static ITargetBlock<TopicPartitionOffset> AsOffsetBlock<TKey, TValue>(
5050
x => consumer.StoreOffset(new TopicPartitionOffset(x.TopicPartition, x.Offset + 1)),
5151
options ?? new());
5252
}
53+
54+
/// <summary>
55+
/// Represents a producer as a target block for Kafka messages.
56+
/// </summary>
57+
/// <remarks>
58+
/// A producer can be represented as multiple targets.
59+
/// </remarks>
60+
/// <typeparam name="TKey">The producer key type.</typeparam>
61+
/// <typeparam name="TValue">The producer value type.</typeparam>
62+
/// <param name="producer">The producer.</param>
63+
/// <param name="topicPartition">The topic/partition receiving the messages. Use <see cref="Partition.Any"/> for automatic partitioning.</param>
64+
/// <param name="options">Block options for producing.</param>
65+
/// <returns>The producer target block.</returns>
66+
public static ITargetBlock<Message<TKey, TValue>> AsTargetBlock<TKey, TValue>(
67+
this IProducer<TKey, TValue> producer,
68+
TopicPartition topicPartition,
69+
ProduceBlockOptions? options = null)
70+
{
71+
return new ProduceBlock<TKey, TValue>(
72+
producer ?? throw new ArgumentNullException(nameof(producer)),
73+
topicPartition ?? throw new ArgumentNullException(nameof(topicPartition)),
74+
options ?? new ProduceBlockOptions());
75+
}
5376
}
5477
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
namespace Confluent.Kafka.Dataflow.Internal
2+
{
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using System.Threading.Tasks.Dataflow;
7+
8+
class ProduceBlock<TKey, TValue> : ITargetBlock<Message<TKey, TValue>>
9+
{
10+
readonly ITargetBlock<Message<TKey, TValue>> target;
11+
readonly Action<DeliveryReport<TKey, TValue>> deliveryHandler;
12+
13+
readonly IProducer<TKey, TValue> producer;
14+
readonly TopicPartition topicPartition;
15+
readonly ProduceBlockOptions options;
16+
17+
public ProduceBlock(IProducer<TKey, TValue> producer, TopicPartition topicPartition, ProduceBlockOptions options)
18+
{
19+
this.producer = producer;
20+
this.topicPartition = topicPartition;
21+
this.options = options;
22+
23+
this.deliveryHandler = this.OnDelivery;
24+
this.target = new ActionBlock<Message<TKey, TValue>>(this.Produce, options);
25+
26+
var flushTask = this.target.Completion.ContinueWith(
27+
(task, obj) =>
28+
{
29+
var block = (ProduceBlock<TKey, TValue>)obj!;
30+
block.producer.Flush(block.options.CancellationToken);
31+
},
32+
this,
33+
this.options.CancellationToken,
34+
TaskContinuationOptions.OnlyOnRanToCompletion,
35+
this.options.TaskScheduler);
36+
37+
this.Completion = Task.Factory.ContinueWhenAll(
38+
new[] { this.target.Completion, flushTask },
39+
t => t[0].IsFaulted ? t[0] : t[1],
40+
CancellationToken.None,
41+
TaskContinuationOptions.None,
42+
this.options.TaskScheduler).Unwrap();
43+
}
44+
45+
public Task Completion { get; }
46+
47+
public DataflowMessageStatus OfferMessage(
48+
DataflowMessageHeader messageHeader,
49+
Message<TKey, TValue> messageValue,
50+
ISourceBlock<Message<TKey, TValue>>? source,
51+
bool consumeToAccept)
52+
{
53+
return this.target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
54+
}
55+
56+
void Produce(Message<TKey, TValue> message) =>
57+
this.producer.Produce(this.topicPartition, message, this.deliveryHandler);
58+
59+
void OnDelivery(DeliveryReport<TKey, TValue> report)
60+
{
61+
if (report.Error.IsError)
62+
{
63+
this.target.Fault(new ProduceException<TKey, TValue>(report.Error, report));
64+
}
65+
else
66+
{
67+
this.options.OffsetHandler?.Invoke(report.TopicPartitionOffset);
68+
}
69+
}
70+
71+
public void Complete() => this.target.Complete();
72+
73+
public void Fault(Exception exception) => this.target.Fault(exception);
74+
}
75+
}

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,38 @@ var processor = new TransformBlock<Tuple<TopicPartitionOffset, Message<string, s
110110
var offsetTarget = consumer.AsOffsetBlock();
111111
processor.LinkTo(offsetTarget, linkOptions);
112112
```
113+
114+
### Producing using `ITargetBlock<T>`
115+
116+
Use `IProducer<TKey, TValue>.AsTargetBlock(...)` to direct a message pipeline into a destination Kafka topic:
117+
118+
```c#
119+
using System.Threading.Tasks.Dataflow;
120+
using Confluent.Kafka;
121+
using Confluent.Kafka.Dataflow;
122+
123+
using var producer = new ProducerBuilder<string, string>(
124+
new ProducerConfig
125+
{
126+
BootstrapServers = "localhost:9092",
127+
}).Build();
128+
129+
var target = producer.AsTargetBlock(new TopicPartition("my-topic", Partition.Any));
130+
131+
var generator = new TransformBlock<int, Message<string, string>>(
132+
i => new Message<string, string>
133+
{
134+
Key = i.ToString(),
135+
Value = $"Value #{i}"
136+
});
137+
138+
generator.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
139+
140+
for (var i = 0; i < 10; i++)
141+
{
142+
generator.Post(i);
143+
}
144+
145+
generator.Complete();
146+
await target.Completion;
147+
```

0 commit comments

Comments
 (0)