diff --git a/README.md b/README.md
index 772abda..29128a3 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,16 @@ Example:
 > Metadata are aligned to the `--max-micro-batch-number,` so if the `--compaction-number` is higher than
 > the number of metadata files, it can produce empty, but still valid, metadata files.
 
+### compare-folders
+- Compares the content of two folders and logs the differences
+
+Note that the tool does not perform any write operations on the file system.
+
+### compare-metadata
+- Compares the content of metadata files in two different data locations
+
+Note that the tool does not perform any write operations on the file system.
+
 ## Usage
 ### Obtaining
 The application is being published as a standalone executable JAR. Simply download the most recent version of the file `spark-metadata-tool_2.13-x.y.z-assembly.jar` from the [package repository](https://github.com/orgs/AbsaOSS/packages?repo_name=spark-metadata-tool).
@@ -129,6 +139,15 @@
 Create Spark structured streaming metadata
   -c, --compaction-number  set compaction number
 
+Command: compare-folders [options]
+Compares the content of two folders and logs the differences
+  -p, --path               full path to the folder, including filesystem (e.g. s3://bucket/foo/root)
+  -s, --secondarypath      full path to the other folder, including filesystem (e.g. s3://bucket/foo/root)
+
+Command: compare-metadata [options]
+Compares the content of metadata files in two different data locations
+  -p, --path               full path to root data folder, including filesystem (e.g. s3://bucket/foo/root)
+  -s, --secondarypath      full path to root secondary data folder, including filesystem (e.g. s3://bucket/foo/root)
 
 Other options:
   -k, --keep-backup        persist backup files after successful run
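Both new commands implement the same read-only check: strip each location's root prefix, then take the sequence difference in both directions. A minimal standalone sketch of that rule (the helper name `symmetricDiff` and the sample paths are illustrative, not part of the tool):

```scala
// Hypothetical helper showing the comparison rule: two listings are
// considered identical only when the diff is empty in BOTH directions.
def symmetricDiff(left: Seq[String], right: Seq[String]): (Seq[String], Seq[String]) =
  (left.diff(right), right.diff(left))

val (onlyInPath, onlyInSecondary) = symmetricDiff(
  Seq("/part-0000.parquet", "/part-0001.parquet"),
  Seq("/part-0000.parquet", "/part-0002.parquet")
)
// onlyInPath      == Seq("/part-0001.parquet")  -> reported under --path
// onlyInSecondary == Seq("/part-0002.parquet")  -> reported under --secondarypath
```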
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 57a51f3..587e1af 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -85,7 +85,7 @@
-
+
@@ -98,7 +98,7 @@
-
+
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
index 3364512..a62e5ac 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
@@ -24,7 +24,7 @@ import org.log4s.Logger
 import software.amazon.awssdk.services.s3.S3Client
 import za.co.absa.spark_metadata_tool.LoggingImplicits._
 import za.co.absa.spark_metadata_tool.io.{FileManager, HdfsFileManager, S3FileManager, UnixFileManager}
-import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError}
+import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareFolders, CompareMetadata, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError}
 
 import java.net.URI
 import scala.util.Try
@@ -39,12 +39,22 @@ object Application extends App {
   }
 
   def run(args: Array[String]): Either[AppError, Unit] = for {
-    (conf, io, tool) <- init(args).tap(_.logInfo("Initialized application"))
+    conf       <- initConfig(args)
+    (io, tool) <- initIo(conf.filesystem).tap(_.logInfo("Initialized application"))
     _ <- conf.mode match {
-      case FixPaths => fixPaths(conf, io, tool)
-      case Merge => mergeMetadataFiles(conf, io, tool)
-      case CompareMetadataWithData => compareMetadataWithData(conf, io, tool)
-      case m: CreateMetadata => createMetadata(conf, io, tool, new DataTool(io), m)
+      case FixPaths                => fixPaths(conf, io, tool)
+      case Merge                   => mergeMetadataFiles(conf, io, tool)
+      case CompareMetadataWithData => compareMetadataWithData(conf, io, tool)
+      case m: CreateMetadata       => createMetadata(conf, io, tool, new DataTool(io), m)
+      case CompareFolders | CompareMetadata =>
+        for {
+          (secondaryIo, secondaryTool) <- initIo(conf.secondaryFilesystem)
+          _ <- conf.mode match {
+            case CompareFolders  => compareFolders(conf, tool, secondaryTool)
+            case CompareMetadata => compareMetadataFolders(conf, tool, io, secondaryTool, secondaryIo)
+            case _               => ().asRight
+          }
+        } yield ()
     }
     backupPath = new Path(s"${conf.path}/$BackupDir")
     _ <- conf.mode match {
@@ -158,6 +168,87 @@
     } yield ()
   }
 
+  private def compareFolders(
+    config: AppConfig,
+    tool: MetadataTool,
+    secondaryTool: MetadataTool
+  ): Either[AppError, Unit] =
+    for {
+      dirContent <- tool
+        .listDirectoryRecursively(config.path)
+        .map(_.map(path => path.toString.replaceFirst(config.path.toString, "")))
+      secondaryDirContent <- secondaryTool
+        .listDirectoryRecursively(config.secondaryPath)
+        .map(_.map(path => path.toString.replaceFirst(config.secondaryPath.toString, "")))
+    } yield {
+      val diff         = dirContent.diff(secondaryDirContent)
+      val oppositeDiff = secondaryDirContent.diff(dirContent)
+      if (diff.isEmpty && oppositeDiff.isEmpty) {
+        logger.info("Dirs are identical")
+      } else {
+        logger.error("Dirs are not identical")
+        logger.error("Paths that are different:")
+        (diff ++ oppositeDiff).toSet.foreach { path: String =>
+          dirContent.find(_.endsWith(path)).foreach(p => logger.error(s"${config.path}$p"))
+          secondaryDirContent.find(_.endsWith(path)).foreach(p => logger.error(s"${config.secondaryPath}$p"))
+        }
+      }
+    }
+
+  private def compareMetadataFolders(
+    config: AppConfig,
+    tool: MetadataTool,
+    io: FileManager,
+    secondaryTool: MetadataTool,
+    secondaryIo: FileManager
+  ): Either[AppError, Unit] = {
+    val dataPath          = config.path
+    val metaPath          = new Path(s"$dataPath/$SparkMetadataDir")
+    val secondaryDataPath = config.secondaryPath
+    val secondaryMetaPath = new Path(s"$secondaryDataPath/$SparkMetadataDir")
+
+    for {
+      metaPaths     <- io.listFiles(metaPath)
+      usedMetaFiles <- tool.filterLastCompact(metaPaths)
+      metaRecords   <- usedMetaFiles.flatTraverse(metaFile => tool.getMetaRecords(metaFile.path))
+
+      secondaryMetaPaths     <- secondaryIo.listFiles(secondaryMetaPath)
+      secondaryUsedMetaFiles <- secondaryTool.filterLastCompact(secondaryMetaPaths)
+      secondaryMetaRecords   <- secondaryUsedMetaFiles.flatTraverse(metaFile => secondaryTool.getMetaRecords(metaFile.path))
+    } yield {
+      val incorrectMetaRecords =
+        metaRecords.map(_.path.toString).filter(p => !p.startsWith(dataPath.toString))
+      val incorrectSecondaryMetaRecords =
+        secondaryMetaRecords.map(_.path.toString).filter(p => !p.startsWith(secondaryDataPath.toString))
+
+      if (incorrectMetaRecords.nonEmpty) {
+        logger.error(s"Metadata files in $dataPath contain paths from a different filesystem")
+        incorrectMetaRecords.foreach(p => logger.error(p))
+      }
+      if (incorrectSecondaryMetaRecords.nonEmpty) {
+        logger.error(s"Metadata files in $secondaryDataPath contain paths from a different filesystem")
+        incorrectSecondaryMetaRecords.foreach(p => logger.error(p))
+      }
+
+      val relativeMetaPaths =
+        metaRecords.map(_.path.toString.replaceFirst(dataPath.toString, ""))
+      val relativeSecondaryMetaPaths =
+        secondaryMetaRecords.map(_.path.toString.replaceFirst(secondaryDataPath.toString, ""))
+
+      val diff         = relativeMetaPaths.diff(relativeSecondaryMetaPaths)
+      val oppositeDiff = relativeSecondaryMetaPaths.diff(relativeMetaPaths)
+      if (diff.isEmpty && oppositeDiff.isEmpty) {
+        logger.info("Meta files are identical")
+      } else {
+        logger.error("Meta files are not identical")
+        logger.error("Paths that are different:")
+        (diff ++ oppositeDiff).toSet.foreach { path: String =>
+          logger.error(path)
+        }
+      }
+    }
+  }
+
   private def printDetectedDataIssues(
     notDeletedData: Iterable[Path],
     missingData: Iterable[Path],
@@ -186,10 +277,10 @@
     }
   }
 
-  private def init(args: Array[String]): Either[AppError, (AppConfig, FileManager, MetadataTool)] = for {
-    config <- ArgumentParser.createConfig(args)
-    io     <- initFileManager(config.filesystem)
-  } yield (config, io, new MetadataTool(io))
+  private def initConfig(args: Array[String]): Either[AppError, AppConfig] = ArgumentParser.createConfig(args)
+
+  private def initIo(targetFilesystem: TargetFilesystem): Either[AppError, (FileManager, MetadataTool)] =
+    initFileManager(targetFilesystem).map(fileManager => (fileManager, new MetadataTool(fileManager)))
 
   private def fixFile(
     path: Path,
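The key step in `compareMetadataFolders` above is making record paths comparable across filesystems: each record path is made relative by stripping its own location's root prefix before the two listings are diffed. A small illustration under assumed example roots (the values are hypothetical; the real roots come from `--path` and `--secondarypath`):

```scala
// Illustrative roots only; note these contain no regex metacharacters,
// which matters because String.replaceFirst treats its pattern as a regex.
val dataPath          = "s3://bucket/foo/root"
val secondaryDataPath = "hdfs://namenode/foo/root"

val record          = s"$dataPath/year=2021/part-0000.parquet"
val secondaryRecord = s"$secondaryDataPath/year=2021/part-0000.parquet"

// Both collapse to "/year=2021/part-0000.parquet", so they compare as equal.
assert(record.replaceFirst(dataPath, "") == secondaryRecord.replaceFirst(secondaryDataPath, ""))
```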
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
index 46fbd15..40617c5 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
@@ -25,7 +25,7 @@ import org.apache.log4j.PatternLayout
 import org.log4s.Logger
 import scopt.OParser
 import za.co.absa.spark_metadata_tool.LoggingImplicits._
-import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError}
+import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareFolders, CompareMetadata, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError}
 
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
@@ -100,6 +100,34 @@
           .text("set compaction number")
       ),
     note(sys.props("line.separator")),
+    cmd("compare-folders")
+      .action((_, c) => c.copy(mode = CompareFolders))
+      .text("Compare the content of two folders and log the differences")
+      .children(
+        opt[Path]('p', "path")
+          .required()
+          .action((x, c) => c.copy(path = x))
+          .text("full path to the folder, including filesystem (e.g. s3://bucket/foo/root)"),
+        opt[Path]('s', "secondarypath")
+          .required()
+          .action((x, c) => c.copy(secondaryPath = x))
+          .text("full path to the secondary folder, including filesystem (e.g. s3://bucket/foo/root)")
+      ),
+    note(sys.props("line.separator")),
+    cmd("compare-metadata")
+      .action((_, c) => c.copy(mode = CompareMetadata))
+      .text("Compare the content of metadata files in two different data locations")
+      .children(
+        opt[Path]('p', "path")
+          .required()
+          .action((x, c) => c.copy(path = x))
+          .text("full path to root data folder, including filesystem (e.g. s3://bucket/foo/root)"),
+        opt[Path]('s', "secondarypath")
+          .required()
+          .action((x, c) => c.copy(secondaryPath = x))
+          .text("full path to root secondary data folder, including filesystem (e.g. s3://bucket/foo/root)")
+      ),
+    note(sys.props("line.separator")),
     note("Other options:"),
     opt[Unit]('k', "keep-backup")
       .action((_, c) => c.copy(keepBackup = true))
@@ -126,6 +154,8 @@
       oldPath = None,
       path = new Path("default"),
       filesystem = Unix,
+      secondaryPath = new Path("default"),
+      secondaryFilesystem = Unix,
       keepBackup = false,
       verbose = false,
       logToFile = false,
@@ -136,10 +166,16 @@
     parseResult
       .fold(Left(ParsingError("Couldn't parse provided arguments", None)): Either[AppError, AppConfig]) { conf =>
         for {
-          _  <- initLogging(conf.verbose, conf.logToFile).tap(_.logDebug("Initialized logging"))
+          _ <- initLogging(conf.verbose, conf.logToFile).tap(_.logDebug("Initialized logging"))
           fs <- getFsFromPath(conf.path.toString).tap(_.logValueDebug("Derived filesystem from path"))
+          secondaryFs <- (getFsFromPath(conf.secondaryPath.toString) match {
+            case Right(value)                                        => Right(Some(value))
+            case Left(_) if conf.secondaryPath.toString == "default" => Right(None)
+            case Left(value)                                         => Left(value)
+          }).tap(_.logValueDebug("Derived secondary filesystem from path"))
         } yield conf.copy(
-          filesystem = fs
+          filesystem = fs,
+          secondaryFilesystem = secondaryFs.getOrElse(Unix)
        )
      }
      .tap(_.logValueDebug("Initialized application config"))
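The `secondaryFs` derivation above treats the `"default"` placeholder path as "no secondary path supplied" rather than as a parse failure, so only a genuinely malformed `--secondarypath` aborts the run. A self-contained sketch of the same fallback pattern (the simplified `Fs` model and all names here are hypothetical, not the tool's actual types):

```scala
sealed trait Fs
case object UnixFs extends Fs
case object S3Fs   extends Fs

// Simplified stand-in for getFsFromPath: derive a filesystem from a path prefix.
def getFs(path: String): Either[String, Fs] =
  if (path.startsWith("s3://")) Right(S3Fs)
  else if (path.startsWith("/")) Right(UnixFs)
  else Left(s"Cannot derive filesystem from: $path")

// A failed derivation is only an error when the user actually supplied a path;
// the "default" sentinel simply means the -s option was never given.
def secondaryFs(secondaryPath: String): Either[String, Option[Fs]] =
  getFs(secondaryPath) match {
    case Right(fs)                             => Right(Some(fs))
    case Left(_) if secondaryPath == "default" => Right(None)
    case Left(err)                             => Left(err)
  }

assert(secondaryFs("default") == Right(None))
assert(secondaryFs("s3://bucket/foo") == Right(Some(S3Fs)))
```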
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala b/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala
index 9d73139..1f903c4 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala
@@ -213,6 +213,22 @@ class MetadataTool(io: FileManager) {
     files ++ filesInSubDir
   }
 
+  /** Lists the content of a directory recursively.
+    *
+    * @param rootDirPath
+    *   path to the root directory
+    * @return
+    *   content of the directory and all of its subdirectories
+    */
+  def listDirectoryRecursively(rootDirPath: Path): Either[AppError, Seq[Path]] =
+    for {
+      dirs          <- io.listDirectories(rootDirPath)
+      files         <- io.listFiles(rootDirPath)
+      filesInSubDir <- dirs.flatTraverse(dir => listDirectoryRecursively(dir))
+    } yield {
+      dirs ++ files ++ filesInSubDir
+    }
+
   /** Get metadata records from a metadata file.
     *
     * @param metadataFilePath
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala b/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala
index 6c1b4c7..b129682 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala
@@ -23,6 +23,8 @@ final case class AppConfig(
   oldPath: Option[Path],
   path: Path,
   filesystem: TargetFilesystem,
+  secondaryPath: Path,
+  secondaryFilesystem: TargetFilesystem,
   keepBackup: Boolean,
   verbose: Boolean,
   logToFile: Boolean,
@@ -30,7 +32,9 @@
 )
 
 sealed trait Mode
-case object FixPaths extends Mode
-case object Merge extends Mode
-case object CompareMetadataWithData extends Mode
+case object FixPaths                extends Mode
+case object Merge                   extends Mode
+case object CompareMetadataWithData extends Mode
 final case class CreateMetadata(maxMicroBatchNumber: Int, compactionNumber: Int) extends Mode
+case object CompareFolders          extends Mode
+case object CompareMetadata         extends Mode
\ No newline at end of file
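`listDirectoryRecursively` composes `listDirectories`, `listFiles`, and a recursive call per subdirectory inside `Either`, returning directories as well as files. A rough local-filesystem analogue of the same shape, using `java.nio` in place of the tool's `FileManager` abstraction (a sketch only: error handling is omitted and the directory stream is not closed, as it would be in production code):

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Walk a directory tree: list this level, split into dirs and files,
// then recurse into each subdirectory and concatenate the results.
def listRecursively(root: Path): Seq[Path] = {
  val entries       = Files.list(root).iterator().asScala.toSeq
  val (dirs, files) = entries.partition(p => Files.isDirectory(p))
  dirs ++ files ++ dirs.flatMap(listRecursively)
}
```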