This repository was archived by the owner on Dec 15, 2023. It is now read-only.

Commit 80a6233

Merge pull request #1 from Azure-Samples/MergeFromInternal
Initial merge from internal sample
2 parents a507f6f + 7af3c19 commit 80a6233

8 files changed: +451 −51 lines changed


.gitignore

Lines changed: 3 additions & 0 deletions
@@ -1,2 +1,5 @@
+# Generated Class files/logs
 *.class
 *.log
+# Ignore files generated by IntelliJ
+*.iml

README.md

Lines changed: 102 additions & 51 deletions
@@ -1,57 +1,108 @@
-# Project Name
+# Azure Cosmos DB Cassandra API - DataStax Spark Connector Sample
+This Maven project provides samples and best practices for using the [DataStax Spark Cassandra Connector](https://github.com/datastax/spark-cassandra-connector) against [Azure Cosmos DB's Cassandra API](https://docs.microsoft.com/azure/cosmos-db/cassandra-introduction).
+For the purposes of providing an end-to-end sample, we've made use of an [Azure HDI Spark Cluster](https://docs.microsoft.com/azure/hdinsight/spark/apache-spark-jupyter-spark-sql) to run the Spark jobs provided in the example.
+All samples are written in Scala and built with Maven.
 
-(short, 1-3 sentenced, description of the project)
+*Note - this sample is configured against version 2.0.6 of the Spark connector.*
 
-## Features
-
-This project framework provides the following features:
-
-* Feature 1
-* Feature 2
-* ...
-
-## Getting Started
+## Running this Sample
 
 ### Prerequisites
-
-(ideally very short, if any)
-
-- OS
-- Library version
-- ...
-
-### Installation
-
-(ideally very short)
-
-- npm install [package name]
-- mvn install
-- ...
-
-### Quickstart
-(Add steps to get up and running quickly)
-
-1. git clone [repository clone url]
-2. cd [respository name]
-3. ...
-
-
-## Demo
-
-A demo app is included to show how to use the project.
-
-To run the demo, follow these steps:
-
-(Add steps to start up the demo)
-
-1.
-2.
-3.
+- Cosmos DB account configured with the Cassandra API
+- Spark cluster
+
+# Quick Start
+Submitting Spark jobs is not covered as part of this sample; please refer to Apache Spark's [documentation](https://spark.apache.org/docs/latest/submitting-applications.html).
+To run this sample, configure it for your cluster (as discussed below), build the project to generate the required jar(s), and then submit the job to your Spark cluster.
+
+## Cassandra API Connection Parameters
+In order for your Spark jobs to connect to Cosmos DB's Cassandra API, you must set the following configurations (a sketch of applying them follows the table):
+
+*Note - all of these values can be found on the ["Connection String" blade](https://docs.microsoft.com/azure/cosmos-db/manage-account#keys) of your Cosmos DB account.*
+
+<table class="table">
+<tr><th>Property Name</th><th>Value</th></tr>
+<tr>
+  <td><code>spark.cassandra.connection.host</code></td>
+  <td>Your Cassandra endpoint: <code>ACCOUNT_NAME.cassandra.cosmosdb.azure.com</code></td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.connection.port</code></td>
+  <td><code>10350</code></td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.connection.ssl.enabled</code></td>
+  <td><code>true</code></td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.auth.username</code></td>
+  <td><code>COSMOSDB_ACCOUNTNAME</code></td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.auth.password</code></td>
+  <td><code>COSMOSDB_KEY</code></td>
+</tr>
+</table>
+
+## Configurations for Throughput Optimization
+Because Cosmos DB follows a provisioned throughput model, it is important to tune the relevant connector configurations to optimize for this model.
+General information about these configurations can be found on the [Configuration Reference](https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md) page of the DataStax Spark Cassandra Connector GitHub repository.
+<table class="table">
+<tr><th>Property Name</th><th>Description</th></tr>
+<tr>
+  <td><code>spark.cassandra.output.batch.size.rows</code></td>
+  <td>Leave this at <code>1</code>. This is preferred for Cosmos DB's provisioning model in order to achieve higher throughput for heavy workloads.</td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.connection.connections_per_executor_max</code></td>
+  <td><code>10*n</code><br/><br/>Equivalent to 10 connections per node in an n-node Cassandra cluster. For example, if you require 5 connections per node per executor for a 5-node Cassandra cluster, set this configuration to 25.<br/>(Modify based on the degree of parallelism/number of executors that your Spark jobs are configured for.)</td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.output.concurrent.writes</code></td>
+  <td><code>100</code><br/><br/>Defines the number of parallel writes that can occur per executor. Because batch.size.rows is <code>1</code>, make sure to scale this value up accordingly. (Modify based on the degree of parallelism/throughput that you want to achieve for your workload.)</td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.concurrent.reads</code></td>
+  <td><code>512</code><br /><br />Defines the number of parallel reads that can occur per executor. (Modify based on the degree of parallelism/throughput that you want to achieve for your workload.)</td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.output.throughput_mb_per_sec</code></td>
+  <td>Defines the total write throughput per executor. This can be used as an upper cap on your Spark job's write throughput; base it on the provisioned throughput of your Cosmos DB collection.</td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.input.reads_per_sec</code></td>
+  <td>Defines the total read throughput per executor. This can be used as an upper cap on your Spark job's read throughput; base it on the provisioned throughput of your Cosmos DB collection.</td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.output.batch.grouping.buffer.size</code></td>
+  <td><code>1000</code></td>
+</tr>
+<tr>
+  <td><code>spark.cassandra.connection.keep_alive_ms</code></td>
+  <td><code>60000</code></td>
+</tr>
+</table>
+
+Regarding throughput and degree of parallelism, it is important to tune these parameters based on the amount of load you expect from your upstream/downstream flows, the executors provisioned for your Spark jobs, and the throughput you have provisioned for your Cosmos DB account.
+
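As an illustrative aside, these throughput knobs could be layered onto the same `SparkConf` as in the earlier sketch; the values below are examples only, to be tuned against your executors and the RUs provisioned on the collection:

```scala
// Example values only; see the table above for guidance on how to size each setting.
val tunedConf = conf
  .set("spark.cassandra.output.batch.size.rows", "1")
  .set("spark.cassandra.connection.connections_per_executor_max", "10")
  .set("spark.cassandra.output.concurrent.writes", "100")
  .set("spark.cassandra.concurrent.reads", "512")
  .set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
  .set("spark.cassandra.connection.keep_alive_ms", "60000")
```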
+## Connection Factory Configuration and Retry Policy
+As part of this sample, we provide a connection factory and a custom retry policy for Cosmos DB. A custom connection factory is needed because it is the only way to configure a retry policy on the connector - see [SPARKC-437](https://datastax-oss.atlassian.net/browse/SPARKC-437). (A configuration sketch follows the file list below.)
+* <code>CosmosDbConnectionFactory.scala</code>
+* <code>CosmosDbMultipleRetryPolicy.scala</code>
+
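A sketch of how the custom factory and the connector's retry count might be selected; `spark.cassandra.connection.factory` and `spark.cassandra.query.retry.count` are standard connector settings per the configuration reference linked above, but the exact values shown here are assumptions for illustration:

```scala
// Point the connector at the custom connection factory shipped with this sample,
// and allow a number of driver-level retries for rate-limited requests.
val cosmosConf = conf
  .set("spark.cassandra.connection.factory",
       "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
  .set("spark.cassandra.query.retry.count", "10")
```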
+### Retry Policy
+The retry policy for Cosmos DB is configured to handle HTTP status code 429 ("Request Rate Too Large") exceptions. The Cosmos DB Cassandra API translates these exceptions into overloaded errors on the Cassandra native protocol, which we want to retry with back-offs.
+The reason for doing so is that Cosmos DB follows a provisioned throughput model; this retry policy protects your Spark jobs against spikes of data ingress/egress that momentarily exceed the throughput allocated to your collection and would otherwise result in request rate limiting exceptions.
+
+*Note - this retry policy is only meant to protect your Spark jobs against momentary spikes. If you have not configured enough RUs on your collection for the intended throughput of your workload, the retries will not be able to catch up and the policy will eventually rethrow the exception.*
+
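The sample's actual implementation lives in `CosmosDbMultipleRetryPolicy.scala`, which is not shown in this commit view. Purely as a hypothetical sketch of the idea described above, a policy written against the Java driver 3.x `RetryPolicy` interface that retries overloaded (rate-limited) requests with a growing pause might look roughly like this:

```scala
import com.datastax.driver.core.exceptions.{DriverException, OverloadedException}
import com.datastax.driver.core.policies.RetryPolicy
import com.datastax.driver.core.policies.RetryPolicy.RetryDecision
import com.datastax.driver.core.{Cluster, ConsistencyLevel, Statement, WriteType}

// Hypothetical sketch, not the sample's CosmosDbMultipleRetryPolicy:
// retry overloaded requests (429s surface as OverloadedException) with a linear back-off,
// and rethrow everything else.
class ThrottlingRetryPolicySketch(maxRetries: Int, backOffMs: Long = 1000L) extends RetryPolicy {

  private def retryWithBackOff(cl: ConsistencyLevel, nbRetry: Int): RetryDecision =
    if (nbRetry < maxRetries) {
      Thread.sleep(backOffMs * (nbRetry + 1)) // crude linear back-off between attempts
      RetryDecision.retry(cl)
    } else {
      RetryDecision.rethrow()
    }

  override def onRequestError(stmt: Statement, cl: ConsistencyLevel, e: DriverException, nbRetry: Int): RetryDecision =
    e match {
      case _: OverloadedException => retryWithBackOff(cl, nbRetry)
      case _ => RetryDecision.rethrow()
    }

  // Leave the remaining error classes to fail fast.
  override def onReadTimeout(stmt: Statement, cl: ConsistencyLevel, required: Int, received: Int,
                             dataRetrieved: Boolean, nbRetry: Int): RetryDecision = RetryDecision.rethrow()
  override def onWriteTimeout(stmt: Statement, cl: ConsistencyLevel, writeType: WriteType, required: Int,
                              received: Int, nbRetry: Int): RetryDecision = RetryDecision.rethrow()
  override def onUnavailable(stmt: Statement, cl: ConsistencyLevel, required: Int, alive: Int,
                             nbRetry: Int): RetryDecision = RetryDecision.rethrow()
  override def init(cluster: Cluster): Unit = {}
  override def close(): Unit = {}
}
```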
+## Known Issues
+
+### Tokens and Token Range Filters
+We do not currently support methods that make use of tokens for filtering data. Hence, please avoid using any APIs that perform table scans.
 
 ## Resources
-
-(Any additional resources or related projects)
-
-- Link to supporting information
-- Link to similar sample
-- ...
+- [DataStax Spark Cassandra Connector](https://github.com/datastax/spark-cassandra-connector)
+- [CosmosDB Cassandra API](https://docs.microsoft.com/en-us/azure/cosmos-db/cassandra-introduction)
+- [Apache Spark](https://spark.apache.org/docs/latest/index.html)
+- [HDI Spark Cluster](https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-jupyter-spark-sql)

pom.xml

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.microsoft.azure.cosmosdb</groupId>
    <artifactId>spark-connector-sample</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <licenses>
        <license>
            <name>MIT License</name>
            <url>http://www.opensource.org/licenses/mit-license.php</url>
        </license>
    </licenses>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
Manifest-Version: 1.0
Main-Class: com.microsoft.azure.cosmosdb.cassandra.SampleCosmosDBApp
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR
Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
/**
 * <copyright file="CosmosDbConnectionFactory.scala" company="Microsoft">
 * Copyright (c) Microsoft. All rights reserved.
 * </copyright>
 */
package com.microsoft.azure.cosmosdb.cassandra

import java.nio.file.{Files, Path, Paths}
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}

import com.datastax.driver.core._
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy
import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf
import com.datastax.spark.connector.cql._
import org.apache.commons.io.IOUtils

object CosmosDbConnectionFactory extends CassandraConnectionFactory {

  /** Returns the Cluster.Builder object used to setup Cluster instance. */
  def clusterBuilder(conf: CassandraConnectorConf): Cluster.Builder = {
    val options = new SocketOptions()
      .setConnectTimeoutMillis(conf.connectTimeoutMillis)
      .setReadTimeoutMillis(conf.readTimeoutMillis)

    val builder = Cluster.builder()
      .addContactPoints(conf.hosts.toSeq: _*)
      .withPort(conf.port)
      /**
       * Make use of the custom RetryPolicy for Cosmos DB. This is needed for retrying scenarios specific to Cosmos DB.
       * Please refer to the "Retry Policy" section of the README.md for more information regarding this.
       */
      .withRetryPolicy(
        new CosmosDbMultipleRetryPolicy(conf.queryRetryCount))
      .withReconnectionPolicy(
        new ExponentialReconnectionPolicy(conf.minReconnectionDelayMillis, conf.maxReconnectionDelayMillis))
      .withLoadBalancingPolicy(
        new LocalNodeFirstLoadBalancingPolicy(conf.hosts, conf.localDC))
      .withAuthProvider(conf.authConf.authProvider)
      .withSocketOptions(options)
      .withCompression(conf.compression)
      .withQueryOptions(
        new QueryOptions()
          .setRefreshNodeIntervalMillis(0)
          .setRefreshNodeListIntervalMillis(0)
          .setRefreshSchemaIntervalMillis(0))

    if (conf.cassandraSSLConf.enabled) {
      maybeCreateSSLOptions(conf.cassandraSSLConf) match {
        case Some(sslOptions) => builder.withSSL(sslOptions)
        case None => builder.withSSL()
      }
    } else {
      builder
    }
  }

  /** Loads a keystore of the given type from the given path, if one is configured. */
  private def getKeyStore(
      ksType: String,
      ksPassword: Option[String],
      ksPath: Option[Path]): Option[KeyStore] = {

    ksPath match {
      case Some(path) =>
        val ksIn = Files.newInputStream(path)
        try {
          val keyStore = KeyStore.getInstance(ksType)
          keyStore.load(ksIn, ksPassword.map(_.toCharArray).orNull)
          Some(keyStore)
        } finally {
          IOUtils.closeQuietly(ksIn)
        }
      case None => None
    }
  }

  /** Builds SSLOptions from the connector's SSL configuration, if SSL is enabled. */
  private def maybeCreateSSLOptions(conf: CassandraSSLConf): Option[SSLOptions] = {
    lazy val trustStore =
      getKeyStore(conf.trustStoreType, conf.trustStorePassword, conf.trustStorePath.map(Paths.get(_)))
    lazy val keyStore =
      getKeyStore(conf.keyStoreType, conf.keyStorePassword, conf.keyStorePath.map(Paths.get(_)))

    if (conf.enabled) {
      val trustManagerFactory = for (ts <- trustStore) yield {
        val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
        tmf.init(ts)
        tmf
      }

      val keyManagerFactory = if (conf.clientAuthEnabled) {
        for (ks <- keyStore) yield {
          val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
          kmf.init(ks, conf.keyStorePassword.map(_.toCharArray).orNull)
          kmf
        }
      } else {
        None
      }

      val context = SSLContext.getInstance(conf.protocol)
      context.init(
        keyManagerFactory.map(_.getKeyManagers).orNull,
        trustManagerFactory.map(_.getTrustManagers).orNull,
        new SecureRandom)

      Some(
        JdkSSLOptions.builder()
          .withSSLContext(context)
          .withCipherSuites(conf.enabledAlgorithms.toArray)
          .build())
    } else {
      None
    }
  }

  /** Creates and configures the Cassandra connection */
  override def createCluster(conf: CassandraConnectorConf): Cluster = {
    clusterBuilder(conf).build()
  }
}
