-
Notifications
You must be signed in to change notification settings - Fork 1
ZIO
Helenus integrates with ZIO. This integration takes a slightly different form as the others. ZIO is a FP library that provides strong type-safe warranties, whereas integrations like Akka or Pekko relied more on throwing exceptions when needed (as the core library and the driver did).
For this, most of the useful abstractions are exposed through the ZIO monad. Beware
if you operate a type that isn't wrapped with it, and use ZIO.attempt when needed.
Include the library into you project definition:
libraryDependencies += "net.nmoncho" %% "helenus-zio" % "1.8.1"This integration requires ZIO and ZIO Streams in the classpath.
To integrate against ZIO first we need to create a ZLayer:
import zio._
import net.nmoncho.helenus.zio._
val layer: ZIO[Scope, CassandraException, ZCqlSession] = ZCqlSession.open(
ZDefaultCqlSession(host = "localhost", port = 9142)
)Helenus defines several extension methods to create and prepare queries, in both synchronous and asynchronous fashion:
import scala.util.Try
import com.datastax.oss.driver.api.core.cql.Row
val hotelsByName: ZScalaPreparedStatement1[String, Row] = "SELECT * FROM hotels WHERE name = ?".toZCQL.prepare[String]
val resultSet: ZPagingIterable[Try[Row]] = hotelsByName.execute("Rotterdam Marriott")
val result: ZIO[ZCqlSession, CassandraException, Option[Row]] = resultSet.oneOptionLet's take this step by step:
- Queries are defined as String literals with a CQL syntax.
- These have to be extended with the
.toZCQLmethod. - Then by using
preparewe define how many parameters the statement has and what types does those parameters have. - We can decide whenever we want to
executetheseBoundStatements, which returns aResultSet. - Finally, we also have extension methods on ResultSet to extract
Rows from them.
After a statement is prepared, Helenus will check at runtime if the provided parameters types match what the database is expecting. If these don't match, a warning will be logged.
We can define queries as Interpolated Strings, using the zcql prefix:
val hotel = "Rotterdam Marriott"
val iHotelsByName: ZWrappedBoundStatement[Row] = zcql"SELECT * FROM hotels WHERE name = $hotel"
val iResultSet: ZPagingIterable[Try[Row]] = iHotelsByName.execute()
val iResult: ZIO[ZCqlSession, CassandraException, Option[Row]] = resultSet.oneOptionAfter we interpolate at statement, we'll receive a ScalaBoundStatement, which
we can execute when we see fit.
Interpolated values will be replaced with Named Bind Parameters, making sure the query has been sanitized.
We can also define queries as streams, where ZStream is used to read from Cassandra,
and ZSink is used to write to it.
val hotelsStreams = "SELECT * FROM hotels".toZCQL.prepareUnit.stream()
// hotelsStreams: ZCqlStream[Try[Row]] = zio.stream.ZStream@6d6fb0b4
val hotelsSink = "INSERT INTO hotels() VALUES (?, ?, ?)".toZCQL.prepare[String, String, String].sink()
// hotelsSink: stream.ZSink[ZCqlSession, Throwable, (String, String, String), Nothing, Unit] = zio.stream.ZSink@9f465ab1Extracting and iterating results work in a similar fashion to other integrations.
The only small difference is that to map results from Rows to tuples or case classes
we have to use the to method instead of the as method, since the latter is already
taken by ZIO.
We can use nextOption and oneOption to take a look at the first result on a
result set. The former allows for a simpler iteration, although inconvenient.
If the result set is known to be small, the to method can be used to convert it
to a Scala collection.
import net.nmoncho.helenus.api.RowMapper
case class Hotel(name: String, country: String, capacity: Int, occupancy: Int)
implicit val hotelRowMapper: RowMapper[Hotel] = RowMapper[Hotel] // Derive RowMapper
val mHotelsByName: ZScalaPreparedStatement1[String, Hotel] =
"SELECT * FROM hotels WHERE name = ?".toZCQL.prepare[String].to[Hotel]
val mResult: ZIO[ZCqlSession, CassandraException, List[Hotel]] =
mHotelsByName.execute("Rotterdam Marriott").to(List)In the case where queries are executed as Streams, we can use the streamValidated
method to make sure the entire Chunk can be mapped properly:
val hotelsStreamsValidated = "SELECT * FROM hotels".toZCQL.prepareUnit.to[Hotel].streamValidated()
// hotelsStreamsValidated: ZCqlStream[Hotel] = zio.stream.ZStream@4a779baa