19 changes: 19 additions & 0 deletions README.md
@@ -82,6 +82,16 @@ Example:
> Metadata files are aligned to the `--max-micro-batch-number`, so if the `--compaction-number` is higher than
> the number of metadata files, the tool can produce empty, but still valid, metadata files.

### compare-folders
- Compares the content of two folders and logs the differences

Note that the tool does not modify the file system

### compare-metadata
- Compares the content of metadata files in two different data locations

Note that the tool does not modify the file system
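
As an illustration, the two commands could be run like this (assuming the standalone JAR is executed directly with `java -jar`; the jar version and paths are placeholders, see Usage below for the full option list):

```
java -jar spark-metadata-tool_2.13-x.y.z-assembly.jar compare-folders --path s3://bucket/foo/root --secondarypath s3://bucket/bar/root
java -jar spark-metadata-tool_2.13-x.y.z-assembly.jar compare-metadata --path s3://bucket/foo/root --secondarypath s3://bucket/bar/root
```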

## Usage
### Obtaining
The application is published as a standalone executable JAR. Simply download the most recent version of the file `spark-metadata-tool_2.13-x.y.z-assembly.jar` from the [package repository](https://github.com/orgs/AbsaOSS/packages?repo_name=spark-metadata-tool).
@@ -129,6 +139,15 @@ Create Spark structured streaming metadata
-c, --compaction-number <value>
set compaction number

Command: compare-folders [options]
Compares content of two folders and logs the differences
-p, --path <value> full path to the folder, including filesystem (e.g. s3://bucket/foo/root)
-s, --secondarypath <value> full path to the other folder, including filesystem (e.g. s3://bucket/foo/root)

Command: compare-metadata [options]
Compares content of metadata files in two different data locations
-p, --path <value> full path to root data folder, including filesystem (e.g. s3://bucket/foo/root)
-s, --secondarypath <value> full path to root secondary data folder, including filesystem (e.g. s3://bucket/foo/root)

Other options:
-k, --keep-backup persist backup files after successful run
4 changes: 2 additions & 2 deletions scalastyle-config.xml
@@ -85,7 +85,7 @@
</check>
<check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
<parameters>
<parameter name="maximum"><![CDATA[10]]></parameter>
<parameter name="maximum"><![CDATA[15]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
@@ -98,7 +98,7 @@
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
<parameters>
<parameter name="maxLength"><![CDATA[50]]></parameter>
<parameter name="maxLength"><![CDATA[60]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
111 changes: 101 additions & 10 deletions src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
@@ -24,7 +24,7 @@ import org.log4s.Logger
import software.amazon.awssdk.services.s3.S3Client
import za.co.absa.spark_metadata_tool.LoggingImplicits._
import za.co.absa.spark_metadata_tool.io.{FileManager, HdfsFileManager, S3FileManager, UnixFileManager}
import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError}
import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareFolders, CompareMetadata, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError}

import java.net.URI
import scala.util.Try
@@ -39,12 +39,22 @@ object Application extends App {
}

def run(args: Array[String]): Either[AppError, Unit] = for {
(conf, io, tool) <- init(args).tap(_.logInfo("Initialized application"))
conf <- initConfig(args)
(io, tool) <- initIo(conf.filesystem).tap(_.logInfo("Initialized application"))
_ <- conf.mode match {
case FixPaths => fixPaths(conf, io, tool)
case Merge => mergeMetadataFiles(conf, io, tool)
case CompareMetadataWithData => compareMetadataWithData(conf, io, tool)
case m: CreateMetadata => createMetadata(conf, io, tool, new DataTool(io), m)
case FixPaths => fixPaths(conf, io, tool)
case Merge => mergeMetadataFiles(conf, io, tool)
case CompareMetadataWithData => compareMetadataWithData(conf, io, tool)
case m: CreateMetadata => createMetadata(conf, io, tool, new DataTool(io), m)
case CompareFolders | CompareMetadata =>
for {
(secondaryIo, secondaryTool) <- initIo(conf.secondaryFilesystem)
_ <- conf.mode match {
case CompareFolders => compareFolders(conf, tool, secondaryTool)
case CompareMetadata => compareMetadataFolders(conf, tool, io, secondaryTool, secondaryIo)
case _ => ().asRight
}
} yield ()
}
backupPath = new Path(s"${conf.path}/$BackupDir")
_ <- conf.mode match {
@@ -158,6 +168,87 @@ object Application extends App {
} yield ()
}

private def compareFolders(
config: AppConfig,
tool: MetadataTool,
secondaryTool: MetadataTool,
): Either[AppError, Unit] = {
for {
dirContent <- tool.listDirectoryRecursively(config.path)
.map(_.map(path => path.toString.replaceFirst(config.path.toString, "")))
secondaryDirContent <- secondaryTool.listDirectoryRecursively(config.secondaryPath)
.map(_.map(path => path.toString.replaceFirst(config.secondaryPath.toString, "")))
} yield {
val diff = dirContent.diff(secondaryDirContent)
val oppositeDiff = secondaryDirContent.diff(dirContent)
if (diff.isEmpty && oppositeDiff.isEmpty) {
logger.info("Dirs are identical")
} else {
logger.error("Dirs are not identical")
logger.error("Paths that are different:")
(diff ++ oppositeDiff).toSet.foreach { path: String =>
dirContent.find(_.endsWith(path)).foreach(p => logger.error(s"${config.path}$p"))
secondaryDirContent.find(_.endsWith(path)).foreach(p => logger.error(s"${config.secondaryPath}$p"))
}
}
()
}
}

private def compareMetadataFolders(
config: AppConfig,
tool: MetadataTool,
io: FileManager,
secondaryTool: MetadataTool,
secondaryIo: FileManager,
): Either[AppError, Unit] = {
val dataPath = config.path
val metaPath = new Path(s"$dataPath/$SparkMetadataDir")
val secondaryDataPath = config.secondaryPath
val secondaryMetaPath = new Path(s"$secondaryDataPath/$SparkMetadataDir")

for {
metaPaths <- io.listFiles(metaPath)
usedMetaFiles <- tool.filterLastCompact(metaPaths)
metaRecords <- usedMetaFiles.flatTraverse(metaFile => tool.getMetaRecords(metaFile.path))

secondaryMetaPaths <- secondaryIo.listFiles(secondaryMetaPath)
secondaryUsedMetaFiles <- secondaryTool.filterLastCompact(secondaryMetaPaths)
secondaryMetaRecords <- secondaryUsedMetaFiles.flatTraverse(metaFile => secondaryTool.getMetaRecords(metaFile.path))
} yield {
val incorrectMetaRecords =
metaRecords.map(_.path.toString).filter(p => !p.startsWith(dataPath.toString))
val incorrectSecondaryMetaRecords =
secondaryMetaRecords.map(_.path.toString).filter(p => !p.startsWith(secondaryDataPath.toString))

if (incorrectMetaRecords.nonEmpty) {
logger.error(s"Metadata files in $dataPath contain paths from a different filesystem or root")
incorrectMetaRecords.foreach(p => logger.error(p))
}
if (incorrectSecondaryMetaRecords.nonEmpty) {
logger.error(s"Metadata files in $secondaryDataPath contain paths from a different filesystem or root")
incorrectSecondaryMetaRecords.foreach(p => logger.error(p))
}

val relativeMetaPaths =
metaRecords.map(_.path.toString.replaceFirst(dataPath.toString, ""))
val relativeSecondaryMetaPaths =
secondaryMetaRecords.map(_.path.toString.replaceFirst(secondaryDataPath.toString, ""))

val diff = relativeMetaPaths.diff(relativeSecondaryMetaPaths)
val oppositeDiff = relativeSecondaryMetaPaths.diff(relativeMetaPaths)
if (diff.isEmpty && oppositeDiff.isEmpty) {
logger.info("Meta files are identical")
} else {
logger.error("Meta files are not identical")
logger.error("Paths that are different:")
(diff ++ oppositeDiff).toSet.foreach { path: String =>
logger.error(path)
}
}
}
}

private def printDetectedDataIssues(
notDeletedData: Iterable[Path],
missingData: Iterable[Path],
@@ -186,10 +277,10 @@
}
}

private def init(args: Array[String]): Either[AppError, (AppConfig, FileManager, MetadataTool)] = for {
config <- ArgumentParser.createConfig(args)
io <- initFileManager(config.filesystem)
} yield (config, io, new MetadataTool(io))
private def initConfig(args: Array[String]): Either[AppError, AppConfig] = ArgumentParser.createConfig(args)

private def initIo(targetFilesystem: TargetFilesystem): Either[AppError, (FileManager, MetadataTool)] =
initFileManager(targetFilesystem).map(fileManager => (fileManager, new MetadataTool(fileManager)))

private def fixFile(
path: Path,
src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
@@ -25,7 +25,7 @@ import org.apache.log4j.PatternLayout
import org.log4s.Logger
import scopt.OParser
import za.co.absa.spark_metadata_tool.LoggingImplicits._
import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError}
import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareFolders, CompareMetadata, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError}

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
@@ -100,6 +100,34 @@ object ArgumentParser {
.text("set compaction number")
),
note(sys.props("line.separator")),
cmd("compare-folders")
.action((_, c) => c.copy(mode = CompareFolders))
.text("Compare content of two folders")
.children(
opt[Path]('p', "path")
.required()
.action((x, c) => c.copy(path = x))
.text("full path to folder, including filesystem (e.g. s3://bucket/foo/root)"),
opt[Path]('s', "secondarypath")
.required()
.action((x, c) => c.copy(secondaryPath = x))
.text("full path to secondary folder, including filesystem (e.g. s3://bucket/foo/root)")
),
note(sys.props("line.separator")),
cmd("compare-metadata")
.action((_, c) => c.copy(mode = CompareMetadata))
.text("Compare content of metadata data files in two different data locations")
.children(
opt[Path]('p', "path")
.required()
.action((x, c) => c.copy(path = x))
.text("full path to root data folder, including filesystem (e.g. s3://bucket/foo/root)"),
opt[Path]('s', "secondarypath")
.required()
.action((x, c) => c.copy(secondaryPath = x))
.text("full path to root secondary data folder, including filesystem (e.g. s3://bucket/foo/root)")
),
note(sys.props("line.separator")),
note("Other options:"),
opt[Unit]('k', "keep-backup")
.action((_, c) => c.copy(keepBackup = true))
@@ -126,6 +154,8 @@
oldPath = None,
path = new Path("default"),
filesystem = Unix,
secondaryPath = new Path("default"),
secondaryFilesystem = Unix,
keepBackup = false,
verbose = false,
logToFile = false,
@@ -136,10 +166,16 @@
parseResult
.fold(Left(ParsingError("Couldn't parse provided arguments", None)): Either[AppError, AppConfig]) { conf =>
for {
_ <- initLogging(conf.verbose, conf.logToFile).tap(_.logDebug("Initialized logging"))
_ <- initLogging(conf.verbose, conf.logToFile).tap(_.logDebug("Initialized logging"))
fs <- getFsFromPath(conf.path.toString).tap(_.logValueDebug("Derived filesystem from path"))
secondaryFs <- (getFsFromPath(conf.secondaryPath.toString) match {
case Right(value) => Right(Some(value))
case Left(_) if conf.secondaryPath.toString == "default" => Right(None)
case Left(value) => Left(value)
}).tap(_.logValueDebug("Derived secondary filesystem from path"))
} yield conf.copy(
filesystem = fs
filesystem = fs,
secondaryFilesystem = secondaryFs.getOrElse(Unix)
)
}
.tap(_.logValueDebug("Initialized application config"))
src/main/scala/za/co/absa/spark_metadata_tool/MetadataTool.scala
@@ -213,6 +213,22 @@ class MetadataTool(io: FileManager) {
files ++ filesInSubDir
}

/** List the content of a directory recursively.
*
* @param rootDirPath
* path to the root directory
* @return
* paths of all files and directories under the root directory, including nested subdirectories
*/
def listDirectoryRecursively(rootDirPath: Path): Either[AppError, Seq[Path]] =
for {
dirs <- io.listDirectories(rootDirPath)
files <- io.listFiles(rootDirPath)
filesInSubDir <- dirs.flatTraverse(dir => listDirectoryRecursively(dir))
} yield {
dirs ++ files ++ filesInSubDir
}
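
// Illustrative usage sketch (an assumption for clarity, not part of this change): a caller such as
// Application.compareFolders can list both roots and diff their relative paths, e.g.
//
//   for {
//     left  <- listDirectoryRecursively(new Path("s3://bucket/foo/root"))
//     right <- listDirectoryRecursively(new Path("s3://bucket/bar/root"))
//   } yield left.map(_.toString.replaceFirst("s3://bucket/foo/root", ""))
//     .diff(right.map(_.toString.replaceFirst("s3://bucket/bar/root", "")))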

/** Get metadata records from a metadata file.
*
* @param metadataFilePath
src/main/scala/za/co/absa/spark_metadata_tool/model/AppConfig.scala
@@ -23,14 +23,18 @@ final case class AppConfig(
oldPath: Option[Path],
path: Path,
filesystem: TargetFilesystem,
secondaryPath: Path,
secondaryFilesystem: TargetFilesystem,
keepBackup: Boolean,
verbose: Boolean,
logToFile: Boolean,
dryRun: Boolean
)

sealed trait Mode
case object FixPaths extends Mode
case object Merge extends Mode
case object CompareMetadataWithData extends Mode
case object FixPaths extends Mode
case object Merge extends Mode
case object CompareMetadataWithData extends Mode
final case class CreateMetadata(maxMicroBatchNumber: Int, compactionNumber: Int) extends Mode
case object CompareFolders extends Mode
case object CompareMetadata extends Mode