@@ -12,38 +12,6 @@ namespace Confluent.Kafka.Dataflow
1212 /// </summary>
1313 public static class ClientExtensions
1414 {
15- /// <summary>
16- /// Represents a consumer as a source block for Kafka messages.
17- /// </summary>
18- /// <remarks>
19- /// Consumer must be subscribed/assigned for messages to be received.
20- /// </remarks>
21- /// <typeparam name="TKey">The consumer key type.</typeparam>
22- /// <typeparam name="TValue">The consumer value type.</typeparam>
23- /// <param name="consumer">The consumer.</param>
24- /// <param name="options">Block options for consuming.</param>
25- /// <returns>The consumer source block.</returns>
26- [ Obsolete ( "Use overload with handler instead." ) ]
27- public static ISourceBlock < Message < TKey , TValue > > AsSourceBlock < TKey , TValue > (
28- this IConsumer < TKey , TValue > consumer ,
29- ConsumeBlockOptions ? options = null )
30- {
31- var loader = new MessageLoader < TKey , TValue > ( consumer ?? throw new ArgumentNullException ( nameof ( consumer ) ) ) ;
32-
33- if ( options ? . OffsetTarget != null )
34- {
35- loader . OnConsumed += ( _ , offset ) =>
36- {
37- if ( ! options . OffsetTarget . Post ( offset ) )
38- {
39- throw new InvalidOperationException ( "Target rejected offset!" ) ;
40- }
41- } ;
42- }
43-
44- return new CustomBlock < Message < TKey , TValue > > ( loader . Load , options ?? new ( ) ) ;
45- }
46-
4715 /// <summary>
4816 /// Represents a consumer as a source block for Kafka messages.
4917 /// </summary>
@@ -135,91 +103,6 @@ void Store(Message<TKey, TValue> message, TopicPartitionOffset offset)
135103 return new CustomBlock < Message < TKey , TValue > > ( loader . Load , options ) ;
136104 }
137105
138- /// <summary>
139- /// Represents a consumer as a target block for processed Kafka offsets.
140- /// </summary>
141- /// <typeparam name="TKey">The consumer key type.</typeparam>
142- /// <typeparam name="TValue">The consumer value type.</typeparam>
143- /// <param name="consumer">The consumer.</param>
144- /// <param name="options">Block options for processing.</param>
145- /// <returns>The consumer offset block.</returns>
146- [ Obsolete ( "Use AsSourceBlock<TKey, TValue>(...) with commit target instead." ) ]
147- public static ITargetBlock < TopicPartitionOffset > AsOffsetBlock < TKey , TValue > (
148- this IConsumer < TKey , TValue > consumer ,
149- OffsetBlockOptions ? options = null )
150- {
151- if ( consumer == null )
152- {
153- throw new ArgumentNullException ( nameof ( consumer ) ) ;
154- }
155-
156- options ??= new ( ) ;
157-
158- var target = new ActionBlock < TopicPartitionOffset > (
159- tpo => consumer . StoreOffset ( new TopicPartitionOffset ( tpo . TopicPartition , tpo . Offset + 1 ) ) ,
160- options ) ;
161-
162- return new ContinueBlock < TopicPartitionOffset > (
163- target ,
164- ( ) => Task . Factory . StartNew (
165- consumer . Commit ,
166- options . CancellationToken ,
167- TaskCreationOptions . LongRunning ,
168- options . TaskScheduler ) ,
169- options ) ;
170- }
171-
172- /// <summary>
173- /// Represents a producer as a target block for Kafka messages.
174- /// </summary>
175- /// <remarks>
176- /// A producer can be represented as multiple targets.
177- /// </remarks>
178- /// <typeparam name="TKey">The producer key type.</typeparam>
179- /// <typeparam name="TValue">The producer value type.</typeparam>
180- /// <param name="producer">The producer.</param>
181- /// <param name="topicPartition">
182- /// The topic/partition receiving the messages. Use <see cref="Partition.Any"/> for automatic partitioning.
183- /// </param>
184- /// <param name="options">Block options for producing.</param>
185- /// <returns>The producer target block.</returns>
186- [ Obsolete ( "Use overload with handler instead." ) ]
187- public static ITargetBlock < Message < TKey , TValue > > AsTargetBlock < TKey , TValue > (
188- this IProducer < TKey , TValue > producer ,
189- TopicPartition topicPartition ,
190- ProduceBlockOptions ? options = null )
191- {
192- if ( producer == null )
193- {
194- throw new ArgumentNullException ( nameof ( producer ) ) ;
195- }
196-
197- if ( topicPartition == null )
198- {
199- throw new ArgumentNullException ( nameof ( topicPartition ) ) ;
200- }
201-
202- options ??= new ( ) ;
203-
204- return new ActionBlock < Message < TKey , TValue > > (
205- async message =>
206- {
207- var result = await producer . ProduceAsync ( topicPartition , message ) . ConfigureAwait ( false ) ;
208- options ? . OffsetHandler ? . Invoke ( result . TopicPartitionOffset ) ;
209- } ,
210- new ( )
211- {
212- BoundedCapacity = options . BoundedCapacity ,
213- CancellationToken = options . CancellationToken ,
214- EnsureOrdered = options . EnsureOrdered ,
215- NameFormat = options . NameFormat ,
216- MaxMessagesPerTask = options . MaxMessagesPerTask ,
217- TaskScheduler = options . TaskScheduler ,
218- MaxDegreeOfParallelism = DataflowBlockOptions . Unbounded ,
219- SingleProducerConstrained = false ,
220- } ) ;
221- }
222-
223106 /// <summary>
224107 /// Represents a producer as a target block for Kafka messages.
225108 /// </summary>
0 commit comments