Gustavo De Micheli edited this page Dec 23, 2024 · 1 revision

Helenus integrates with ZIO. This integration takes a slightly different form than the others. ZIO is an FP library that provides strong type-safety guarantees, whereas integrations like Akka or Pekko rely more on throwing exceptions when needed (as the core library and the driver do).

For this reason, most of the useful abstractions are exposed through the ZIO monad. Take care when working with a value that isn't wrapped in it, and use ZIO.attempt around any operation that may throw.
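As a minimal sketch of that advice, the snippet below wraps an exception-throwing call in ZIO.attempt so the failure lands in the error channel instead of being thrown. The parsePort helper and the fallback value are hypothetical and only stand in for any code that lives outside ZIO.

```scala
import zio._

object AttemptSketch extends ZIOAppDefault {
  // Hypothetical stand-in for any exception-throwing call outside ZIO
  def parsePort(raw: String): Int = raw.toInt // throws NumberFormatException on bad input

  // ZIO.attempt captures a thrown exception in the error channel (Task = ZIO[Any, Throwable, _])
  def safePort(raw: String): Task[Int] = ZIO.attempt(parsePort(raw))

  // Recover with a default instead of letting the failure propagate
  def portOrDefault(raw: String): UIO[Int] = safePort(raw).orElseSucceed(9142)

  val run = portOrDefault("not-a-port").flatMap(p => Console.printLine(s"port = $p"))
}
```

Because the failure is part of the type (Task[Int]), callers have to decide how to handle it, which is the main difference from the exception-based integrations.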

Installation

Include the library in your project definition:

libraryDependencies += "net.nmoncho" %% "helenus-zio" % "1.8.1"

This integration requires ZIO and ZIO Streams in the classpath.
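If ZIO is not already on the classpath, a build.sbt fragment along these lines would add it. The version shown is an assumption for illustration, not a requirement stated by Helenus; use whichever ZIO 2.x version your project already depends on.

```scala
// build.sbt (sketch): zioVersion is an assumed value, adjust to your project
val zioVersion = "2.1.14"

libraryDependencies ++= Seq(
  "dev.zio" %% "zio"         % zioVersion,
  "dev.zio" %% "zio-streams" % zioVersion
)
```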

Setup

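To integrate with ZIO, we first need to open a ZCqlSession. Despite its name, the value below is a scoped ZIO effect rather than a ZLayer (it can be lifted into one with ZLayer.scoped):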

import zio._
import net.nmoncho.helenus.zio._

val layer: ZIO[Scope, CassandraException, ZCqlSession] = ZCqlSession.open(
    ZDefaultCqlSession(host = "localhost", port = 9142)
)
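A minimal sketch of turning that scoped effect into a ZLayer and handing it to a program. ZLayer.scoped and provideLayer are standard ZIO 2 operations; the object and method names (SessionLayerSketch, withSession) are hypothetical. The Scope requirement suggests the session is released when the scope closes, although that is not spelled out here.

```scala
import zio._
import net.nmoncho.helenus.zio._

object SessionLayerSketch {
  // Same call as above: a scoped effect that opens the session
  val openSession: ZIO[Scope, CassandraException, ZCqlSession] =
    ZCqlSession.open(ZDefaultCqlSession(host = "localhost", port = 9142))

  // ZLayer.scoped discharges the Scope requirement and yields a layer
  val sessionLayer: ZLayer[Any, CassandraException, ZCqlSession] =
    ZLayer.scoped(openSession)

  // Any effect that needs a ZCqlSession can then be given the layer
  def withSession[E >: CassandraException, A](effect: ZIO[ZCqlSession, E, A]): ZIO[Any, E, A] =
    effect.provideLayer(sessionLayer)
}
```

Nothing connects to Cassandra until an effect returned by withSession is actually run.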

Queries and Statements

Helenus defines several extension methods to create and prepare queries, in both synchronous and asynchronous fashion:

import scala.util.Try
import com.datastax.oss.driver.api.core.cql.Row

val hotelsByName: ZScalaPreparedStatement1[String, Row] = "SELECT * FROM hotels WHERE name = ?".toZCQL.prepare[String]

val resultSet: ZPagingIterable[Try[Row]] = hotelsByName.execute("Rotterdam Marriott")

val result: ZIO[ZCqlSession, CassandraException, Option[Row]] = resultSet.oneOption

Let's take this step by step:

  • Queries are defined as String literals with a CQL syntax.
  • These have to be extended with the .toZCQL method.
  • Then by using prepare we define how many parameters the statement has and what types those parameters have.
  • We can execute these BoundStatements whenever we want, which returns a ResultSet.
  • Finally, we also have extension methods on ResultSet to extract Rows from it.

After a statement is prepared, Helenus will check at runtime whether the provided parameter types match what the database is expecting. If they don't match, a warning will be logged.
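The steps above can also be chained into a single expression. The sketch below does that, reads one column from the optional Row, and recovers from a failed query. It assumes the hotels table has a text column called name (as the WHERE clause suggests); Row#getString comes from the DataStax driver, and the object and value names are hypothetical.

```scala
import zio._
import net.nmoncho.helenus.zio._

object QuerySketch {
  // Prepare, bind and fetch the first row, then read the "name" column if a row exists
  val firstHotelName: ZIO[ZCqlSession, CassandraException, Option[String]] =
    "SELECT * FROM hotels WHERE name = ?".toZCQL
      .prepare[String]
      .execute("Rotterdam Marriott")
      .oneOption
      .map(_.map(_.getString("name")))

  // Log the typed error and fall back to None instead of failing
  val firstHotelNameOrNone: ZIO[ZCqlSession, Nothing, Option[String]] =
    firstHotelName.catchAll(e => ZIO.logError(s"query failed: $e").as(None))
}
```

Both values are only descriptions of work; they run once a ZCqlSession is provided.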

Queries as Interpolated Strings

We can define queries as Interpolated Strings, using the zcql prefix:

val hotel = "Rotterdam Marriott"

val iHotelsByName: ZWrappedBoundStatement[Row] = zcql"SELECT * FROM hotels WHERE name = $hotel"

val iResultSet: ZPagingIterable[Try[Row]] = iHotelsByName.execute()

val iResult: ZIO[ZCqlSession, CassandraException, Option[Row]] = iResultSet.oneOption

After we interpolate a statement, we receive a bound statement (a ZWrappedBoundStatement in the example above), which we can execute when we see fit.

Interpolated values will be replaced with Named Bind Parameters, making sure the query has been sanitized.

Queries as Streams

We can also define queries as streams, where ZStream is used to read from Cassandra, and ZSink is used to write to it.

val hotelsStreams = "SELECT * FROM hotels".toZCQL.prepareUnit.stream()
// hotelsStreams: ZCqlStream[Try[Row]] = zio.stream.ZStream@6d6fb0b4

val hotelsSink = "INSERT INTO hotels() VALUES (?, ?, ?)".toZCQL.prepare[String, String, String].sink()
// hotelsSink: stream.ZSink[ZCqlSession, Throwable, (String, String, String), Nothing, Unit] = zio.stream.ZSink@9f465ab1
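To actually write through such a sink, a stream of tuples has to be run into it. The following is a sketch using only zio-streams; writeAll and HotelRow are hypothetical names, and the sink parameter is typed so that a value like hotelsSink above should fit (with R being ZCqlSession).

```scala
import zio._
import zio.stream._

object SinkSketch {
  // One element per row to insert, matching the three bind parameters of the statement
  type HotelRow = (String, String, String)

  // Emit every row from an in-memory collection and push it through the given sink
  def writeAll[R](
      rows: Iterable[HotelRow],
      sink: ZSink[R, Throwable, HotelRow, Any, Unit]
  ): ZIO[R, Throwable, Unit] =
    ZStream.fromIterable(rows).run(sink)
}
```

In practice the source stream would more likely come from a file, a queue, or another query than from an in-memory list.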

Extracting and Iterating Results

Extracting and iterating results work in a similar fashion to other integrations.

The only small difference is that to map results from Rows to tuples or case classes we have to use the to method instead of the as method, since the latter is already taken by ZIO.

We can use nextOption and oneOption to look at the first result in a result set. The former also allows simple, if inconvenient, iteration.

If the result set is known to be small, the to method can be used to convert it to a Scala collection.

import net.nmoncho.helenus.api.RowMapper

case class Hotel(name: String, country: String, capacity: Int, occupancy: Int)

implicit val hotelRowMapper: RowMapper[Hotel] = RowMapper[Hotel] // Derive RowMapper

val mHotelsByName: ZScalaPreparedStatement1[String, Hotel] =
    "SELECT * FROM hotels WHERE name = ?".toZCQL.prepare[String].to[Hotel]

val mResult: ZIO[ZCqlSession, CassandraException, List[Hotel]] =
    mHotelsByName.execute("Rotterdam Marriott").to(List)
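Once rows are mapped to case classes, the result is an ordinary ZIO value and can be transformed like any other. A small sketch, assuming the same Hotel shape as above (redeclared here so the snippet stands alone); freeRooms is a hypothetical helper, not part of Helenus.

```scala
import zio._

object ResultsSketch {
  // Same fields as the Hotel case class above
  final case class Hotel(name: String, country: String, capacity: Int, occupancy: Int)

  // Sum the unoccupied capacity over all hotels returned by a query
  def freeRooms[R, E](hotels: ZIO[R, E, List[Hotel]]): ZIO[R, E, Int] =
    hotels.map(_.map(h => h.capacity - h.occupancy).sum)
}
```

A value such as mResult could be passed to freeRooms directly, keeping the ZCqlSession requirement and the CassandraException error type unchanged.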

In the case where queries are executed as Streams, we can use the streamValidated method to make sure the entire Chunk can be mapped properly:

val hotelsStreamsValidated = "SELECT * FROM hotels".toZCQL.prepareUnit.to[Hotel].streamValidated()
// hotelsStreamsValidated: ZCqlStream[Hotel] = zio.stream.ZStream@4a779baa
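For comparison, a stream whose elements are wrapped in Try (such as hotelsStreams earlier) has to be unwrapped by hand. The sketch below shows two common ways to do that with plain zio-streams; the helper names are hypothetical, and it is an assumption that streamValidated spares you a step of this kind rather than a description of how it is implemented.

```scala
import scala.util.{Success, Try}
import zio._
import zio.stream._

object TryStreamSketch {
  // Fail the whole stream on the first element that could not be mapped
  def failFast[R, A](rows: ZStream[R, Throwable, Try[A]]): ZStream[R, Throwable, A] =
    rows.mapZIO(t => ZIO.fromTry(t))

  // Alternatively, silently keep only the elements that mapped successfully
  def successesOnly[R, E, A](rows: ZStream[R, E, Try[A]]): ZStream[R, E, A] =
    rows.collect { case Success(a) => a }
}
```

Which behaviour is appropriate depends on whether a single unmappable row should abort processing or merely be skipped.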
