Skip to content

Commit 16e7780

Browse files
authored
Revert recent change to CDK function signature (#62052)
1 parent 7c3b0d1 commit 16e7780

File tree

9 files changed

+20
-39
lines changed

9 files changed

+20
-39
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,9 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.48.17 | 2025-06-26 | [\#62052](https://github.com/airbytehq/airbyte/pull/62015) | Revert change to CDK interface signature. |
177178
| 0.48.16 | 2025-06-27 | [\#61729](https://github.com/airbytehq/airbyte/pull/61729) | Make function used in testDiscover overridable. |
178-
| 0.48.15 | 2025-05-15 | [\#62015](https://github.com/airbytehq/airbyte/pull/62015) | Add an option for sources to exclude today's data. |
179+
| 0.48.15 | 2025-05-15 | [\#62015](https://github.com/airbytehq/airbyte/pull/62015) | Add an ooption for sources to exclude today's data. |
179180
| 0.48.14 | 2025-05-15 | [\#60251](https://github.com/airbytehq/airbyte/pull/60251) | Improve logging by reducing redundant log statements. |
180181
| 0.48.13 | 2025-05-06 | [\#59682](https://github.com/airbytehq/airbyte/pull/59682) | Reduce initial load timeout and improve logging. |
181182
| 0.48.9 | 2025-04-17 | [\#58132](https://github.com/airbytehq/airbyte/pull/58132) | Fix vulnerability in dependencies. |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.48.16
1+
version=0.48.17

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,6 @@ abstract class AbstractJdbcSource<Datatype>(
614614
tableName: String,
615615
cursorInfo: CursorInfo,
616616
cursorFieldType: Datatype,
617-
excludeTodaysData: Boolean,
618617
): AutoCloseableIterator<AirbyteRecordData> {
619618
LOGGER.info { "Queueing query for table: $tableName" }
620619
val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName)
@@ -667,11 +666,11 @@ abstract class AbstractJdbcSource<Datatype>(
667666

668667
val sql =
669668
StringBuilder(
670-
when (excludeTodaysData && cursorInfo.cutoffTime != null) {
671-
true ->
672-
"SELECT $wrappedColumnNames FROM $fullTableName WHERE $quotedCursorField $operator ? AND $quotedCursorField < ?"
673-
false ->
669+
when (cursorInfo.cutoffTime) {
670+
null ->
674671
"SELECT $wrappedColumnNames FROM $fullTableName WHERE $quotedCursorField $operator ?"
672+
else ->
673+
"SELECT $wrappedColumnNames FROM $fullTableName WHERE $quotedCursorField $operator ? AND $quotedCursorField < ?"
675674
}
676675
)
677676
// if the connector emits intermediate states, the incremental query
@@ -691,7 +690,7 @@ abstract class AbstractJdbcSource<Datatype>(
691690
cursorInfo.cursor!!
692691
)
693692

694-
if (excludeTodaysData && cursorInfo.cutoffTime != null) {
693+
if (cursorInfo.cutoffTime != null) {
695694
sourceOperations.setCursorField(
696695
preparedStatement,
697696
2,
@@ -888,7 +887,6 @@ abstract class AbstractJdbcSource<Datatype>(
888887
table: TableInfo<CommonField<Datatype>>,
889888
stateManager: StateManager?,
890889
emittedAt: Instant,
891-
excludeTodaysData: Boolean,
892890
): AutoCloseableIterator<AirbyteMessage> {
893891
val iterator =
894892
super.createReadIterator(
@@ -898,7 +896,6 @@ abstract class AbstractJdbcSource<Datatype>(
898896
table,
899897
stateManager,
900898
emittedAt,
901-
excludeTodaysData,
902899
)
903900
return when (airbyteStream.syncMode) {
904901
INCREMENTAL -> augmentWithStreamStatus(airbyteStream, iterator)

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ protected constructor(driverClassName: String) :
151151
fullyQualifiedTableNameToInfo,
152152
stateManager,
153153
emittedAt,
154-
isExcludeTodayDateForCursorIncremental(config)
155154
)
156155
val fullRefreshIterators =
157156
getFullRefreshIterators(
@@ -334,18 +333,15 @@ protected constructor(driverClassName: String) :
334333
stateManager,
335334
emittedAt,
336335
SyncMode.FULL_REFRESH,
337-
false
338336
)
339337
}
340338

341-
@JvmOverloads
342339
protected open fun getIncrementalIterators(
343340
database: Database,
344341
catalog: ConfiguredAirbyteCatalog,
345342
tableNameToTable: Map<String, TableInfo<CommonField<DataType>>>,
346343
stateManager: StateManager?,
347344
emittedAt: Instant,
348-
excludeTodaysData: Boolean = false,
349345
): List<AutoCloseableIterator<AirbyteMessage>> {
350346
return getSelectedIterators(
351347
database,
@@ -354,7 +350,6 @@ protected constructor(driverClassName: String) :
354350
stateManager,
355351
emittedAt,
356352
SyncMode.INCREMENTAL,
357-
excludeTodaysData
358353
)
359354
}
360355

@@ -377,7 +372,6 @@ protected constructor(driverClassName: String) :
377372
stateManager: StateManager?,
378373
emittedAt: Instant,
379374
syncMode: SyncMode,
380-
excludeTodaysData: Boolean,
381375
): List<AutoCloseableIterator<AirbyteMessage>> {
382376
val iteratorList: MutableList<AutoCloseableIterator<AirbyteMessage>> = ArrayList()
383377
for (airbyteStream in catalog!!.streams) {
@@ -401,7 +395,6 @@ protected constructor(driverClassName: String) :
401395
table,
402396
stateManager,
403397
emittedAt,
404-
excludeTodaysData,
405398
)
406399
iteratorList.add(tableReadIterator)
407400
}
@@ -420,15 +413,13 @@ protected constructor(driverClassName: String) :
420413
* @param emittedAt Time when data was emitted from the Source database
421414
* @return
422415
*/
423-
@JvmOverloads
424416
protected open fun createReadIterator(
425417
database: Database,
426418
airbyteStream: ConfiguredAirbyteStream,
427419
catalog: ConfiguredAirbyteCatalog?,
428420
table: TableInfo<CommonField<DataType>>,
429421
stateManager: StateManager?,
430422
emittedAt: Instant,
431-
excludeTodaysData: Boolean = false,
432423
): AutoCloseableIterator<AirbyteMessage> {
433424
val streamName = airbyteStream.stream.name
434425
val namespace = airbyteStream.stream.namespace
@@ -457,7 +448,6 @@ protected constructor(driverClassName: String) :
457448
table,
458449
cursorInfo.get(),
459450
emittedAt,
460-
excludeTodaysData
461451
)
462452
} else {
463453
// if no cursor is present then this is the first read for is the same as doing a
@@ -554,7 +544,6 @@ protected constructor(driverClassName: String) :
554544
table: TableInfo<CommonField<DataType>>,
555545
cursorInfo: CursorInfo,
556546
emittedAt: Instant,
557-
excludeTodaysData: Boolean,
558547
): AutoCloseableIterator<AirbyteMessage> {
559548
val streamName = airbyteStream.stream.name
560549
val namespace = airbyteStream.stream.namespace
@@ -578,7 +567,6 @@ protected constructor(driverClassName: String) :
578567
table.name,
579568
cursorInfo,
580569
cursorType,
581-
excludeTodaysData,
582570
)
583571

584572
return getMessageIterator(queryIterator, streamName, namespace, emittedAt.toEpochMilli())
@@ -804,7 +792,6 @@ protected constructor(driverClassName: String) :
804792
tableName: String,
805793
cursorInfo: CursorInfo,
806794
cursorFieldType: DataType,
807-
excludeTodaysData: Boolean = false,
808795
): AutoCloseableIterator<AirbyteRecordData>
809796

810797
protected open val stateEmissionFrequency: Int
@@ -832,9 +819,6 @@ protected constructor(driverClassName: String) :
832819
return AirbyteStateMessage.AirbyteStateType.STREAM
833820
}
834821

835-
// Added to allow excluding today's data from incremental cursor reads.
836-
open protected fun isExcludeTodayDateForCursorIncremental(config: JsonNode): Boolean = false
837-
838822
companion object {
839823
const val CHECK_TRACE_OPERATION_NAME: String = "check-operation"
840824
const val DISCOVER_TRACE_OPERATION_NAME: String = "discover-operation"

airbyte-integrations/connectors/source-mssql/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
}
66

77
airbyteJavaConnector {
8-
cdkVersionRequired = '0.48.16'
8+
cdkVersionRequired = '0.48.17'
99
features = ['db-sources']
1010
useLocalCdk = false
1111
}

airbyte-integrations/connectors/source-mssql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.2.2
12+
dockerImageTag: 4.2.3
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -415,8 +415,7 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
415415
final @NotNull ConfiguredAirbyteCatalog catalog,
416416
final @NotNull Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
417417
final StateManager stateManager,
418-
final @NotNull Instant emittedAt,
419-
final @NotNull boolean excludeTodaysData) {
418+
final @NotNull Instant emittedAt) {
420419
final JsonNode sourceConfig = database.getSourceConfig();
421420
if (MssqlCdcHelper.isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) {
422421
LOGGER.info("using OC + CDC");
@@ -427,7 +426,7 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
427426
LOGGER.info("Syncing via Primary Key");
428427
final MssqlCursorBasedStateManager cursorBasedStateManager = new MssqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
429428

430-
if (excludeTodaysData) {
429+
if (isExcludeTodayDateForCursorIncremental(sourceConfig)) {
431430
setCutoffCursorTime(tableNameToTable, cursorBasedStateManager.getPairToCursorInfoMap());
432431
}
433432

@@ -457,15 +456,15 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
457456
new ConfiguredAirbyteCatalog().withStreams(
458457
cursorBasedStreams.streamsForCursorBased()),
459458
tableNameToTable,
460-
cursorBasedStateManager, emittedAt, excludeTodaysData));
459+
cursorBasedStateManager, emittedAt));
461460

462461
return Stream.of(initialLoadIterator, cursorBasedIterator).flatMap(Collection::stream).collect(Collectors.toList());
463462

464463
}
465464
}
466465

467466
LOGGER.info("using CDC: {}", false);
468-
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt, excludeTodaysData);
467+
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
469468
}
470469

471470
private static void setCutoffCursorTime(@NotNull Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
@@ -488,7 +487,7 @@ private static void setCutoffCursorTime(@NotNull Map<String, TableInfo<CommonFie
488487
static void setCursorCutoffInfoForValue(CursorInfo cursorInfo, @NotNull CommonField<JDBCType> f, Instant nowInstant) {
489488
switch (f.getType()) {
490489
case JDBCType.DATE -> {
491-
final var instant = nowInstant.minus(1, ChronoUnit.DAYS).atOffset(ZoneOffset.UTC);
490+
final var instant = nowInstant.atOffset(ZoneOffset.UTC);
492491
cursorInfo.setCutoffTime(ISO_LOCAL_DATE.format(instant));
493492
}
494493
case JDBCType.TIMESTAMP -> {
@@ -705,8 +704,7 @@ public AutoCloseableIterator<AirbyteMessage> augmentWithStreamStatus(@NotNull fi
705704
return AutoCloseableIterators.concatWithEagerClose(starterStatus, streamItrator, completeStatus);
706705
}
707706

708-
@Override
709-
protected boolean isExcludeTodayDateForCursorIncremental(@NotNull JsonNode config) {
707+
private boolean isExcludeTodayDateForCursorIncremental(@NotNull JsonNode config) {
710708
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
711709
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
712710
if (MssqlCdcHelper.ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.STANDARD) {

airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ void testSetCursorCutoffInfoForValue() {
281281
// DATE
282282
CommonField<JDBCType> dateField = new CommonField<>("date_col", JDBCType.DATE);
283283
MssqlSource.setCursorCutoffInfoForValue(cursorInfo, dateField, now);
284-
assertEquals("2024-05-31", cursorInfo.getCutoffTime());
284+
assertEquals("2024-06-01", cursorInfo.getCutoffTime());
285285

286286
// TIMESTAMP
287287
cursorInfo = new CursorInfo(null, null, null, null);

docs/integrations/sources/mssql.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,9 +445,10 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
445445

446446
| Version | Date | Pull Request | Subject |
447447
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
448+
| 4.2.3 | 2025-07-01 | [62052](https://github.com/airbytehq/airbyte/pull/62052) | Revert change to CDK interface signature. |
448449
| 4.2.2 | 2025-06-25 | [61729](https://github.com/airbytehq/airbyte/pull/61729) | Support the use of logical primary keys for CDC. |
449-
| 4.2.1 | 2025-06-23 | [62015](https://github.com/airbytehq/airbyte/pull/62015) | Fix previous merge. |
450-
| 4.2.0 | 2025-06-19 | [62015](https://github.com/airbytehq/airbyte/pull/61685) | Add an option to exclude today's data from cursor based incremental syncs when using temporal cursor (datetime). |
450+
| 4.2.1 | 2025-06-23 | [62015](https://github.com/airbytehq/airbyte/pull/62015) | Fix previous merge. Improve cutoff date handling |
451+
| 4.2.0 | 2025-06-19 | [62015](https://github.com/airbytehq/airbyte/pull/61685) | Add an option to exclude today's data from cursor based incremental syncs when using temporal cursor (datetime). |
451452
| 4.1.29 | 2025-06-03 | [61320](https://github.com/airbytehq/airbyte/pull/61320) | Add error handling for connection issues and adopt the latest CDK version. |
452453
| 4.1.28 | 2025-05-15 | [60311](https://github.com/airbytehq/airbyte/pull/60311) | Migrate to new gradle flow. |
453454
| 4.1.27 | 2025-04-28 | [59124](https://github.com/airbytehq/airbyte/pull/59124) | Fix _ab_cdc_event_serial_no datatype in addMetaDataToRowsFetchedOutsideDebezium method |

0 commit comments

Comments
 (0)