Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Release (Fork)

on:
push:
tags:
- 'v*'
- 'dshdfs-*'

permissions:
contents: write

jobs:
release:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: false

- name: Set up JDK 8
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: '8'

- name: Build Delta Sharing Server (universal)
run: |
./build/sbt server/universal:packageBin
ls -lah server/target/universal || true

- name: Create GitHub Release
uses: softprops/action-gh-release@v1
with:
files: |
server/target/universal/delta-sharing-server-*.zip
draft: false
prerelease: false
fail_on_unmatched_files: false

2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se
ExclusionRule("com.fasterxml.jackson.module"),
ExclusionRule("com.google.guava", "guava")
),
// JWT signing for HDFS signer (Ed25519)
"org.bitbucket.b_c" % "jose4j" % "0.9.6",
"io.delta" %% "delta-standalone" % "3.2.0" % "provided" excludeAll(
ExclusionRule("com.fasterxml.jackson.core"),
ExclusionRule("com.fasterxml.jackson.module"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class DeltaSharedTableKernel(
AbfsFileSigner(abfs, dataPath, preSignedUrlTimeoutSeconds)
case gc: GoogleHadoopFileSystem =>
new GCSFileSigner(dataPath, conf, preSignedUrlTimeoutSeconds)
case _: org.apache.hadoop.hdfs.web.WebHdfsFileSystem =>
new io.delta.sharing.server.common.HdfsFileSigner(preSignedUrlTimeoutSeconds)
case _: org.apache.hadoop.hdfs.web.SWebHdfsFileSystem =>
new io.delta.sharing.server.common.HdfsFileSigner(preSignedUrlTimeoutSeconds)
case _: org.apache.hadoop.hdfs.DistributedFileSystem =>
new io.delta.sharing.server.common.HdfsFileSigner(preSignedUrlTimeoutSeconds)
case _ =>
throw new IllegalStateException(s"File system ${fs.getClass} is not supported")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.commons.io.FileUtils
import org.slf4j.LoggerFactory
import scalapb.json4s.Printer

import io.delta.sharing.server.common.JsonUtils
import io.delta.sharing.server.common.{HdfsFileSigner, JsonUtils}
import io.delta.sharing.server.config.ServerConfig
import io.delta.sharing.server.model.{QueryStatus, SingleAction}
import io.delta.sharing.server.protocol._
Expand Down Expand Up @@ -684,6 +684,10 @@ object DeltaSharingService {

def start(serverConfig: ServerConfig): Server = {
lazy val server = {
// Configure HDFS signer if provided via YAML
if (serverConfig.getHdfsSigner != null) {
HdfsFileSigner.configureFrom(serverConfig.getHdfsSigner)
}
updateDefaultJsonPrinterForScalaPbConverterUtil()
val builder = Server.builder()
.defaultHostname(serverConfig.getHost)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// HDFS content-server signer: creates short-lived JWT for the content server
package io.delta.sharing.server.common

import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.security.PrivateKey
import java.time.Instant

import org.apache.hadoop.fs.Path
import org.jose4j.jwa.AlgorithmConstraints
import org.jose4j.jwa.AlgorithmConstraints.ConstraintType
import org.jose4j.jws.AlgorithmIdentifiers
import org.jose4j.jws.JsonWebSignature
import org.jose4j.jwt.{JwtClaims, NumericDate}
import org.jose4j.keys.EdDsaKeyUtil

import io.delta.sharing.server.config.HdfsSignerConfig

/**
* Produces PreSignedUrl values that point to a Content Server `/get?token=...` endpoint.
*
* Required configuration (env or system properties):
* - CONTENT_SERVER_BASE or -Dcontent.server.base (e.g., https://content.example.com)
* - SIGNING_PRIVATE_KEY or -Dsigning.key.path (PEM file path)
* Optional:
* - SIGNING_AUDIENCE or -Dsigning.aud
* - SIGNING_KID or -Dsigning.kid
*/
class HdfsFileSigner(preSignedUrlTimeoutSeconds: Long) extends CloudFileSigner {
private val resolved: (String, PrivateKey, Option[String], Option[String]) = {
HdfsFileSigner.config match {
case Some(cfg) =>
val pem = new String(
Files.readAllBytes(Paths.get(cfg.getSigningPrivateKeyFile)),
StandardCharsets.UTF_8)
val key = new EdDsaKeyUtil().fromPemEncoded(pem).asInstanceOf[PrivateKey]
(cfg.getContentServerBase,
key,
Option(cfg.getAudience),
Option(cfg.getKid))
case None =>
val contentBase = propOrEnv("CONTENT_SERVER_BASE", "content.server.base")
.getOrElse("http://localhost:8443")
val privateKeyPemPath = propOrEnv("SIGNING_PRIVATE_KEY", "signing.key.path")
.getOrElse(throw new IllegalStateException(
"Missing signing key path: SIGNING_PRIVATE_KEY or -Dsigning.key.path"))
val pem = new String(
Files.readAllBytes(Paths.get(privateKeyPemPath)),
StandardCharsets.UTF_8)
val audience = propOrEnv("SIGNING_AUDIENCE", "signing.aud")
val kid = propOrEnv("SIGNING_KID", "signing.kid")
val key = new EdDsaKeyUtil().fromPemEncoded(pem).asInstanceOf[PrivateKey]
(contentBase, key, audience, kid)
}
}

override def sign(path: Path): PreSignedUrl = {
val hdfsPath = Option(path).map(_.toUri.getPath).getOrElse("/")
val now = Instant.now
val exp = now.plusSeconds(preSignedUrlTimeoutSeconds)

val claims = new JwtClaims()
claims.setExpirationTime(NumericDate.fromSeconds(exp.getEpochSecond))
resolved._3.foreach(claims.setAudience)
claims.setGeneratedJwtId()
claims.setClaim("hdfs_path", hdfsPath)

val jws = new JsonWebSignature()
jws.setPayload(claims.toJson)
jws.setAlgorithmHeaderValue(AlgorithmIdentifiers.EDDSA)
resolved._4.foreach(jws.setKeyIdHeaderValue)
jws.setKey(resolved._2)
jws.setHeader("typ", "JWT")
jws.setDoKeyValidation(false)
jws.setAlgorithmConstraints(
new AlgorithmConstraints(
ConstraintType.WHITELIST,
AlgorithmIdentifiers.EDDSA))

val jwt = jws.getCompactSerialization
val url = s"${resolved._1.stripSuffix("/")}/get?token=" +
URLEncoder.encode(jwt, "UTF-8")
new PreSignedUrl(url, System.currentTimeMillis() + preSignedUrlTimeoutSeconds * 1000L)
}

private def propOrEnv(env: String, prop: String): Option[String] = {
Option(System.getProperty(prop))
.filter(_.nonEmpty)
.orElse(Option(System.getenv(env)).filter(_.nonEmpty))
}
}

object HdfsFileSigner {
@volatile private var _config: Option[HdfsSignerConfig] = None
def configureFrom(conf: HdfsSignerConfig): Unit = { _config = Option(conf) }
def config: Option[HdfsSignerConfig] = _config
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ case class ServerConfig(
@BeanProperty var version: java.lang.Integer,
@BeanProperty var shares: java.util.List[ShareConfig],
@BeanProperty var authorization: Authorization,
@BeanProperty var hdfsSigner: HdfsSignerConfig,
@BeanProperty var ssl: SSLConfig,
@BeanProperty var host: String,
@BeanProperty var port: Int,
Expand Down Expand Up @@ -75,6 +76,7 @@ case class ServerConfig(
version = null,
shares = Collections.emptyList(),
authorization = null,
hdfsSigner = null,
ssl = null,
host = "localhost",
port = 80,
Expand Down Expand Up @@ -116,6 +118,9 @@ case class ServerConfig(
if (authorization != null) {
authorization.checkConfig()
}
if (hdfsSigner != null) {
hdfsSigner.checkConfig()
}
if (ssl != null) {
ssl.checkConfig()
}
Expand Down Expand Up @@ -250,3 +255,23 @@ case class TableConfig(
}
}
}

case class HdfsSignerConfig(
@BeanProperty var contentServerBase: String,
@BeanProperty var signingPrivateKeyFile: String,
@BeanProperty var audience: String,
@BeanProperty var kid: String) extends ConfigItem {

def this() {
this(null, null, null, null)
}

override def checkConfig(): Unit = {
if (contentServerBase == null) {
throw new IllegalArgumentException("'contentServerBase' in 'hdfsSigner' must be provided")
}
if (signingPrivateKeyFile == null) {
throw new IllegalArgumentException("'signingPrivateKeyFile' in 'hdfsSigner' must be provided")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class DeltaSharedTable(
AbfsFileSigner(abfs, deltaLog.dataPath.toUri, preSignedUrlTimeoutSeconds)
case gc: GoogleHadoopFileSystem =>
new GCSFileSigner(deltaLog.dataPath.toUri, conf, preSignedUrlTimeoutSeconds)
case _: org.apache.hadoop.hdfs.web.WebHdfsFileSystem =>
new io.delta.sharing.server.common.HdfsFileSigner(preSignedUrlTimeoutSeconds)
case _: org.apache.hadoop.hdfs.web.SWebHdfsFileSystem =>
new io.delta.sharing.server.common.HdfsFileSigner(preSignedUrlTimeoutSeconds)
case _: org.apache.hadoop.hdfs.DistributedFileSystem =>
new io.delta.sharing.server.common.HdfsFileSigner(preSignedUrlTimeoutSeconds)
case _ =>
throw new IllegalStateException(s"File system ${fs.getClass} is not supported")
}
Expand Down
Loading