Exploring Cats Effect
In two previous articles we explored how you can use ZIO to easily create programs in a type-safe manner.
In those articles we created a simple server which polls a REST endpoint at a fixed interval and writes the results to a database (MongoDB). The data was then exposed through an HTTP4S REST endpoint. In that approach we used the ZIO monad throughout the stack, which allowed us to use specific ZIO features (in those examples only a small number) to make our lives easier. If you want to jump directly to the code, you can find it here: https://github.com/josdirksen/tagless-playground
Another common approach for creating modules and services is by using the “Tagless Final” pattern. If you search around you’ll find numerous explanations about what this pattern does, but the main idea behind this pattern is:
- Defer choosing an IO implementation. So our services and traits are defined using F[_] as the type.
- Dependencies for a certain function (or module) are defined as implicit parameters. E.g. F[_]: ConfigDSL means that this function (or case class) requires a type F[_] for which a ConfigDSL[F] implementation is in the implicit scope.
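The two points above can be sketched with nothing but the standard library. This is a minimal illustration, not the code from the repository: AppConfig, loadConfig and the two interpreters are hypothetical names, and Option and Try stand in for a real IO type.

```scala
import scala.util.{Success, Try}

object TaglessSketch {
  // Hypothetical, simplified config type (not the article's Config).
  final case class AppConfig(endpoint: String, port: Int)

  // The algebra: describes what is offered, not which effect type provides it.
  trait ConfigDSL[F[_]] {
    def config: F[AppConfig]
  }

  object ConfigDSL {
    // Summoner: ConfigDSL[F] returns whichever instance is in implicit scope.
    def apply[F[_]](implicit F: ConfigDSL[F]): ConfigDSL[F] = F

    // Two interpreters. They live in the companion here only so that they are
    // always found; the article instead declares them as implicit vals in main.
    implicit val optionConfig: ConfigDSL[Option] = new ConfigDSL[Option] {
      def config: Option[AppConfig] = Some(AppConfig("localhost", 8080))
    }
    implicit val tryConfig: ConfigDSL[Try] = new ConfigDSL[Try] {
      def config: Try[AppConfig] = Success(AppConfig("localhost", 8080))
    }
  }

  // The context bound F[_]: ConfigDSL desugars to an implicit ConfigDSL[F]
  // parameter. The function itself never names a concrete effect type.
  def loadConfig[F[_]: ConfigDSL]: F[AppConfig] = ConfigDSL[F].config
}
```

The caller decides on F at the call site, for example TaglessSketch.loadConfig[Option] or TaglessSketch.loadConfig[scala.util.Try], and the compiler picks the matching interpreter.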
There are a number of other features in this pattern, but the above captures most of it. A couple of libraries adopt this pattern, where dependencies are provided through implicits, and many of those use the generic Cats Effect library to define the fine-grained effects that functions need.
In this article we’ll convert the setup from Exploring ZIO - Part II - ZStream and modules to one where we use Cats Effect together with (part of) the “Tagless Final” pattern.
Before we start, a quick summary of what we’re going to build:
- Read a configuration file using PureConfig
- Use this configuration to create an HTTP4S server and HTTP4S client
- Create a stream using FS2 which is used to get the latest temperatures from a remote server.
- Store the results from the external client in MongoDB.
All the steps above will be defined using ‘Cats Effect’ and we’ll only provide a concrete IO monad implementation at the edge of the world in our main function.
Loading configuration
The main difference with the ZIO approach is that we define the trait using an F[_]:
case class Config(apiConfig: ApiConfig, temperatureConfig: TemperatureConfig, dbConfig: DBConfig)
case class ApiConfig(endpoint: String, port: Int)
case class TemperatureConfig(endpoint: String, apiKey: String, interval: FiniteDuration)
case class DBConfig(endpoint: String)
/**
* Basic DSL trait is type, constraint agnostic.
*/
trait ConfigDSL[F[_]] {
val config: F[Config]
}
object ConfigDSL {
def apply[F[_]](implicit F: ConfigDSL[F]): F.type = F
}
To use this config we’re going to define an interpreter for it. While we can have multiple interpreters, we should preferably only have one. A simple implementation using PureConfig is shown here. Note that in this implementation we still don’t decide on the actual IO monad we’re going to run the final program in. That’s something we decide at the latest possible time.
/**
* A pureconfig based interpreter. Note that we use the kind projector, so we can
* correctly specify the MonadError constraint. If we don't use this we have to
* either add it as a desugared implicit, or create a custom type.
*/
case class LiveConfigInterpreter[F[_]: MonadError[*[_], Throwable]]() extends ConfigDSL[F] {
override val config: F[Config] = {
/**
* We can wrap it in an either, if we do that then we don't
* really need any other typeclasses, we can also say that
* we expect the result to be mapped to a monadError. That
* means that the F[_] used, should satisfy the MonadError
* constraint.
*/
val c = ConfigSource.default
.load[Config]
.left
.map(pureconfig.error.ConfigReaderException.apply)
// summon the monadError instance and convert to an either
MonadError[F, Throwable].fromEither(c)
}
}
In the code above we want to communicate to the users that loading this config can fail. So we expect that the monad the user provides supports at least the functionality defined in MonadError. This could for instance be an Either or a Try. We define this dependency as a constraint on F[_] like this: F[_]: MonadError[*[_], Throwable]. This simpler syntax is provided by the kind-projector compiler plugin (https://github.com/typelevel/kind-projector), which is enabled in the build.sbt:
ThisBuild / scalaVersion := "2.13.4"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "org.smartjava"
ThisBuild / organizationName := "smartjava"
val CatsVersion = "3.1.1"
val Http4sVersion = "1.0.0-M23"
lazy val hello = (project in file("."))
.settings(
name := "Exploring Tagless",
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % CatsVersion,
"com.github.pureconfig" %% "pureconfig" % "0.15.0",
"org.http4s" %% "http4s-blaze-server" % Http4sVersion,
"org.http4s" %% "http4s-dsl" % Http4sVersion,
"org.http4s" %% "http4s-blaze-client" % Http4sVersion,
"org.http4s" %% "http4s-circe" % Http4sVersion,
"ch.qos.logback" % "logback-core" % "1.2.3",
"org.slf4j" % "slf4j-api" % "1.7.30",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"co.fs2" %% "fs2-core" % "3.0.4",
"co.fs2" %% "fs2-io" % "3.0.4",
"co.fs2" %% "fs2-reactive-streams" % "3.0.4",
// JSON Mapping
"io.circe" %% "circe-generic" % "0.12.3",
"io.circe" %% "circe-literal" % "0.12.3",
"org.mongodb.scala" %% "mongo-scala-driver" % "4.2.3"
),
scalacOptions += "-Ymacro-annotations"
)
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.11.3" cross CrossVersion.full)
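To make clearer what the kind-projector syntax saves us from, here is a minimal sketch of the two alternatives mentioned in the interpreter's comment (a custom type alias, or a desugared implicit parameter). To keep it dependency-free it uses ErrorLift, a hypothetical stand-in for MonadError[F, E] that can only lift an Either; the names viaAlias and viaImplicit are likewise made up for this illustration.

```scala
import scala.util.Try

object ConstraintSpellings {
  // Hypothetical stand-in for cats' MonadError[F, E]: just enough to lift an Either.
  trait ErrorLift[F[_], E] {
    def fromEither[A](either: Either[E, A]): F[A]
  }

  object ErrorLift {
    // Instance for Try, so the sketch can be run without any library.
    implicit val tryLift: ErrorLift[Try, Throwable] = new ErrorLift[Try, Throwable] {
      def fromEither[A](either: Either[Throwable, A]): Try[A] = either.toTry
    }
  }

  // Spelling 1: a type alias that fixes E, so an ordinary context bound works.
  type ThrowableLift[F[_]] = ErrorLift[F, Throwable]
  def viaAlias[F[_]: ThrowableLift, A](either: Either[Throwable, A]): F[A] =
    implicitly[ThrowableLift[F]].fromEither(either)

  // Spelling 2: the desugared form, with an explicit implicit parameter list.
  def viaImplicit[F[_], A](either: Either[Throwable, A])(
      implicit F: ErrorLift[F, Throwable]
  ): F[A] = F.fromEither(either)

  // Spelling 3 needs the kind-projector plugin, so it is only shown as a comment:
  // def viaPlugin[F[_]: ErrorLift[*[_], Throwable], A](either: Either[Throwable, A]): F[A]
}
```

All three spellings ask for the same implicit value; the plugin version simply avoids the extra alias or parameter list.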
In the final main we’ll use this configuration something like this:
import cats.effect.IO
implicit val configService = LiveConfigInterpreter[IO]()
With the configuration out of the way we can look at configuring our HTTP4S client and server.
Setting up HTTP4s
We’ll start by looking at the client.
object http4sClient {
/**
* Simple DSL to get an instance of a client
*/
trait Http4sClientDSL[F[_]] {
val client: Resource[F, Client[F]]
}
object Http4sClientDSL {
def apply[F[_]](implicit F: Http4sClientDSL[F]): F.type = F
}
/**
* Live version
*/
case class LiveHttp4sClientInterpreter[F[_]: Async]()(implicit ec: ExecutionContext) extends Http4sClientDSL[F] {
override val client: Resource[F, Client[F]] = BlazeClientBuilder[F](ec).resource
}
}
If you look at the trait, we’re going to return a Resource[F, Client[F]], which allows us to get access to a Client[F] by calling use on the resource. Since HTTP4S is already based on Cats Effect, we only have to call BlazeClientBuilder[F](ec).resource and we’re done… well almost. If you look at the signature:
def apply[F[_]: Async](executionContext: ExecutionContext): BlazeClientBuilder[F] =
This function requires an Async[F] to be in scope. So that’s why we also added it to the case class (which serves as our module / service). You can also see that this function requires an explicit execution context. It seems to be rather arbitrary when the ExecutionContext is passed explicitly or implicitly, so in our module we expect this to be passed in implicitly.
The Http4sServer setup isn’t that different:
object http4sServer {
/**
* This trait provides access to a Http4sServer dependency. It is
* wrapped in the F monad, so we can signal errors during setup.
*
* Wondering whether these should be called DSL, since these are
* just the low level dependencies.
*/
trait Http4sServerDSL[F[_]] {
val server: Resource[F, Server]
}
/**
* Helper object to access the instance of the DSL that is in scope
*/
object Http4sServerDSL {
def apply[F[_]](implicit F: Http4sServerDSL[F]): F.type = F
}
case class routes[F[_]: Monad]() {
private val dsl = Http4sDsl[F]
import dsl._
val routes: HttpRoutes[F] = HttpRoutes
.of[F] {
case GET -> Root / "users" / IntVar(id) => {
Created("A value here")
}
case POST -> Root / "users" => {
Created("And another one here")
}
}
}
/**
* Live instance of the HttpsServer service.
*
* @param ec
*/
case class LiveHttp4sServerInterpreter[
F[_]: MonadCancel[*[_], Throwable]: Async: ConfigDSL
]()(implicit ec: ExecutionContext)
extends Http4sServerDSL[F] {
override val server: Resource[F, Server] =
for {
config <- Resource.eval(ConfigDSL[F].config)
server <- BlazeServerBuilder[F](ec)
.bindHttp(config.apiConfig.port, config.apiConfig.endpoint)
.withHttpApp(routes[F]().routes.orNotFound)
.resource
} yield (
server
)
}
}
As you can see in the code above, the process is the same. We pass in a couple of type classes from Cats Effect (MonadCancel and Async) since those are required by BlazeServerBuilder[F]. Additionally we also add ConfigDSL to the list of constraints. This means that to create this module, we need to make sure a ConfigDSL[F] is in scope as well. If we want to instantiate this module we’d need to do something like this:
implicit val ec = ExecutionContext.global
implicit val configService = LiveConfigInterpreter[IO]()
// implicit since it'll be used as a dependency later on.
implicit val http4sClient = services.http4sClient.LiveHttp4sClientInterpreter[IO]()
val http4s = LiveHttp4sServerInterpreter[IO]()
That way we can easily get access to the Http4S server.
Configuring MongoDB for data storage
Now let’s look at the bottom layer of the application where we want to store data in a mongo database. First we’ll look at how to acquire the mongo client:
object mongo {
trait MongoDBConnectionDSL[F[_]] {
val mongoClient: Resource[F, MongoClient]
}
object MongoDBConnectionDSL {
def apply[F[_]](implicit F: MongoDBConnectionDSL[F]): F.type = F
}
case class LiveMongoDBConnectionInterpreter[F[_]: Sync: ConfigDSL]() extends MongoDBConnectionDSL[F] {
val ME = MonadError[F, Throwable]
val Config = ConfigDSL[F]
override val mongoClient = Resource.make(acquireConnection)(releaseConnection)
/**
* Acquire a connection
*/
private def acquireConnection()(implicit config: ConfigDSL[F]) =
for {
c <- config.config
r <- Sync[F].blocking(MongoClient(c.dbConfig.endpoint))
} yield {
r
}
/**
* Release the mongo client, wrap in sync since it's effectful
*/
private def releaseConnection(mongoClient: MongoClient) = Sync[F].blocking(mongoClient.close)
}
}
Here you can again see a very simple trait explaining what this DSL offers (not much in this simple example), and an implementation (still agnostic on F[_]). Besides the ConfigDSL, the only requirement here is a Sync[F] instance, which we use to wrap the effectful instantiation of the MongoClient. Since we do this as a resource we have to provide a function to acquire the resource (acquireConnection) and a function to release the resource again (releaseConnection). Using Sync[F] will defer the code within the blocking function until the effect is evaluated.
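The acquire / use / release ordering can be seen in isolation with a small sketch. It assumes cats-effect 3 on the classpath (as in the build.sbt above), fixes F to IO for brevity, and uses a plain string as a hypothetical stand-in for the MongoClient; the log buffer exists only to record the order of events.

```scala
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.collection.mutable.ListBuffer

object ResourceLifecycleSketch {
  def run(): List[String] = {
    // Records the order in which the three phases are executed.
    val log = ListBuffer.empty[String]

    // Nothing runs here yet: both acquire and release are only descriptions.
    val resource: Resource[IO, String] =
      Resource.make(IO { log += "acquire"; "client" })(_ => IO { log += "release"; () })

    // use acquires, runs the function, and always releases afterwards.
    resource.use(client => IO { log += s"use $client"; () }).unsafeRunSync()

    log.toList
  }
}
```

Calling ResourceLifecycleSketch.run() yields the entries acquire, use client, release in that order. The same guarantee holds when the function passed to use fails or is cancelled.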
Now that we’ve got a way to access the mongo client, we can also implement storing and retrieving our domain entity (a Temperature object). The code below is almost exactly the same as it was in the ZIO example with ZStream, so we won’t explain the details:
object storage {
trait TemperatureStorageDSL[F[_]] {
def insert(temperature: Temperature): F[Unit]
def getAll(): F[List[Temperature]]
}
object TemperatureStorageDSL {
def apply[F[_]](implicit F: TemperatureStorageDSL[F]): F.type = F
}
case class TemperatureStorageInterpreter[
F[_]: MonadCancel[*[_], Throwable]: Async: ConfigDSL: MongoDBConnectionDSL
]() extends TemperatureStorageDSL[F] {
val temperatureCodecProvider = Macros.createCodecProvider[Temperature]()
val codecRegistry = fromRegistries(fromProviders(temperatureCodecProvider), DEFAULT_CODEC_REGISTRY)
val ME = MonadError[F, Throwable]
override def insert(temperature: Temperature): F[Unit] =
withCollection[Unit, Temperature] { c =>
c.insertOne(temperature)
.toStream[F]
.compile
.toList
.map(_.length)
// naively assume that when we get 1 insert result everything is fine
.flatMap {
case 1 => ME.pure(())
case _ => ME.raiseError[Unit](new IllegalArgumentException("Expected result from mongodb"))
}
}
override def getAll(): F[List[Temperature]] = {
withCollection[List[Temperature], Temperature] {
_.find()
.toStream[F]
.compile
.toList
}
}
/**
* Get the database and collection to which to store.
*
* @param f function to call within the context of this collection
* @return result of wrapped
*/
private def withCollection[A, T: ClassTag](
f: MongoCollection[T] => F[A]
): F[A] = MongoDBConnectionDSL[F].mongoClient.use[A] { mongoClient =>
val collection =
mongoClient
.getDatabase("sampleservice")
.withCodecRegistry(codecRegistry)
.getCollection[T]("temperatures")
f(collection)
}
}
}
The only interesting part here is the set of dependencies we want for this module:
case class TemperatureStorageInterpreter[
F[_]: MonadCancel[*[_], Throwable]: Async: ConfigDSL: MongoDBConnectionDSL
]() extends TemperatureStorageDSL[F] {
So what we need are a couple of type classes from Cats Effect, an instance of the ConfigDSL, and an instance of the MongoDBConnectionDSL as well.
And of course since the client is a resource we have to call use to access the client in a safe manner:
private def withCollection[A, T: ClassTag](
f: MongoCollection[T] => F[A]
): F[A] = MongoDBConnectionDSL[F].mongoClient.use[A] { mongoClient =>
val collection =
mongoClient
.getDatabase("sampleservice")
.withCodecRegistry(codecRegistry)
.getCollection[T]("temperatures")
f(collection)
}
When this effect is evaluated, the acquire function we defined for retrieving a MongoClient will be called, and at the end of the function the release function we defined is called.
And the final part that is interesting is the integration we have here with FS2. The mongo client we use is based on the Reactive Streams implementation. Luckily we can just integrate with that directly with FS2:
c.insertOne(temperature)
.toStream[F]
That will result in a monad agnostic stream (def toStream[F[_]: Async]: Stream[F, A]) and the only thing we need to do is make sure the monad we provide has the Async constraint.
Create a stream of temperature updates
The final thing we need to do is create a client that polls an HTTP endpoint and returns a stream of temperature updates. For that we just define a new module where we expose a single value which is a stream of temperatures. The type is defined like this: F[Stream[F, Temperature]]. This means that retrieving the stream itself is also effectful (since we use information from the configuration) and when we evaluate the stream we also do this within the context of the supplied F[_]. So our stream itself uses functions which aren’t pure (i.e. not just simple calculations); in this case we make a call using the passed in Http4sClientDSL:
object tempClient {
trait TempClientDSL[F[_]] {
val temperatureStream: F[Stream[F, Temperature]]
}
case class LiveTempClientInterpreter[F[_]: MonadError[*[_], Throwable]: Temporal: ConfigDSL: Http4sClientDSL]()
extends TempClientDSL[F] {
import org.http4s.circe._
import io.circe.generic.auto._
import TempClientLive.OpenWeather._
implicit val userDecoder = jsonOf[F, OWResult]
/**
* Get a stream in an effectful way.
*/
override val temperatureStream: F[Stream[F, Temperature]] = for {
config <- ConfigDSL[F].config
stream = (Stream(1) ++ Stream.awakeEvery(config.temperatureConfig.interval)).evalMap { _ =>
makeTemperatureCall(config.temperatureConfig.endpoint + config.temperatureConfig.apiKey)
}
} yield {
stream
}
/**
* Make an actual call to a rest endpoint
*/
private def makeTemperatureCall(url: String)(implicit clientDSL: Http4sClientDSL[F]): F[Temperature] =
clientDSL.client.use {
_.expect[TempClientLive.OpenWeather.OWResult](url).map(t => Temperature(t.dt, t.main.temp))
}
}
object TempClientLive {
object OpenWeather {
// the openweather model
case class OWResult(coord: OWCoord, main: OWMain, visibility: Integer, wind: OWWind, dt: Long)
case class OWCoord(lat: Double, lon: Double)
case class OWMain(
temp: Double,
feels_like: Double,
temp_min: Double,
temp_max: Double,
pressure: Int,
humidity: Int
)
case class OWWind(speed: Double, deg: Long, gust: Double)
}
}
}
When looking at this implementation you can see that in the case class definition we define a number of dependencies, just like we did for the other modules. The interesting part here is where we actually make and use the FS2 stream:
/**
* Get a stream in an effectful way.
*/
override val temperatureStream: F[Stream[F, Temperature]] = for {
config <- ConfigDSL[F].config
stream = (Stream(1) ++ Stream.awakeEvery(config.temperatureConfig.interval)).evalMap { _ =>
makeTemperatureCall(config.temperatureConfig.endpoint + config.temperatureConfig.apiKey)
}
} yield {
stream
}
/**
* Make an actual call to a rest endpoint
*/
private def makeTemperatureCall(url: String)(implicit clientDSL: Http4sClientDSL[F]): F[Temperature] =
clientDSL.client.use {
_.expect[TempClientLive.OpenWeather.OWResult](url).map(t => Temperature(t.dt, t.main.temp))
}
We concatenate two streams together, which means that as soon as we evaluate this effect our stream immediately emits a value, and then it’ll emit values at the specified interval. Whenever a value is emitted, we run an effectful function using evalMap, which uses the Http4sClientDSL[F] to make the actual call.
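The "emit once immediately, then on every tick" behaviour can be tried out on its own with a small sketch. It assumes fs2 3.x and cats-effect 3 on the classpath (as in the build.sbt above), fixes F to IO, and replaces the HTTP call with fakeCall, a hypothetical pure stand-in; the 50 millisecond interval and the take(3) are arbitrary choices to keep the run short.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object PollingStreamSketch {
  // Hypothetical stand-in for makeTemperatureCall: no network involved.
  def fakeCall(n: Long): IO[String] = IO.pure(s"reading-$n")

  def firstThree(): List[String] = {
    // One element right away, followed by one element per interval.
    val ticks: Stream[IO, Unit] =
      Stream.emit(()) ++ Stream.awakeEvery[IO](50.millis).as(())

    ticks.zipWithIndex                              // number the ticks: 0, 1, 2, ...
      .evalMap { case (_, i) => fakeCall(i) }       // run an effect per tick
      .take(3)                                      // stop after three results
      .compile
      .toList
      .unsafeRunSync()
  }
}
```

Calling PollingStreamSketch.firstThree() returns reading-0, reading-1 and reading-2; the first result is produced without waiting for the interval, the other two arrive one tick apart.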
The interesting thing is that all this is still done without knowing anything (well almost anything) about the F[_] monad.
Glue everything together
All that is left to do is tie everything together and select the IO monad that we want to use:
object Runner extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
// get all the services, and add them to the implicit scope
implicit val ec = ExecutionContext.global
// the concrete implementations mapped to the IO monad
implicit val configService = LiveConfigInterpreter[IO]()
implicit val http4sClient = services.http4sClient.LiveHttp4sClientInterpreter[IO]()
implicit val MongoDBConnectionDSL = mongo.LiveMongoDBConnectionInterpreter[IO]()
val http4s = LiveHttp4sServerInterpreter[IO]()
implicit val tempClient = services.tempClient.LiveTempClientInterpreter[IO]()
val tempStorage = storage.TemperatureStorageInterpreter[IO]()
val program = for {
// consume the stream
stream <- tempClient.temperatureStream
_ <- stream
.evalMap(tempStorage.insert)
.compile
.drain
.start
// start the server and keep running
server <- http4s.server.use(_ => IO.never).as(ExitCode.Success)
} yield {
server
}
// return the program to execute
program
}
}
The program above uses the Cats IO monad as the monad to run everything in. What you see here is that the dependencies that are injected are defined as implicit val so they are automatically applied when we create the other modules. The program itself is just a simple for-comprehension which:
- Gets the stream of temperatures, and drains them. Whenever we receive a new temperature we store it in the database.
- We use .start to start the stream, since that allows us to start this effect in the background:
* Start execution of the source suspended in the `IO` context.
*
* This can be used for non-deterministic / concurrent execution.
- Then we start the webserver as well, which we’ll just keep running forever.
The result is a val program: IO[ExitCode], which is also the expected type of the def run from IOApp that we’re using. When we now run this application, it’ll start making HTTP requests and store the results.
We mentioned a couple of times that you can plug in different IO implementations. For instance if you want to run this program using ZIO, the code would look something like this:
import tagless.services.LiveConfigInterpreter
import tagless.services.http4sServer.LiveHttp4sServerInterpreter
import scala.concurrent.ExecutionContext
import tagless.services.repo.storage
import tagless.services.db.mongo
import zio._
import zio.interop.catz.implicits._
import zio.interop.catz._
/**
* Simple runner which uses the ZIO monad to run everything
*/
object RunnerZIO extends zio.App {
def run(args: List[String]): URIO[ZEnv, ExitCode] = {
type Task[A] = ZIO[Any, Throwable, A]
// get all the services, and add them to the implicit scope
implicit val ec = ExecutionContext.global
// the concrete implementations mapped to the IO monad
implicit val configService = LiveConfigInterpreter[Task]()
implicit val http4sClient = services.http4sClient.LiveHttp4sClientInterpreter[Task]()
implicit val MongoDBConnectionDSL = mongo.LiveMongoDBConnectionInterpreter[Task]()
val http4s = LiveHttp4sServerInterpreter[Task]()
implicit val tempClient = services.tempClient.LiveTempClientInterpreter[Task]()
val tempStorage = storage.TemperatureStorageInterpreter[Task]()
// define the tasks to run in parallel
val streamTask = tempClient.temperatureStream.flatMap { _.evalMap(tempStorage.insert).compile.drain }
val http4sServerTask = http4s.server.toManagedZIO.useForever
// run the tasks in parallel
Task.collectAllPar(Set(streamTask, http4sServerTask)).exitCode
}
}
Note that there are different ways of running tasks in parallel. I just used collectAllPar here.
Conclusion
Converting the ZIO based approach from the previous articles to a cats-effect approach where we inject dependent modules using implicit constraints was fairly straightforward. The effect type classes from cats-effect provide sufficient functionality for the implementation. This is however a fairly simple application, so I wonder a bit what is to gain from having the F[_] abstraction instead of choosing an IO implementation beforehand and sticking with that one.
The way dependencies are injected feels rather nice with the constraint approach. It makes it very clear what is needed where and at least gives an idea of what the implementations are going to do, instead of just providing a big IO and giving the modules access to everything. I do miss the layer abstraction from ZIO a bit, since that felt like quite a nice abstraction on top to manage dependencies.
In the end, however, I think Cats Effect and the Cats IO monad, ZIO, Monix and probably some other stuff I’ve forgotten show how great the FP and effects support is in Scala. When building libraries I really see the big advantage of not restricting yourself to a single IO implementation. For developing business logic, or functionality that doesn’t need to be shared, I wonder whether abstracting away the IO implementation is that useful, since it doesn’t allow direct access to the great stuff from the Cats IO monad or the ZIO implementations.
Nevertheless, it’s been a great experience implementing this, and I keep finding it amazing how far Scala has come from just a couple of years ago!