Akka
Helenus integrates with Akka through Alpakka.
Note: We only provide integration for Scala 2.13, due to Alpakka's latest Apache License release.
Include the library in your project definition:
libraryDependencies += "net.nmoncho" %% "helenus-akka" % "1.6.1"
Or, for the helenus-akka-busl variant:
libraryDependencies += "net.nmoncho" %% "helenus-akka-busl" % "1.6.1"
The integration with Alpakka tries to be as seamless as possible. Marking a CassandraSession implicit is the only thing required to use Helenus:
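import _root_.akka.actor.ActorSystem
import _root_.akka.stream.alpakka.cassandra.CassandraSessionSettings
import _root_.akka.stream.alpakka.cassandra.scaladsl.{CassandraSession, CassandraSessionRegistry}
implicit val system: ActorSystem = ActorSystem()
val sessionSettings = CassandraSessionSettings()
import system.dispatcher
implicit val cassandraSession: CassandraSession =
  CassandraSessionRegistry.get(system).sessionFor(sessionSettings)
If you are already using Alpakka, this is probably already set up.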
Cassandra queries, just like in Alpakka,
are mapped to Akka Stream Sources. With Helenus we can leverage any defined
RowMapper to adapt a Source[Row] to a Source[A].
import _root_.akka.NotUsed
import _root_.akka.stream.scaladsl.Source
import net.nmoncho.helenus._
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.akka._
case class Person(id: Int, name: String, city: String)
implicit val rowMapper: RowMapper[Person] = RowMapper[Person]
val peopleQuery = "SELECT * FROM akka_people WHERE id = ?".toCQLAsync
  .prepare[Int].as[Person]
val helenusSource: Source[Person, NotUsed] = peopleQuery.asReadSource(1)
The integration is intended to be used as follows:
- Transform your query with .toCQLAsync.
- Prepare the query with .prepare, providing the query parameter types.
- Define a query result, which uses an implicit RowMapper. If this isn't provided, Row will be used as the query result type.
- Transform your query into a source with asReadSource. This method follows the same principle as ScalaPreparedStatement. Use this method as a factory for Sources.
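To illustrate the factory idea from the last step, here is a minimal sketch: calling asReadSource with different arguments on the same prepared query should yield independent sources. It assumes peopleQuery, Person, and the implicit CassandraSession from the example above are in scope. The id 2 and the value names personOne, personTwo, and both are illustrative only.

```scala
import _root_.akka.NotUsed
import _root_.akka.stream.scaladsl.Source

// Assumes peopleQuery, Person and the implicit CassandraSession defined above.
// Each call binds a different id and returns a new Source that has not run yet.
val personOne: Source[Person, NotUsed] = peopleQuery.asReadSource(1)
val personTwo: Source[Person, NotUsed] = peopleQuery.asReadSource(2) // id 2 is illustrative

// The resulting sources compose like any other Akka Streams Source.
val both: Source[Person, NotUsed] = personOne.concat(personTwo)
```

Nothing is sent to Cassandra until one of these sources is run, for example with runWith.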
This last method isn't the only integration point with Alpakka.
CassandraSource can also be used with Helenus:
import _root_.akka.stream.alpakka.cassandra.scaladsl.CassandraSource
val alpakkaSource: Source[Person, NotUsed] =
  CassandraSource.fromFuture(peopleQuery.map(query => query(1))).map(rowMapper.apply)
We arrive at the same Source[Person, NotUsed]. Executing these queries is performed as usual:
import scala.concurrent.Await
import scala.concurrent.duration._
import _root_.akka.stream.scaladsl.Sink
Await.result(helenusSource.runWith(Sink.head), 5.seconds)
// res4: Person = Person(id = 1, name = "John", city = "Rome")
Await.result(alpakkaSource.runWith(Sink.head), 5.seconds)
// res5: Person = Person(id = 1, name = "John", city = "Rome")
We can use the same facilities when inserting into, or updating, Cassandra.
Unlike Alpakka, Helenus offers both Sink and Flow for this purpose.
import scala.concurrent.Future
import _root_.akka.Done
import _root_.akka.stream.alpakka.cassandra.CassandraWriteSettings
implicit val personAdapter: Adapter[Person, (Int, String, String)] = Adapter[Person]
val sink: Sink[Person, Future[Done]] =
  "INSERT INTO akka_people(id, name, city) VALUES (?, ?, ?)".toCQLAsync
    .prepare[Int, String, String]
    .from[Person]
    .asWriteSink(CassandraWriteSettings.defaults)
val people: Source[Person, NotUsed] = Source(List(
  Person(4, "Jane", "Amsterdam"),
  Person(5, "Lisa", "Paris"),
  Person(6, "Maria", "Madrid")
))
// insert is a Future[Done] that completes once the stream has finished writing
val insert = people.runWith(sink)
We can then query this data as shown above:
Await.result(
  peopleQuery.asReadSource(5).runWith(Sink.head),
  5.seconds
)
// res7: Person = Person(id = 5, name = "Lisa", city = "Paris")
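The text above mentions that Helenus offers a Flow as well as a Sink, but only the Sink is shown. The following is a sketch of what the Flow variant might look like. It assumes a method named asWriteFlow that mirrors asWriteSink and passes each element downstream after writing it. That name and behaviour are assumptions, not confirmed by this page, so check the Helenus API before relying on them. It also assumes Person, personAdapter, people, and the implicit CassandraSession from above are in scope.

```scala
import scala.concurrent.Future
import _root_.akka.NotUsed
import _root_.akka.stream.alpakka.cassandra.CassandraWriteSettings
import _root_.akka.stream.scaladsl.{Flow, Sink}

// ASSUMPTION: asWriteFlow is taken to be the Flow counterpart of asWriteSink;
// verify the exact name and signature against the Helenus API.
val flow: Flow[Person, Person, NotUsed] =
  "INSERT INTO akka_people(id, name, city) VALUES (?, ?, ?)".toCQLAsync
    .prepare[Int, String, String]
    .from[Person]
    .asWriteFlow(CassandraWriteSettings.defaults)

// Unlike a Sink, a Flow lets the stream continue after the write stage.
// Here the elements coming out of the flow are collected into a Seq.
val written: Future[Seq[Person]] = people.via(flow).runWith(Sink.seq)
```

A Flow is the better fit when something still has to happen after the write, such as acknowledging a message or forwarding the element to another stage.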