Skip to content

Commit 8bdafb4

Browse files
KAFKA-7699: Anchored punctuation (#19937)
This PR implements Processor API interfaces for using Anchored punctuation. As described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+punctuation, anchored punctuations allows the users of the PAPI to set both a start time and an interval for when the punctuations should be triggered. The changes have been based on all the places where `org.apache.kafka.streams.processor.internals.ProcessorContext.schedule()` is used. The changes consist of introducing new interfaces as the punctuation time logic in the `org.apache.kafka.streams.processor.internals.PunctuationSchedule` already supports calculating a new punctuation time based on a start time and an interval. The original `schedule()` implementations have been refactored into using the new `schedule()` implementation supporting the `startTime` parameter. For the original implementations, the `startTime` parameter has been sat to `null`, effectively using the "method overloading" programming technique. Reviewers: Bill Bejeck<[email protected]>
1 parent 169e211 commit 8bdafb4

File tree

17 files changed

+440
-12
lines changed

17 files changed

+440
-12
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.io.File;
3232
import java.time.Duration;
33+
import java.time.Instant;
3334
import java.util.Map;
3435

3536
/**
@@ -157,6 +158,46 @@ Cancellable schedule(final Duration interval,
157158
final PunctuationType type,
158159
final Punctuator callback);
159160

161+
/**
162+
* Schedule a periodic operation for processors. A processor may call this method during a
163+
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}'s
164+
* {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#init(ProcessorContext) initialization} or
165+
* {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#transform(Object, Object) processing} to
166+
* schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
167+
* The type parameter controls what notion of time is used for punctuation:
168+
* <ul>
169+
* <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", which is advanced by the processing of messages
170+
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
171+
* The first punctuation will be triggered by the first record that is processed.
172+
* <b>NOTE:</b> Only advances as messages arrive</li>
173+
* <li>{@link PunctuationType#WALL_CLOCK_TIME} &mdash; uses system time (the wall-clock time),
174+
* which advances independent of whether new messages arrive.
175+
* The first punctuation will be triggered after interval has elapsed.
176+
* <b>NOTE:</b> This is best effort only as its granularity is limited by how long an iteration of the
177+
* processing loop takes to complete</li>
178+
* </ul>
179+
*
180+
* <b>Skipping punctuations:</b> Punctuations will not be triggered more than once at any given timestamp.
181+
* This means that "missed" punctuation will be skipped.
182+
* It's possible to "miss" a punctuation if:
183+
* <ul>
184+
* <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances more than interval</li>
185+
* <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...</li>
186+
* </ul>
187+
*
188+
* @param startTime the time for the first punctuation.
189+
* The subsequent trigger times are calculated using the {@code startTime} and the {@code interval}
190+
* @param interval the time interval between punctuations (supported minimum is 1 millisecond)
191+
* @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
192+
* @param callback a function consuming timestamps representing the current stream or system time
193+
* @return a handle allowing cancellation of the punctuation schedule established by this method
194+
* @throws IllegalArgumentException if the interval is not representable in milliseconds
195+
*/
196+
Cancellable schedule(final Instant startTime,
197+
final Duration interval,
198+
final PunctuationType type,
199+
final Punctuator callback);
200+
160201
/**
161202
* Forward a key/value pair to all downstream processors.
162203
* Used the input record's timestamp as timestamp for the output record.

streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.io.File;
3434
import java.time.Duration;
35+
import java.time.Instant;
3536
import java.util.Map;
3637
import java.util.Optional;
3738

@@ -159,6 +160,46 @@ Cancellable schedule(final Duration interval,
159160
final PunctuationType type,
160161
final Punctuator callback);
161162

163+
/**
164+
* Schedule a periodic operation for processors. A processor may call this method during
165+
* {@link Processor#init(ProcessorContext) initialization},
166+
* {@link Processor#process(Record) processing},
167+
* {@link FixedKeyProcessor#init(FixedKeyProcessorContext) initialization}, or
168+
* {@link FixedKeyProcessor#process(FixedKeyRecord) processing} to
169+
* schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
170+
* The type parameter controls what notion of time is used for punctuation:
171+
* <ul>
172+
* <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", which is advanced by the processing of messages
173+
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
174+
* The first punctuation will be triggered by the first record that is processed.
175+
* <b>NOTE:</b> Only advances as messages arrive</li>
176+
* <li>{@link PunctuationType#WALL_CLOCK_TIME} &mdash; uses system time (the wall-clock time),
177+
* which advances independent of whether new messages arrive.
178+
* The first punctuation will be triggered after interval has elapsed.
179+
* <b>NOTE:</b> This is best effort only as its granularity is limited by how long an iteration of the
180+
* processing loop takes to complete</li>
181+
* </ul>
182+
*
183+
* <b>Skipping punctuations:</b> Punctuations will not be triggered more than once at any given timestamp.
184+
* This means that "missed" punctuation will be skipped.
185+
* It's possible to "miss" a punctuation if:
186+
* <ul>
187+
* <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances more than interval</li>
188+
* <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...</li>
189+
* </ul>
190+
* @param startTime the time for the first punctuation. The subsequent trigger times are calculated
191+
* using the {@code startTime} and the {@code interval}
192+
* @param interval the time interval between punctuations (supported minimum is 1 millisecond)
193+
* @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
194+
* @param callback a function consuming timestamps representing the current stream or system time
195+
* @return a handle allowing cancellation of the punctuation schedule established by this method
196+
* @throws IllegalArgumentException if the interval is not representable in milliseconds
197+
*/
198+
Cancellable schedule(final Instant startTime,
199+
final Duration interval,
200+
final PunctuationType type,
201+
final Punctuator callback);
202+
162203
/**
163204
* Request a commit. Note that calling {@code commit()} is only a request for a commit, but it does not execute one.
164205
* Hence, when {@code commit()} returns, no commit was executed yet. However, Kafka Streams will commit as soon

streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import java.io.File;
3333
import java.time.Duration;
34+
import java.time.Instant;
3435
import java.util.Map;
3536
import java.util.Objects;
3637

@@ -96,6 +97,14 @@ public Cancellable schedule(final Duration interval,
9697
return delegate.schedule(interval, type, callback);
9798
}
9899

100+
@Override
101+
public Cancellable schedule(final Instant startTime,
102+
final Duration interval,
103+
final PunctuationType type,
104+
final Punctuator callback) {
105+
return delegate.schedule(startTime, interval, type, callback);
106+
}
107+
99108
@Override
100109
public <K, V> void forward(final K key, final V value) {
101110
throw new StreamsException(EXPLANATION);

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
3434

3535
import java.time.Duration;
36+
import java.time.Instant;
3637

3738
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
3839

@@ -133,6 +134,11 @@ public Cancellable schedule(final Duration interval, final PunctuationType type,
133134
throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
134135
}
135136

137+
@Override
138+
public Cancellable schedule(final Instant startTime, final Duration interval, final PunctuationType type, final Punctuator callback) {
139+
throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
140+
}
141+
136142
@Override
137143
public void logChange(final String storeName,
138144
final Bytes key,

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
4040

4141
import java.time.Duration;
42+
import java.time.Instant;
4243
import java.util.HashMap;
4344
import java.util.List;
4445
import java.util.Map;
@@ -313,7 +314,24 @@ public Cancellable schedule(final Duration interval,
313314
if (intervalMs < 1) {
314315
throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
315316
}
316-
return streamTask.schedule(intervalMs, type, callback);
317+
return streamTask.schedule(intervalMs, type, callback); }
318+
319+
@Override
320+
public Cancellable schedule(
321+
final Instant startTime,
322+
final Duration interval,
323+
final PunctuationType type,
324+
final Punctuator callback) throws IllegalArgumentException {
325+
throwUnsupportedOperationExceptionIfStandby("schedule");
326+
final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
327+
final long intervalMs = validateMillisecondDuration(interval, msgPrefix);
328+
if (intervalMs < 1) {
329+
throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
330+
}
331+
if (startTime.isBefore(Instant.EPOCH)) {
332+
throw new IllegalArgumentException("The minimum supported start time is Instant.EPOCH.");
333+
}
334+
return streamTask.schedule(startTime, intervalMs, type, callback);
317335
}
318336

319337
@Override

streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ boolean isCancelled() {
6969
public PunctuationSchedule next(final long currTimestamp) {
7070
long nextPunctuationTime = timestamp + interval;
7171
if (currTimestamp >= nextPunctuationTime) {
72-
// we missed one ore more punctuations
72+
// we missed one or more punctuations
7373
// avoid scheduling a new punctuations immediately, this can happen:
7474
// - when using STREAM_TIME punctuation and there was a gap i.e., no data was
7575
// received for at least 2*interval

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.kafka.streams.state.internals.ThreadCache;
5454

5555
import java.io.IOException;
56+
import java.time.Instant;
5657
import java.util.Collections;
5758
import java.util.HashMap;
5859
import java.util.HashSet;
@@ -1185,15 +1186,27 @@ public Cancellable schedule(final long interval, final PunctuationType type, fin
11851186
switch (type) {
11861187
case STREAM_TIME:
11871188
// align punctuation to 0L, punctuate as soon as we have data
1188-
return schedule(0L, interval, type, punctuator);
1189+
return schedule(0L, interval, type, punctuator, false);
11891190
case WALL_CLOCK_TIME:
11901191
// align punctuation to now, punctuate after interval has elapsed
1191-
return schedule(time.milliseconds() + interval, interval, type, punctuator);
1192+
return schedule(time.milliseconds() + interval, interval, type, punctuator, false);
11921193
default:
11931194
throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
11941195
}
11951196
}
11961197

1198+
/**
1199+
* Schedules a punctuation for the processor
1200+
*
1201+
* @param startTime time of the first punctuation
1202+
* @param interval the interval in milliseconds
1203+
* @param type the punctuation type
1204+
* @throws IllegalStateException if the current node is not null
1205+
*/
1206+
public Cancellable schedule(final Instant startTime, final long interval, final PunctuationType type, final Punctuator punctuator) {
1207+
return schedule(startTime.toEpochMilli(), interval, type, punctuator, true);
1208+
}
1209+
11971210
/**
11981211
* Schedules a punctuation for the processor
11991212
*
@@ -1202,12 +1215,12 @@ public Cancellable schedule(final long interval, final PunctuationType type, fin
12021215
* @param type the punctuation type
12031216
* @throws IllegalStateException if the current node is not null
12041217
*/
1205-
private Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator) {
1218+
private Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator, final boolean anchored) {
12061219
if (processorContext.currentNode() == null) {
12071220
throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
12081221
}
12091222

1210-
final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);
1223+
final PunctuationSchedule schedule = getInitialSchedule(startTime, interval, type, punctuator, anchored);
12111224

