Commit ec71a1d

#660 Implement incremental offset commits for re-runs so normal runs can be run after reruns.
1 parent a74ab20 commit ec71a1d
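
This change makes the incremental metastore reader rerun-aware: for ReaderMode.IncrementalValidation and ReaderMode.IncrementalRun, a rerun now reads the whole info-date partition and re-registers the day's committed offset range, so the offset bookkeeping stays consistent and a subsequent normal run for the same day starts from the committed offsets.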

4 files changed: +172 -3 lines changed

4 files changed

+172
-3
lines changed

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderIncrementalImpl.scala

Lines changed: 42 additions & 3 deletions
@@ -47,9 +47,14 @@ class MetastoreReaderIncrementalImpl(metastore: Metastore,
     if (readMode == ReaderMode.IncrementalPostProcessing && !isRerun) {
       log.info(s"Getting the current batch for table '$tableName' at '$infoDate'...")
       metastore.getBatch(tableName, infoDate, None)
-    } else if ((readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) && !isRerun) {
-      log.info(s"Getting the current incremental chunk for table '$tableName' at '$infoDate'...")
-      getIncremental(tableName, infoDate)
+    } else if (readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) {
+      if (isRerun) {
+        log.info(s"Getting the current incremental chunk for table rerun '$tableName' at '$infoDate'...")
+        getIncrementalForRerun(tableName, infoDate)
+      } else {
+        log.info(s"Getting the current incremental chunk for table '$tableName' at '$infoDate'...")
+        getIncremental(tableName, infoDate)
+      }
     } else {
       log.info(s"Getting daily data for table '$tableName' at '$infoDate'...")
       metastore.getTable(tableName, Option(infoDate), Option(infoDate))
@@ -89,6 +94,13 @@ class MetastoreReaderIncrementalImpl(metastore: Metastore,
     getIncrementalDf(tableName, trackingName, infoDate, commitChanges)
   }
 
+  private def getIncrementalForRerun(tableName: String, infoDate: LocalDate): DataFrame = {
+    val commitChanges = readMode == ReaderMode.IncrementalRun
+    val trackingName = s"$tableName->$outputTable"
+
+    getIncrementalDfForRerun(tableName, trackingName, infoDate, commitChanges)
+  }
+
   private def getIncrementalDf(tableName: String, trackingName: String, infoDate: LocalDate, commit: Boolean): DataFrame = {
     val tableDef = metastore.getTableDef(tableName)
     val om = bookkeeper.getOffsetManager
@@ -133,4 +145,31 @@ class MetastoreReaderIncrementalImpl(metastore: Metastore,
 
     df
   }
+
+  private def getIncrementalDfForRerun(tableName: String, trackingName: String, infoDate: LocalDate, commit: Boolean): DataFrame = {
+    val tableDef = metastore.getTableDef(tableName)
+    val om = bookkeeper.getOffsetManager
+    val offsets = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate))
+    val tableDf = metastore.getTable(tableName, Option(infoDate), Option(infoDate))
+
+    if (commit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
+      log.info(s"Starting offset commit for table rerun '$trackingName' for '$infoDate'")
+
+      val trackingTable = TrackingTable(
+        Thread.currentThread().getId,
+        tableName,
+        outputTable,
+        trackingName,
+        tableDef.batchIdColumn,
+        offsets.map(_.minimumOffset),
+        offsets.map(_.maximumOffset),
+        infoDate,
+        Instant.now()
+      )
+
+      trackingTables += trackingTable
+    }
+
+    tableDf
+  }
 }
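
Taken together, the rerun path does two things: it returns the whole partition for the info date instead of a fresh incremental chunk, and it re-registers the already-committed offset range so the end-of-run commit reflects the rerun. A minimal standalone sketch of that idea, with simplified stand-in types (OffsetRange, TrackingEntry, and RerunReader are illustrative names, not Pramen's API):

import java.time.{Instant, LocalDate}
import scala.collection.mutable.ListBuffer

// Illustrative stand-ins for Pramen's OffsetManager/TrackingTable types.
final case class OffsetRange(minimumOffset: Long, maximumOffset: Long)
final case class TrackingEntry(trackingName: String, infoDate: LocalDate,
                               min: Option[Long], max: Option[Long], createdAt: Instant)

class RerunReader(committedRange: (String, LocalDate) => Option[OffsetRange]) {
  private val trackingTables = new ListBuffer[TrackingEntry]

  // On a rerun: read the whole info-date partition (not an incremental chunk)
  // and re-register the already-committed offset range for the final commit.
  def readForRerun[A](tableName: String, outputTable: String, infoDate: LocalDate,
                      readPartition: (String, LocalDate) => Seq[A],
                      commit: Boolean): Seq[A] = {
    val trackingName = s"$tableName->$outputTable"
    val offsets = committedRange(trackingName, infoDate)
    val rows = readPartition(tableName, infoDate)

    // At most one tracking entry per (trackingName, infoDate) pair, mirroring
    // the duplicate check in getIncrementalDfForRerun above.
    if (commit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
      trackingTables += TrackingEntry(trackingName, infoDate,
        offsets.map(_.minimumOffset), offsets.map(_.maximumOffset), Instant.now())
    }
    rows
  }
}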

pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -113,5 +113,13 @@ class IncrementalPipelineDeltaLongSuite extends IncrementalPipelineLongFixture {
     "transformer picks up doubly ingested offsets" in {
       testTransformerPicksUpFromDoubleIngestedData(format)
     }
+
+    "run normal run after a rerun" in {
+      testNormalRunAfterRerun(format)
+    }
+
+    "run normal run then rerun, then normal run again for the same day" in {
+      testNormalRunAfterRerunAfterNormalRun(format)
+    }
   }
 }
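
The same two scenarios are registered for the Parquet format in the last file of this commit; both suites delegate to the shared methods added to IncrementalPipelineLongFixture below.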

pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala

Lines changed: 114 additions & 0 deletions
@@ -1279,6 +1279,120 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     succeed
   }
 
+  def testNormalRunAfterRerun(metastoreFormat: String): Assertion = {
+    val csv1DataStr = s"id,name,info_date\n1,John,$infoDate\n2,Jack,$infoDate\n"
+    val csv2DataStr = s"id,name,info_date\n3,Jill,$infoDate\n4,Mary,$infoDate\n"
+
+    val expectedStr1: String =
+      """{"id":1,"name":"John"}
+        |{"id":2,"name":"Jack"}
+        |""".stripMargin
+
+    val expectedStr2: String =
+      """{"id":1,"name":"John"}
+        |{"id":2,"name":"Jack"}
+        |{"id":3,"name":"Jill"}
+        |{"id":4,"name":"Mary"}
+        |""".stripMargin
+
+    withTempDirectory("incremental1") { tempDir =>
+      val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)
+
+      val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
+      val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
+
+      val table1Path = new Path(tempDir, "table1")
+      val table2Path = new Path(tempDir, "table2")
+
+      fsUtils.writeFile(path1, csv1DataStr)
+      val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema, isRerun = true)
+      val exitCode1 = AppRunner.runPipeline(conf1)
+      assert(exitCode1 == 0)
+
+      fsUtils.writeFile(path2, csv2DataStr)
+      val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
+      val exitCode2 = AppRunner.runPipeline(conf2)
+      assert(exitCode2 == 0)
+
+      val dfTable1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+      val dfTable2 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+      val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+      val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+
+      compareText(actualTable1, expectedStr2)
+      compareText(actualTable2, expectedStr2)
+
+      val batchIds = dfTable1.select(BATCH_ID_COLUMN).distinct().collect()
+
+      assert(batchIds.length == 2)
+
+      val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+
+      val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset])
+      assert(offsets.length == 1)
+    }
+    succeed
+  }
+
+  def testNormalRunAfterRerunAfterNormalRun(metastoreFormat: String): Assertion = {
+    val csv1DataStr = s"id,name,info_date\n1,John,$infoDate\n2,Jack,$infoDate\n"
+    val csv2DataStr = s"id,name,info_date\n3,Jill,$infoDate\n4,Mary,$infoDate\n"
+    val csv3DataStr = s"id,name,info_date\n5,Jane,$infoDate\n6,Kate,$infoDate\n"
+
+    val expectedStr: String =
+      """{"id":1,"name":"John"}
+        |{"id":2,"name":"Jack"}
+        |{"id":3,"name":"Jill"}
+        |{"id":4,"name":"Mary"}
+        |{"id":5,"name":"Jane"}
+        |{"id":6,"name":"Kate"}
+        |""".stripMargin
+
+    withTempDirectory("incremental1") { tempDir =>
+      val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)
+
+      val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
+      val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
+      val path3 = new Path(tempDir, new Path("landing", "landing_file3.csv"))
+
+      val table1Path = new Path(tempDir, "table1")
+      val table2Path = new Path(tempDir, "table2")
+
+      fsUtils.writeFile(path1, csv1DataStr)
+      val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
+      val exitCode1 = AppRunner.runPipeline(conf1)
+      assert(exitCode1 == 0)
+
+      fsUtils.writeFile(path2, csv2DataStr)
+      val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema, isRerun = true)
+      val exitCode2 = AppRunner.runPipeline(conf2)
+      assert(exitCode2 == 0)
+
+      fsUtils.writeFile(path3, csv3DataStr)
+      val conf3 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
+      val exitCode3 = AppRunner.runPipeline(conf3)
+      assert(exitCode3 == 0)
+
+      val dfTable1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+      val dfTable2 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+      val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+      val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+
+      compareText(actualTable1, expectedStr)
+      compareText(actualTable2, expectedStr)
+
+      val batchIds = dfTable1.select(BATCH_ID_COLUMN).distinct().collect()
+
+      assert(batchIds.length == 2)
+
+      val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+
+      val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset])
+      assert(offsets.length == 1)
+    }
+    succeed
+  }
+
   def getConfig(basePath: String,
                 metastoreFormat: String,
                 isRerun: Boolean = false,
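
In both scenarios the assertions are the same: all ingested records land in table1 and table2, table1 carries two distinct batch ids for the info date, and exactly one CommittedOffset remains for the "table1->table2" tracking pair, confirming that a rerun leaves a single committed offset range that subsequent normal runs extend.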

pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -113,5 +113,13 @@ class IncrementalPipelineParquetLongSuite extends IncrementalPipelineLongFixture
     "transformer picks up doubly ingested offsets" in {
       testTransformerPicksUpFromDoubleIngestedData(format)
     }
+
+    "run normal run after a rerun" in {
+      testNormalRunAfterRerun(format)
+    }
+
+    "run normal run then rerun, then normal run again for the same day" in {
+      testNormalRunAfterRerunAfterNormalRun(format)
+    }
   }
 }
