Skip to content

Commit fb838f3

Browse files
authored
fix - java cdk polling timeout logging (#64888)
1 parent dbaa2dc commit fb838f3

File tree

2 files changed

+7
-11
lines changed

2 files changed

+7
-11
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.48.17
1+
version=0.48.18

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,14 @@ class DebeziumRecordIterator<T>(
7171
// The following logic incorporates heartbeat:
7272
// 1. Wait on queue either the configured time first or 1 min after a record received
7373
// 2. If nothing came out of queue finish sync
74-
// 3. If received heartbeat: check if hearbeat_lsn reached target or hasn't changed in a while
74+
// 3. If received heartbeat: check if heartbeat_lsn reached target or hasn't changed in a while
7575
// finish sync
7676
// 4. If change event lsn reached target finish sync
7777
// 5. Otherwise check message queue again
7878
override fun computeNext(): ChangeEventWithMetadata? {
7979
// keep trying until the publisher is closed or until the queue is empty. the latter case is
8080
// possible when the publisher has shutdown but the consumer has not yet processed all
8181
// messages it emitted.
82-
val instantBeforeSync = Instant.now()
8382
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
8483
val next: ChangeEvent<String?, String?>?
8584
val waitTime =
@@ -149,19 +148,16 @@ class DebeziumRecordIterator<T>(
149148

150149
val heartbeatPos = getHeartbeatPosition(next)
151150
val isProgressing = heartbeatPos != lastHeartbeatPosition
152-
val instantSyncTime: Duration = Duration.between(instantBeforeSync, Instant.now())
153-
val debeziumWaitingTimeRemaining = waitTime.seconds - instantSyncTime.toSeconds()
154151
LOGGER.info {
155-
"CDC events queue poll(): " +
152+
"CDC events queue poll(): returned a heartbeat event, " +
156153
if (isProgressing) {
157-
"returned a heartbeat event, " + "progressing to $heartbeatPos."
154+
"progressing to $heartbeatPos."
158155
} else {
159-
"no progress since last heartbeat. Will continue polling until timeout is reached. Time remaining in seconds: ${debeziumWaitingTimeRemaining}."
156+
"no progress since last heartbeat."
160157
}
161158
}
162-
// wrap up sync if heartbeat position crossed the target OR heartbeat position
163-
// hasn't changed for
164-
// too long
159+
// wrap up sync if heartbeat position crossed the target OR heartbeat's position
160+
// hasn't changed for too long
165161
if (targetPosition.reachedTargetPosition(heartbeatPos)) {
166162
requestClose(
167163
"Closing: Heartbeat indicates sync is done by reaching the target position",

0 commit comments

Comments
 (0)