From 85c8984e8f2ecc48ed89fd2b8630ac92b27175eb Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Tue, 10 Oct 2023 08:57:20 +0200 Subject: [PATCH 1/6] AppConfig and ArgumentParser --- .../spark_metadata_tool/ArgumentParser.scala | 18 +++++++++++++++++- .../spark_metadata_tool/model/AppConfig.scala | 9 ++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala index 46fbd15..2cd3fa8 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala @@ -25,7 +25,7 @@ import org.apache.log4j.PatternLayout import org.log4s.Logger import scopt.OParser import za.co.absa.spark_metadata_tool.LoggingImplicits._ -import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError} +import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareFolders, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError} import java.time.LocalDateTime import java.time.format.DateTimeFormatter @@ -100,6 +100,20 @@ object ArgumentParser { .text("set compaction number") ), note(sys.props("line.separator")), + cmd("compare-folders") + .action((_, c) => c.copy(mode = CompareFolders)) + .text("Compare content of two folders") + .children( + opt[Path]('p', "path") + .required() + .action((x, c) => c.copy(path = x)) + .text("full path to folder, including filesystem (e.g. s3://bucket/foo/root)"), + opt[Path]('s', "secondarypath") + .required() + .action((x, c) => c.copy(secondaryPath = x)) + .text("full path to secondary folder, including filesystem (e.g. s3://bucket/foo/root)") + ), + note(sys.props("line.separator")), note("Other options:"), opt[Unit]('k', "keep-backup") .action((_, c) => c.copy(keepBackup = true)) @@ -126,6 +140,8 @@ object ArgumentParser { oldPath = None, path = new Path("default"), filesystem = Unix, + secondaryPath = new Path("default"), + secondaryFilesystem = Unix, keepBackup = false, verbose = false, logToFile = false, diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala b/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala index 6c1b4c7..bfc6dca 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala @@ -23,6 +23,8 @@ final case class AppConfig( oldPath: Option[Path], path: Path, filesystem: TargetFilesystem, + secondaryPath: Path, + secondaryFilesystem: TargetFilesystem, keepBackup: Boolean, verbose: Boolean, logToFile: Boolean, @@ -30,7 +32,8 @@ final case class AppConfig( ) sealed trait Mode -case object FixPaths extends Mode -case object Merge extends Mode -case object CompareMetadataWithData extends Mode +case object FixPaths extends Mode +case object Merge extends Mode +case object CompareMetadataWithData extends Mode final case class CreateMetadata(maxMicroBatchNumber: Int, compactionNumber: Int) extends Mode +case object CompareFolders extends Mode \ No newline at end of file From 5cb2beb7dc500f8b2c1b35fe9603fd4a7ed94f5f Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Tue, 10 Oct 2023 09:07:41 +0200 Subject: [PATCH 2/6] MetadataTool and scalastyle-config.xml --- scalastyle-config.xml | 2 +- .../absa/spark_metadata_tool/MetadataTool.scala | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 57a51f3..5a405d2 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -85,7 +85,7 @@ - + diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala b/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala index 9d73139..1f903c4 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala @@ -213,6 +213,22 @@ class MetadataTool(io: FileManager) { files ++ filesInSubDir } + /** List content of directory recursively. + * + * @param rootDirPath + * path to root directory + * @return + * content of directory and all subdirectories recursively + */ + def listDirectoryRecursively(rootDirPath: Path): Either[AppError, Seq[Path]] = + for { + dirs <- io.listDirectories(rootDirPath) + files <- io.listFiles(rootDirPath) + filesInSubDir <- dirs.flatTraverse(dir => listDirectoryRecursively(dir)) + } yield { + dirs ++ files ++ filesInSubDir + } + /** Get metadata records from a metadata file. * * @param metadataFilePath From bfa406e8a01fe2d7180f007e3905ab9339f8030b Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Tue, 10 Oct 2023 09:19:50 +0200 Subject: [PATCH 3/6] Application --- .../spark_metadata_tool/Application.scala | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala index 3364512..231021b 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala @@ -24,7 +24,7 @@ import org.log4s.Logger import software.amazon.awssdk.services.s3.S3Client import za.co.absa.spark_metadata_tool.LoggingImplicits._ import za.co.absa.spark_metadata_tool.io.{FileManager, HdfsFileManager, S3FileManager, UnixFileManager} -import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError} +import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareFolders, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError} import java.net.URI import scala.util.Try @@ -39,12 +39,18 @@ object Application extends App { } def run(args: Array[String]): Either[AppError, Unit] = for { - (conf, io, tool) <- init(args).tap(_.logInfo("Initialized application")) + conf <- initConfig(args) + (io, tool) <- initIo(conf.filesystem).tap(_.logInfo("Initialized application")) _ <- conf.mode match { case FixPaths => fixPaths(conf, io, tool) case Merge => mergeMetadataFiles(conf, io, tool) case CompareMetadataWithData => compareMetadataWithData(conf, io, tool) case m: CreateMetadata => createMetadata(conf, io, tool, new DataTool(io), m) + case CompareFolders => + for { + (_, secondaryTool) <- initIo(conf.secondaryFilesystem) + _ <- compareFolders(conf, tool, secondaryTool) + } yield () } backupPath = new Path(s"${conf.path}/$BackupDir") _ <- conf.mode match { @@ -158,6 +164,32 @@ object Application extends App { } yield () } + private def compareFolders( + config: AppConfig, + tool: MetadataTool, + secondaryTool: MetadataTool, + ): Either[AppError, Unit] = { + for { + dirContent <- tool.listDirectoryRecursively(config.path) + .map(_.map(path => path.toString.replaceFirst(config.path.toString, ""))) + secondaryDirContent <- secondaryTool.listDirectoryRecursively(config.secondaryPath) + .map(_.map(path => path.toString.replaceFirst(config.secondaryPath.toString, ""))) + } yield { + val diff = dirContent.diff(secondaryDirContent) + val oppositeDiff = secondaryDirContent.diff(dirContent) + if (diff.isEmpty && oppositeDiff.isEmpty) { + logger.info("Dirs are identical") + } else { + logger.error("Dirs are not identical") + logger.error("Paths that are different:") + (diff ++ oppositeDiff).toSet.foreach { path: String => + logger.error(path) + } + } + () + } + } + private def printDetectedDataIssues( notDeletedData: Iterable[Path], missingData: Iterable[Path], @@ -186,10 +218,10 @@ object Application extends App { } } - private def init(args: Array[String]): Either[AppError, (AppConfig, FileManager, MetadataTool)] = for { - config <- ArgumentParser.createConfig(args) - io <- initFileManager(config.filesystem) - } yield (config, io, new MetadataTool(io)) + private def initConfig(args: Array[String]): Either[AppError, AppConfig] = ArgumentParser.createConfig(args) + + private def initIo(targetFilesystem: TargetFilesystem): Either[AppError, (FileManager, MetadataTool)] = + initFileManager(targetFilesystem).map(fileManager => (fileManager, new MetadataTool(fileManager))) private def fixFile( path: Path, From 840a55d3a9e0ecca73b75e0c71c44f12ca1e0477 Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Tue, 10 Oct 2023 09:25:27 +0200 Subject: [PATCH 4/6] README --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 772abda..f2d1f4a 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,11 @@ Example: > Metadata are aligned to the `--max-micro-batch-number,` so if the `--compaction-number` is higher than > the number of metadata files, it can produce empty, but still valid, metadata files. +### compare-folders +- Compares content of two folders and log difference + +Note that the tool does not perform any operation to file system + ## Usage ### Obtaining The application is being published as a standalone executable JAR. Simply download the most recent version of the file `spark-metadata-tool_2.13-x.y.z-assembly.jar` from the [package repository](https://github.com/orgs/AbsaOSS/packages?repo_name=spark-metadata-tool). @@ -129,6 +134,10 @@ Create Spark structured streaming metadata -c, --compaction-number set compaction number +Command: compare-folders [options] +Compares content of two folders and log difference + -p, --path full path to the folder, including filesystem (e.g. s3://bucket/foo/root) + -s, --secondarypath full path to the other folder, including filesystem (e.g. s3://bucket/foo/root) Other options: -k, --keep-backup persist backup files after successful run From 4b56b76f1cdb6932784dbf64b8f056af4b283bdb Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Thu, 26 Oct 2023 15:43:48 +0200 Subject: [PATCH 5/6] PR FIX --- .../co/absa/spark_metadata_tool/ArgumentParser.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala index 2cd3fa8..8d84437 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala @@ -152,10 +152,16 @@ object ArgumentParser { parseResult .fold(Left(ParsingError("Couldn't parse provided arguments", None)): Either[AppError, AppConfig]) { conf => for { - _ <- initLogging(conf.verbose, conf.logToFile).tap(_.logDebug("Initialized logging")) + _ <- initLogging(conf.verbose, conf.logToFile).tap(_.logDebug("Initialized logging")) fs <- getFsFromPath(conf.path.toString).tap(_.logValueDebug("Derived filesystem from path")) + secondaryFs <- (getFsFromPath(conf.secondaryPath.toString) match { + case Right(value) => Right(Some(value)) + case Left(_) if conf.secondaryPath.toString == "default" => Right(None) + case Left(value) => Left(value) + }).tap(_.logValueDebug("Derived secondary filesystem from path")) } yield conf.copy( - filesystem = fs + filesystem = fs, + secondaryFilesystem = secondaryFs.getOrElse(Unix) ) } .tap(_.logValueDebug("Initialized application config")) From 8a85da3566c56a8a903f7e896f6251f2cab6cdd1 Mon Sep 17 00:00:00 2001 From: jozefbakus Date: Mon, 6 Nov 2023 14:50:33 +0100 Subject: [PATCH 6/6] Compare metadata (#63) * Compare metadata --- README.md | 10 +++ scalastyle-config.xml | 2 +- .../spark_metadata_tool/Application.scala | 77 ++++++++++++++++--- .../spark_metadata_tool/ArgumentParser.scala | 16 +++- .../spark_metadata_tool/model/AppConfig.scala | 3 +- 5 files changed, 96 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index f2d1f4a..29128a3 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,11 @@ Example: Note that the tool does not perform any operation to file system +### compare-metadata +- Compares content of metadata data files in two different data locations + +Note that the tool does not perform any operation to file system + ## Usage ### Obtaining The application is being published as a standalone executable JAR. Simply download the most recent version of the file `spark-metadata-tool_2.13-x.y.z-assembly.jar` from the [package repository](https://github.com/orgs/AbsaOSS/packages?repo_name=spark-metadata-tool). @@ -139,6 +144,11 @@ Compares content of two folders and log difference -p, --path full path to the folder, including filesystem (e.g. s3://bucket/foo/root) -s, --secondarypath full path to the other folder, including filesystem (e.g. s3://bucket/foo/root) +Command: compare-metadata [options] +Compares content of metadata data files in two different data locations + -p, --path full path to root data folder, including filesystem (e.g. s3://bucket/foo/root) + -s, --secondarypath full path to root secondary data folder, including filesystem (e.g. s3://bucket/foo/root) + Other options: -k, --keep-backup persist backup files after successful run -v, --verbose increase verbosity of application logging diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 5a405d2..587e1af 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -98,7 +98,7 @@ - + diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala index 231021b..a62e5ac 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala @@ -24,7 +24,7 @@ import org.log4s.Logger import software.amazon.awssdk.services.s3.S3Client import za.co.absa.spark_metadata_tool.LoggingImplicits._ import za.co.absa.spark_metadata_tool.io.{FileManager, HdfsFileManager, S3FileManager, UnixFileManager} -import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareFolders, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError} +import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareFolders, CompareMetadata, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError} import java.net.URI import scala.util.Try @@ -42,14 +42,18 @@ object Application extends App { conf <- initConfig(args) (io, tool) <- initIo(conf.filesystem).tap(_.logInfo("Initialized application")) _ <- conf.mode match { - case FixPaths => fixPaths(conf, io, tool) - case Merge => mergeMetadataFiles(conf, io, tool) - case CompareMetadataWithData => compareMetadataWithData(conf, io, tool) - case m: CreateMetadata => createMetadata(conf, io, tool, new DataTool(io), m) - case CompareFolders => + case FixPaths => fixPaths(conf, io, tool) + case Merge => mergeMetadataFiles(conf, io, tool) + case CompareMetadataWithData => compareMetadataWithData(conf, io, tool) + case m: CreateMetadata => createMetadata(conf, io, tool, new DataTool(io), m) + case CompareFolders | CompareMetadata => for { - (_, secondaryTool) <- initIo(conf.secondaryFilesystem) - _ <- compareFolders(conf, tool, secondaryTool) + (secondaryIo, secondaryTool) <- initIo(conf.secondaryFilesystem) + _ <- conf.mode match { + case CompareFolders => compareFolders(conf, tool, secondaryTool) + case CompareMetadata => compareMetadataFolders(conf, tool, io, secondaryTool, secondaryIo) + case _ => ().asRight + } } yield () } backupPath = new Path(s"${conf.path}/$BackupDir") @@ -183,13 +187,68 @@ object Application extends App { logger.error("Dirs are not identical") logger.error("Paths that are different:") (diff ++ oppositeDiff).toSet.foreach { path: String => - logger.error(path) + dirContent.find(_.endsWith(path)).foreach(p => logger.error(s"${config.path}$p")) + secondaryDirContent.find(_.endsWith(path)).foreach(p => logger.error(s"${config.secondaryPath}$p")) } } () } } + private def compareMetadataFolders( + config: AppConfig, + tool: MetadataTool, + io: FileManager, + secondaryTool: MetadataTool, + secondaryIo: FileManager, + ): Either[AppError, Unit] = { + val dataPath = config.path + val metaPath = new Path(s"$dataPath/$SparkMetadataDir") + val secondaryDataPath = config.secondaryPath + val secondaryMetaPath = new Path(s"$secondaryDataPath/$SparkMetadataDir") + + for { + metaPaths <- io.listFiles(metaPath) + usedMetaFiles <- tool.filterLastCompact(metaPaths) + metaRecords <- usedMetaFiles.flatTraverse(metaFile => tool.getMetaRecords(metaFile.path)) + + secondaryMetaPaths <- secondaryIo.listFiles(secondaryMetaPath) + secondaryUsedMetaFiles <- secondaryTool.filterLastCompact(secondaryMetaPaths) + secondaryMetaRecords <- secondaryUsedMetaFiles.flatTraverse(metaFile => secondaryTool.getMetaRecords(metaFile.path)) + } yield { + val incorrectMetaRecords = + metaRecords.map(_.path.toString).filter(p => !p.startsWith(dataPath.toString)) + val incorrectSecondaryMetaRecords = + secondaryMetaRecords.map(_.path.toString).filter(p => !p.startsWith(secondaryDataPath.toString)) + + if(incorrectMetaRecords.nonEmpty) { + logger.error(s"Metafiles in $dataPath contains different filesystems") + incorrectMetaRecords.foreach(p => logger.error(p)) + } + if (incorrectSecondaryMetaRecords.nonEmpty) { + logger.error(s"Metafiles in $secondaryDataPath contains different filesystems") + incorrectSecondaryMetaRecords.foreach(p => logger.error(p)) + } + + val metaPaths = + metaRecords.map(_.path.toString.replaceFirst(dataPath.toString, "")) + val secondaryMetaPaths = + secondaryMetaRecords.map(_.path.toString.replaceFirst(secondaryDataPath.toString, "")) + + val diff = metaPaths.diff(secondaryMetaPaths) + val oppositeDiff = secondaryMetaPaths.diff(metaPaths) + if (diff.isEmpty && oppositeDiff.isEmpty) { + logger.info("Meta files are identical") + } else { + logger.error("Meta files are not identical") + logger.error("Paths that are different:") + (diff ++ oppositeDiff).toSet.foreach { path: String => + logger.error(path) + } + } + } + } + private def printDetectedDataIssues( notDeletedData: Iterable[Path], missingData: Iterable[Path], diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala index 8d84437..40617c5 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala @@ -25,7 +25,7 @@ import org.apache.log4j.PatternLayout import org.log4s.Logger import scopt.OParser import za.co.absa.spark_metadata_tool.LoggingImplicits._ -import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareFolders, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError} +import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareFolders, CompareMetadata, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError} import java.time.LocalDateTime import java.time.format.DateTimeFormatter @@ -114,6 +114,20 @@ object ArgumentParser { .text("full path to secondary folder, including filesystem (e.g. s3://bucket/foo/root)") ), note(sys.props("line.separator")), + cmd("compare-metadata") + .action((_, c) => c.copy(mode = CompareMetadata)) + .text("Compare content of metadata data files in two different data locations") + .children( + opt[Path]('p', "path") + .required() + .action((x, c) => c.copy(path = x)) + .text("full path to root data folder, including filesystem (e.g. s3://bucket/foo/root)"), + opt[Path]('s', "secondarypath") + .required() + .action((x, c) => c.copy(secondaryPath = x)) + .text("full path to root secondary data folder, including filesystem (e.g. s3://bucket/foo/root)") + ), + note(sys.props("line.separator")), note("Other options:"), opt[Unit]('k', "keep-backup") .action((_, c) => c.copy(keepBackup = true)) diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala b/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala index bfc6dca..b129682 100644 --- a/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala +++ b/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala @@ -36,4 +36,5 @@ case object FixPaths case object Merge extends Mode case object CompareMetadataWithData extends Mode final case class CreateMetadata(maxMicroBatchNumber: Int, compactionNumber: Int) extends Mode -case object CompareFolders extends Mode \ No newline at end of file +case object CompareFolders extends Mode +case object CompareMetadata extends Mode \ No newline at end of file