12121225
switch (type) {
12131226
case STREAM_TIME:
@@ -1222,6 +1235,15 @@ private Cancellable schedule(final long startTime, final long interval, final Pu
12221235
}
12231236
}
12241237

1238+
// For anchored schedule, we want to have all punctuations only fire on times based on combinations of startTime and interval
1239+
// This method ensures that the first anchored punctuation is not fired prematurely due to startTime < now
1240+
private PunctuationSchedule getInitialSchedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator, final boolean anchored) {
1241+
final PunctuationSchedule originalSchedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);
1242+
final long now = (type == PunctuationType.WALL_CLOCK_TIME) ? time.milliseconds() : streamTime();
1243+
1244+
return (anchored && startTime < now) ? originalSchedule.next(now) : originalSchedule;
1245+
}
1246+
12251247
/**
12261248
* Possibly trigger registered stream-time punctuation functions if
12271249
* current partition group timestamp has reached the defined stamp

streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.junit.jupiter.api.Test;
4545

4646
import java.time.Duration;
47+
import java.time.Instant;
4748
import java.util.Properties;
4849

4950
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
@@ -221,6 +222,14 @@ public Cancellable schedule(final Duration interval,
221222
return null;
222223
}
223224

225+
@Override
226+
public Cancellable schedule(final Instant startTime,
227+
final Duration interval,
228+
final PunctuationType type,
229+
final Punctuator callback) {
230+
return null;
231+
}
232+
224233
@Override
225234
public <K, V> void forward(final Record<K, V> record) {}
226235

0 commit comments

Comments
 (0)