7 changes: 7 additions & 0 deletions README.md
@@ -52,6 +52,7 @@ In every run mode, the tool offers following universal features:
- Backup is deleted after a successful run(can be overridden to keep the backup)
- Currently supported file systems:
- S3
- S3a
- Unix
- HDFS

@@ -142,6 +143,12 @@ To be able to perform any operation on S3 you must provide AWS credentials. The
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The application will read them automatically. For more information, as well as other
ways to provide credentials, see [Using credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html)

### S3a Set up
To be able to perform any operation on S3a you must provide AWS credentials. The easiest way to do so is to set environment variables
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The application will read them automatically. For more information, as well as other
ways to provide credentials, see [Using credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html)
Additionally, to set the provider endpoint, the environment variable `AWS_ENDPOINT_URL` has to be set.
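As an illustration (not part of this change), a minimal Scala sketch of how a client can be constructed under these settings; credentials are resolved by the SDK's default provider chain, while the endpoint has to be applied explicitly:

```scala
import java.net.URI

import software.amazon.awssdk.services.s3.S3Client

// AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are picked up by the default
// credentials provider chain; only the endpoint override needs explicit wiring.
val builder = S3Client.builder()
sys.env.get("AWS_ENDPOINT_URL").foreach(url => builder.endpointOverride(new URI(url)))
val client: S3Client = builder.build()
```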

### HDFS Set up
To be able to perform any operation on HDFS you must set environment variable `HADOOP_CONF_DIR`.

33 changes: 13 additions & 20 deletions src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
@@ -24,24 +24,9 @@ import org.log4s.Logger
import software.amazon.awssdk.services.s3.S3Client
import za.co.absa.spark_metadata_tool.LoggingImplicits._
import za.co.absa.spark_metadata_tool.io.{FileManager, HdfsFileManager, S3FileManager, UnixFileManager}
import za.co.absa.spark_metadata_tool.model.{
AppConfig,
AppError,
AppErrorWithThrowable,
CompareMetadataWithData,
CreateMetadata,
FixPaths,
Hdfs,
InitializationError,
Merge,
NotFoundError,
S3,
SinkFileStatus,
TargetFilesystem,
Unix,
UnknownError
}
import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError}

import java.net.URI
import scala.util.Try
import scala.util.chaining._

@@ -223,8 +208,15 @@ object Application extends App {
} yield ()).tap(_.logInfo(s"Done processing file ${path.toString}"))

def initS3(): Either[AppError, S3Client] = Try {
S3Client.builder().build()
}.toEither.leftMap(err => InitializationError("Failed to initialize S3 Client", err.some))
// This is done because the AWS SDK does not support overriding the endpoint URL via an environment variable:
// https://docs.aws.amazon.com/sdkref/latest/guide/settings-reference.html#EVarSettings
// https://github.com/aws/aws-sdk-java-v2/issues/4501
val endpoint = sys.env.get("AWS_ENDPOINT_URL").filter(_.nonEmpty)
val builder = S3Client.builder()
endpoint.foreach(url => builder.endpointOverride(new URI(url)))

builder.build()
}.toEither.leftMap(err => InitializationError("Failed to initialize S3 Client", err.some))

def initHdfs(): Either[AppError, FileSystem] = Try {
val hadoopConfDir = sys.env("HADOOP_CONF_DIR")
@@ -242,7 +234,8 @@
(fs match {
case Unix => UnixFileManager.asRight
case Hdfs => initHdfs().map(hdfs => HdfsFileManager(hdfs))
case S3 => initS3().map(client => S3FileManager(client))
case S3 => initS3().map(client => S3FileManager(client, "s3"))
case S3a => initS3().map(client => S3FileManager(client, "s3a"))
}).tap(fm => logger.debug(s"Initialized file manager : $fm"))

}
src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
@@ -25,19 +25,7 @@ import org.apache.log4j.PatternLayout
import org.log4s.Logger
import scopt.OParser
import za.co.absa.spark_metadata_tool.LoggingImplicits._
import za.co.absa.spark_metadata_tool.model.AppConfig
import za.co.absa.spark_metadata_tool.model.AppError
import za.co.absa.spark_metadata_tool.model.CompareMetadataWithData
import za.co.absa.spark_metadata_tool.model.FixPaths
import za.co.absa.spark_metadata_tool.model.Hdfs
import za.co.absa.spark_metadata_tool.model.InitializationError
import za.co.absa.spark_metadata_tool.model.Merge
import za.co.absa.spark_metadata_tool.model.ParsingError
import za.co.absa.spark_metadata_tool.model.S3
import za.co.absa.spark_metadata_tool.model.TargetFilesystem
import za.co.absa.spark_metadata_tool.model.Unix
import za.co.absa.spark_metadata_tool.model.UnknownFileSystemError
import za.co.absa.spark_metadata_tool.model.CreateMetadata
import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError}

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
@@ -182,6 +170,7 @@ object ArgumentParser {
case _ if path.startsWith("/") => Unix.asRight
case _ if path.startsWith("hdfs://") => Hdfs.asRight
case _ if path.startsWith("s3://") => S3.asRight
case _ if path.startsWith("s3a://") => S3a.asRight
case _ =>
UnknownFileSystemError(
s"Couldn't extract filesystem from path $path"
src/main/scala/za/co/absa/spark_metadata_tool/io/S3FileManager.scala
@@ -45,7 +45,7 @@ import scala.util.Try
import scala.util.Using
import scala.util.chaining._

case class S3FileManager(s3: S3Client) extends FileManager {
case class S3FileManager(s3: S3Client, scheme: String) extends FileManager {
import S3FileManager._
implicit private val logger: Logger = org.log4s.getLogger

@@ -124,7 +124,7 @@ case class S3FileManager(s3: S3Client) extends FileManager {
val bucket = getBucket(baseDir)
val prefix = ensureTrailingSlash(getKey(baseDir))

val builder = new URIBuilder().setScheme("s3").setHost(bucket)
val builder = new URIBuilder().setScheme(scheme).setHost(bucket)

val request = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build()

@@ -171,7 +171,7 @@ case class S3FileManager(s3: S3Client) extends FileManager {

private def listBucket(path: Path, filter: FileType): Either[IoError, Seq[Path]] = Try {
val bucket = getBucket(path)
val pathPrefix = s"s3://$bucket/"
val pathPrefix = s"$scheme://$bucket/"
val rootKey = path.toString.stripPrefix(pathPrefix)

val req = ListObjectsV2Request
src/main/scala/za/co/absa/spark_metadata_tool/model/TargetFilesystem.scala
@@ -29,3 +29,6 @@ case object Hdfs extends TargetFilesystem {
case object S3 extends TargetFilesystem {
override def pathPrefix: String = "s3://"
}
case object S3a extends TargetFilesystem {
override def pathPrefix: String = "s3a://"
}
src/test/scala/za/co/absa/spark_metadata_tool/io/S3FileManagerSpec.scala
@@ -45,7 +45,7 @@ class S3FileManagerSpec extends AnyFlatSpec with Matchers with OptionValues with

private val s3 = mock[S3Client]

private val io = S3FileManager(s3)
private val io = S3FileManager(s3, "s3")

private val TMinus10 = Instant.now().minus(Duration.ofMinutes(10))
