Kotlin coroutines with arrow-fx

With the recent release of Arrow 0.10.X we've made the last quality jump before 1.0. That final release is now looming on the horizon, promising quality-of-life changes through compiler extensions, the same way Jetpack Components do.

Sitting in the Slack channels and the Gitter most days, we get a handful of questions every hour. Months ago they were about the usage of Either, Option and other datatypes. Now that people are more familiar with them, the questions have shifted towards how to structure programs in a more functional way.

The purpose of this article is to summarize the approaches we've been promoting on those channels, and to give a tour of arrow-fx usage and APIs.

Fx: The concurrency library

Arrow-fx is the name of the concurrency/effects library by the Arrow team. It stands as a compatible alternative to kotlinx.coroutines by JetBrains. It features most of the core constructs necessary for sequencing, parallelism, cancellation, threading, resource management, async error handling, plus clock and atomic operations.

The first thing to understand is that threaded operations are opt-in. Coroutines, via suspend functions, already give you a level of concurrency desirable for most business logic. It's a core construct of the Kotlin language.

suspend fun Api.getUser(id: UserId): User =
  query("SELECT * FROM users WHERE id = $id").parseUser()

interface Api {
  suspend fun query(q: String): Json
}

That simple. There is no need for any libraries for suspend functions to call each other. If you're limiting yourself to suspend functions you can achieve a level of concurrency similar to other languages without real threading, like JavaScript or PHP. A scheduler will make sure that I/O-bound operations are awaited at the lowest possible layer.

Now, the first step to introduce Arrow is with arrow-core (not arrow-fx yet!), which helps modeling error states in a generic way. Our suspend functions can now be explicit about errors, and the Either Api will give us a handful of functions to work with them.

suspend fun Api.getUser(id: UserId): Either<ApiError, User> =
  query("SELECT * FROM users WHERE id = $id")
    .let {
      it.maybeParseUser()
        .mapLeft { ApiError("Failed to parse") }
    }

fun Json.maybeParseUser(): Either<JsonException, User>

The result of maybeParseUser propagates one level up, where we can mapLeft the error to something more meaningful in the domain. If query also returned an Either, we can compose both using flatMap. This will prevent maybeParseUser from being called if the query fails.

interface Api {
  suspend fun query(q: String): Either<ApiError, Json>
}

suspend fun Api.getUser(id: UserId): Either<ApiError, User> =
  query("SELECT * FROM users WHERE id = $id")
    .flatMap { 
      it.maybeParseUser()
        .mapLeft { ApiError("Failed to parse") }
    }
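
To see the short-circuit in isolation, here's a minimal sketch with no dependencies. The Either below is a toy stand-in written for this example, not arrow-core's implementation, and query, parseUser and parseCalls are hypothetical names.

```kotlin
// Toy stand-in for arrow-core's Either, only to illustrate flatMap and mapLeft.
sealed class Either<out L, out R> {
    data class Left<out L>(val value: L) : Either<L, Nothing>()
    data class Right<out R>(val value: R) : Either<Nothing, R>()
}

fun <L, R, B> Either<L, R>.flatMap(f: (R) -> Either<L, B>): Either<L, B> =
    when (this) {
        is Either.Left -> this // keep the error, never call f
        is Either.Right -> f(value)
    }

fun <L, R, M> Either<L, R>.mapLeft(f: (L) -> M): Either<M, R> =
    when (this) {
        is Either.Left -> Either.Left(f(value))
        is Either.Right -> this
    }

data class ApiError(val message: String)
data class User(val name: String)

var parseCalls = 0 // counts how often the parser actually runs

// Hypothetical query: only id "123" exists.
fun query(id: String): Either<ApiError, String> =
    if (id == "123") Either.Right("Paco") else Either.Left(ApiError("Not found: $id"))

// Hypothetical parser: fails on blank input.
fun parseUser(raw: String): Either<String, User> {
    parseCalls++
    return if (raw.isBlank()) Either.Left("blank payload") else Either.Right(User(raw))
}

fun getUser(id: String): Either<ApiError, User> =
    query(id).flatMap { raw ->
        parseUser(raw).mapLeft { ApiError("Failed to parse") }
    }

fun main() {
    println(getUser("123")) // Right(value=User(name=Paco))
    println(getUser("999")) // Left(value=ApiError(message=Not found: 999))
    println(parseCalls)     // 1: the parser was skipped for the failed query
}
```

The second call never reaches parseUser, which is the behavior flatMap gives you for free.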

We now have two basic tools: concurrency and explicit error handling. With these we can write all kinds of programs possible in other ecosystems such as NodeJS, except we're using Kotlin ;)

A touch of syntax

Even in arrow-core, before we get into the concurrency library, we have some syntactic sugar to make the code more visually aligned with imperative code.

This sugar is called an fx block. It's similar to a DSL in that it adds several functions that aren't accessible at top level, and it has some similarities to coroutines in that you're short-circuiting execution if one of the steps fails.

The fx block is available as an extension on the companion of most datatypes in arrow-core. To run/execute/unwrap a datatype locally to the function we use the extension function .bind() that is only available within the fx block.

suspend fun Api.getUser(id: UserId): Either<ApiError, User> = Either.fx {
  val result = query("SELECT * FROM users WHERE id = $id").bind()
  result.maybeParseUser()
    .mapLeft { ApiError("Failed to parse") }.bind()
}

Some nice constructors and extension functions such as tupled(), flatTap(), tailRecM() are available inside fx for you to experiment with.

Soon you will find that you cannot call suspend functions inside the functions declared for Either such as the ones mentioned above, and other fan favorites like map() and handleErrorWith(). For that you need a concurrency library!

Introducing threaded concurrency

Every program has one entry point in a main function. If you're using a framework like Android or Ktor this entry point has been taken over, and you're left to implement "plugins" that'll be called from the main execution. Each of these plugins now becomes an entry point for your logic, such as an Activity in Android or a Controller in a backend service.

Each entry point is responsible for executing the suspend function that starts your business logic. The Kotlin standard library only provides low-level primitives for running suspend functions from outside a suspend function, so it's up to libraries (or DIY mavericks) to implement the mechanism to start and await that initial suspend function.
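
For the curious, this is roughly what the DIY route looks like. It's a minimal sketch that uses only the low-level startCoroutine from kotlin.coroutines in the standard library. The names runSuspendBlocking and loadGreeting are hypothetical, and a real runner has to deal with dispatching and cancellation, which this one ignores.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical DIY runner: starts a suspend block and blocks the caller until it completes.
@Suppress("UNCHECKED_CAST")
fun <A> runSuspendBlocking(block: suspend () -> A): A {
    val latch = CountDownLatch(1)
    var value: A? = null
    var failure: Throwable? = null
    // The continuation receives the final result, whether a value or an exception.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        result.fold({ value = it }, { failure = it })
        latch.countDown()
    })
    latch.await()
    failure?.let { throw it }
    return value as A
}

// Hypothetical business logic.
suspend fun loadGreeting(name: String): String = "Hello, $name"

fun main() {
    println(runSuspendBlocking { loadGreeting("Arrow") }) // Hello, Arrow
}
```

Libraries wrap this same idea with schedulers, error handling and cancellation, which is why you rarely want to write it yourself.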

Depending on your framework, entry points may support suspend functions. If they do, behind the scenes they're using a library, either a community one like arrow-fx, Reaktive or KIO, or the one developed by JetBrains, kotlinx.coroutines.

Let's see one complete main in arrow-fx.

fun main() {
  myIOProgram().unsafeRunSync()
}

Short, isn't it? And if the framework supports suspend functions you have a different call.

suspend fun frameworkMain() {
  myIOProgram().suspended()
}

Because being able to execute suspend functions isn't the only feature offered by these libraries, each implements a series of constructs that enable parallelism, async error handling, and cancellation, and that are only compatible with its own runner methods. Worry not, for you can write adapters between them!

In this context, IO is a class that's at the center of the arrow-fx concurrency library. It's the one that defines the runner and implements the constructs. It's also enhanced by fx blocks.

Primitives: wrap and execute

IO is the return type for non-suspend functions that require some advanced features. The most useful constructor is IO { }, which allows suspend function calls inside.

IO { 
  MyApi().queryUser("123")
}

Same as Either, IO supports fx blocks with even more features. You can call the main IO constructor inside fx as effect(), and run/execute/unwrap it locally using bind().

IO.fx { 
  val userRequest: IO<User> = effect { 
    MyApi().queryUser("123")
  }
  val user: User = userRequest.bind()
  effect { println("Seen $user") }.bind()
}

As IO is always lazily executed/cold you can declare operations before they're run, completely separating both behaviors.

IO.fx { 
  val logEnd: IO<Unit> = effect { 
    println("Operation complete!")
  }

  val city = effect { 
    askGeoLoc(51.5079, 0.0877)
  }.bind()
  effect { println("Welcome to ${city.name}") }.bind()

  logEnd.bind()
}
// Welcome to London
// Operation complete!
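
If the declare-now, run-later split feels abstract, here's a minimal sketch of the idea in plain Kotlin. LazyIO is a toy class written for this example and is not arrow-fx's IO; it only shows that nothing happens until the run method is called.

```kotlin
// Toy model of a cold computation; it is NOT arrow-fx's IO, just the same idea.
class LazyIO<A>(private val thunk: () -> A) {
    fun <B> flatMap(f: (A) -> LazyIO<B>): LazyIO<B> = LazyIO { f(thunk()).unsafeRun() }
    fun unsafeRun(): A = thunk()
}

// Returns the log before and after running, to show that declaring does nothing.
fun demo(): Pair<List<String>, List<String>> {
    val log = mutableListOf<String>()
    val logEnd = LazyIO { log.add("Operation complete!") } // declared first, runs last
    val welcome = LazyIO { log.add("Welcome to London") }
    val program = welcome.flatMap { logEnd }
    val before = log.toList() // still empty: nothing has run yet
    program.unsafeRun()
    return before to log.toList()
}

fun main() {
    val (before, after) = demo()
    println(before) // []
    println(after)  // [Welcome to London, Operation complete!]
}
```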

Primitives: exception handling

The error handling model for IO is consistent and automatic, both for synchronous and asynchronous operations. There are multiple functions to recover and retry IO operations, of which the main ones are handleError and handleErrorWith.

In this example we'll capture exceptions in the Api call and convert them to a domain error, and log if there are any uncaught exceptions at all in any other level.

IO.fx { 
  val userRequest: IO<Profile> =
    effect { MyApi().queryUser("123").makeProfile() }
  val profile: Profile = userRequest
    .handleError { EmptyProfile() }
    .bind()
  effect { println("Seen $profile") }.bind()
}.handleErrorWith {
  IO { println("Seen unexpected error: $it") }
}

I wish there was more to write about it, but this is all there is. Each layer is responsible for its errors, and the handler can be installed at any level.

Primitives: schedulers and fibers

The threading model in arrow-fx is modeled on RxJava and Scala libraries. Arrow uses lightweight threads, called fibers, that don't incur the hard performance penalties that system threads do.

When you jump to a fiber the rest of the sequence continues on it. If you'd like to jump back you have to explicitly specify which fiber you're going back to.

This makes the code able to be read sequentially, and the last fiber jump is where execution will happen.

The library provides a typeclass Dispatchers for dispatchers/executors that's parametrized to the datatype, i.e. IO.dispatchers().

val background = IO.dispatchers().io()
val main = IO.dispatchers().default()

IO(background) { requestUser() }
  .continueOn(main)
  .flatMap {
    IO { displayInUI(it) }
  }

And fx blocks come with their own version.

IO.fx {
  continueOn(background)
  val user = effect { requestUser() }.bind()
  continueOn(main)
  effect { displayInUI(user) }.bind()
}

You can make a Dispatcher out of a Java Executor or an Rx/Reactor Scheduler, or even use the ones in kotlinx.coroutines directly!

Primitives: parallelism

We have one function that specifies the fiber where a series of IOs will be run, and a mapping operation to apply once they complete. As this is similar to map in parallel, we called it parMapN.

IO.parMapN(
  IO.dispatchers().default(),
  oneIO,
  otherIO
) { one, other ->
  one to other
}

If either of the parameters uses its own fiber it'll be run on it, so local reasoning always trumps the caller.

Primitives: races and cancellation

This is the Android developer's favorite feature. To prevent leaking the UI it is a requirement to be able to stop any fibers that capture callbacks to the UI components such as Activity or Fragment.

Arrow provides one key concept for cancellation, called racing. When you race two IOs, the first one to complete cancels the second. You can start a race in any fiber to prevent blocking the current one.

IO.fx {
  val local = effect { localUserRequest("123") }
  val remote = effect { remoteUserRequest("123") }

  val winner = IO.dispatchers().default().raceN(local, remote).bind()
  val user = winner.fold({
    effect { println("Local request: $it") }
  }, {
    effect { println("Remote request: $it") }
  }).bind()
}
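
To make "the first one to complete cancels the second" concrete outside of Arrow, here's a minimal sketch using plain JVM threads and ExecutorService.invokeAny from the Java standard library. It is not how arrow-fx implements racing: fibers are cancelled cooperatively, while this example relies on thread interruption. The names raceDemo, local and remote are made up for the example.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Races two blocking tasks; returns the winner's value and whether the loser was interrupted.
fun raceDemo(): Pair<String, Boolean> {
    val pool = Executors.newFixedThreadPool(2)
    val loserInterrupted = AtomicBoolean(false)
    val loserFinished = CountDownLatch(1)

    val local = Callable<String> {
        Thread.sleep(50)
        "local"
    }
    val remote = Callable<String> {
        try {
            Thread.sleep(5_000)
            "remote"
        } catch (e: InterruptedException) {
            loserInterrupted.set(true) // cancellation arrives as an interrupt
            throw e
        } finally {
            loserFinished.countDown()
        }
    }

    try {
        // invokeAny returns the first successful result and cancels the remaining tasks.
        val winner = pool.invokeAny(listOf(local, remote))
        loserFinished.await(1, TimeUnit.SECONDS) // give the loser a moment to notice
        return winner to loserInterrupted.get()
    } finally {
        pool.shutdownNow()
    }
}

fun main() {
    val (winner, loserCancelled) = raceDemo()
    println("winner=$winner, loser cancelled=$loserCancelled")
}
```

With these delays the local task should win and the remote one should report being interrupted, although the exact timing depends on the thread scheduler.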

When working with a lifecycle you can race against it to force cancellation.

val program = IO { businessLogic() }
val lifecycle = IO.delay(5)

fun start() {
  IO.dispatchers().default()
    .raceN(program, lifecycle)
    .unsafeRunAsync { }
}

We can build lifecycles for imperative frameworks using some tools built on top of IO, such as Promise, that we'll describe in a later section.

As of 0.10.X, IO allows cancellation at most operator boundaries. That means cancellation is checked when any operation such as map, flatMap, continueOn or handleError is run. It does not check cancellation when suspend functions are calling each other, although that feature will be added in a future release. So, be mindful when using suspended(), because cancellation won't propagate even when called from IO { } or effect { }.

Primitives: resource management

Cancellation is a mechanical operation, and in FP we're all about descriptive Apis. One advanced use case for cancellation and error handling is safe resource acquisition.

Whether it's a socket, a database, a file, or some UI widgets, you want to run some business logic to clean them up after the operation has either completed, failed, or been cancelled.

In these cases patterns like try-with-resources won't work due to the asynchronous nature of error handling. Just as you cannot try/catch inside a callback, you cannot easily capture exceptions that span across multiple fibers, or trigger a cleanup block when cancellation happens.

The name of this feature is bracket, as it resembles self-cleanup blocks in languages like C++. To work with bracket you have to specify what resource you're acquiring, how you're going to clean it up, and what you will do with it.

IO { openSocket() }
  .bracket(
    release = { socket ->
      IO { socket.close() }
    },
    use = { socket ->
      socket.sendMessage(makeUserRequestMessage("123"))
    }
  )

If you'd like more fine-grained control about the reason of the termination you can use bracketCase, which has a sealed class with more granular information.

IO { openSocket() }
  .bracketCase(
    release = { socket, exitCase ->
      IO { socket.close() }
        .flatMap {
          when (exitCase) {
            is ExitCase.Completed -> IO.unit
            is ExitCase.Canceled -> IO { println("cancelled!") }
            is ExitCase.Error -> retry(exitCase.e)
          }
        }
    },
    use = { it.sendMessage(makeUserRequestMessage("123")) }
  )
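
The acquire/use/release contract is easier to see in a synchronous toy version. The sketch below is not arrow-fx's bracketCase: it has no fibers and therefore no cancellation case, and ToyExitCase, toyBracketCase and FakeSocket are names invented for this example. It only shows that release always runs and learns how use ended.

```kotlin
// Toy exit reasons; the real ExitCase in arrow-fx also covers cancellation.
sealed class ToyExitCase {
    object Completed : ToyExitCase()
    data class Error(val e: Throwable) : ToyExitCase()
}

// Synchronous sketch: acquire, use, and always release with the reason for exiting.
fun <R, A> toyBracketCase(
    acquire: () -> R,
    release: (R, ToyExitCase) -> Unit,
    use: (R) -> A
): A {
    val resource = acquire()
    val result = try {
        use(resource)
    } catch (t: Throwable) {
        release(resource, ToyExitCase.Error(t))
        throw t
    }
    release(resource, ToyExitCase.Completed)
    return result
}

// Hypothetical resource that records what happens to it.
class FakeSocket(private val log: MutableList<String>) {
    fun send(msg: String): String {
        if (msg.isEmpty()) throw IllegalArgumentException("empty message")
        log.add("sent $msg")
        return "ack"
    }
    fun close() { log.add("closed") }
}

fun demo(msg: String): List<String> {
    val log = mutableListOf<String>()
    try {
        toyBracketCase<FakeSocket, String>(
            acquire = { FakeSocket(log) },
            release = { socket, exit ->
                socket.close()
                val label = when (exit) {
                    is ToyExitCase.Completed -> "completed"
                    is ToyExitCase.Error -> "error"
                }
                log.add("exit: $label")
            },
            use = { it.send(msg) }
        )
    } catch (e: IllegalArgumentException) {
        log.add("failed: ${e.message}")
    }
    return log
}

fun main() {
    println(demo("hi")) // [sent hi, closed, exit: completed]
    println(demo(""))   // [closed, exit: error, failed: empty message]
}
```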

Primitives: async wrapping

A few sections ago we described how to wrap any suspend and synchronous function calls. We needed to learn about cancellation before digging into how to wrap asynchronous calls.

Any library or Api with support for callbacks follows the same shape or pattern, which can be easily encapsulated in a call to the cancelable constructor. This constructor gives you a callback to work with, and requires returning the operation that'll be run on cancellation.

IO.cancelable { callback ->
  val subscription = myRx.subscribe({ onNext ->
    callback(onNext.right())
  }, { onError ->
    callback(onError.left())
  })
  IO { subscription.cancel() }
}

IO.cancelable { callback ->
  coroutineScope.launch {
    try {
      callback(mySuspendCode().right())
    } catch (t: Throwable) {
      callback(t.left())
    }
  }
  IO { coroutineScope.coroutineContext[Job]?.cancel() }
}

Remember that IO is always executed lazily, so wrapping that call won't immediately cancel the operation.

If you have a long running imperative operation you can imagine using a cancellation flag that gets switched on the IO block.

If you don't need support for cancellation, the async constructor doesn't require returning an IO.

IO.async { callback ->
  myService.enqueue(object: Callback {
    override fun onSuccess(res: Result) =
      callback(res.right())

    override fun onError(err: Exception) =
      callback(err.left())
  })
}
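
The same callback-wrapping shape exists at the suspend function level. As a comparison, here's a minimal sketch that uses suspendCoroutine from the Kotlin standard library instead of IO.async. FakeService, Callback, await and describe are hypothetical names, and the fake service calls back synchronously to keep the example deterministic.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-based API.
interface Callback {
    fun onSuccess(res: String)
    fun onError(err: Exception)
}

class FakeService {
    fun enqueue(request: String, cb: Callback) {
        if (request.isBlank()) cb.onError(IllegalArgumentException("blank request"))
        else cb.onSuccess("response to $request")
    }
}

// Wraps the callback API: success resumes with a value, failure resumes with the exception.
suspend fun FakeService.await(request: String): String = suspendCoroutine { cont ->
    enqueue(request, object : Callback {
        override fun onSuccess(res: String) = cont.resume(res)
        override fun onError(err: Exception) = cont.resumeWithException(err)
    })
}

// Starts the wrapped call and reports how it ended.
fun describe(request: String): String {
    var outcome = "still pending"
    val block: suspend () -> String = { FakeService().await(request) }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        outcome = result.fold({ "ok: $it" }, { "error: ${it.message}" })
    })
    return outcome
}

fun main() {
    println(describe("ping")) // ok: response to ping
    println(describe(" "))    // error: blank request
}
```

Like IO.async, this version has no cancellation hook, which is the gap that IO.cancelable fills.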

And with that we've covered most of the Api surface to do threaded and synchronized concurrency. Let's take a look at the constructs that are already built in arrow-fx.

Tools written with IO

With the primitives we've gone through in the other sections, we can start building some higher-level constructs to solve userland problems. Each can be described in a phrase, and I'll leave it up to you to investigate their docs for specifics!

Promise is a single-use placeholder for a value that'll be filled at a later date in an imperative way. One example can be a change in lifecycle state in an imperative framework.

Timer is an abstraction to control timed operations, such as delaying them. This can be useful for debounced operations.

Ref is a purely functional atomic reference, a value you can update in a block without fear of concurrent access. It can be used for global mutable state.

MVar is a more refined approach to shared state, as it works as a rendezvous channel for a single value. Think of it as a queue of size 1 where no more values can be put until the previous one is consumed. It can be used to communicate across fibers.

Queue is a broadcast channel. It comes in different flavors, such as bounded, sliding, dropping, and unbounded.

Semaphore is an access barrier across fibers that we hope you never need to use :D
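
To give a feel for what "update in a block without fear of concurrent access" means for Ref, here's a minimal sketch built on java.util.concurrent.atomic.AtomicReference. ToyRef is a made-up class, not Arrow's Ref: the real one returns its operations wrapped in the effect type, whereas this one runs them immediately.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Toy atomic cell; the update function may be retried, so it should be pure.
class ToyRef<A>(initial: A) {
    private val cell = AtomicReference(initial)
    fun get(): A = cell.get()
    fun update(f: (A) -> A) {
        cell.updateAndGet { f(it) }
    }
}

// Several threads increment the same ToyRef; no update is lost.
fun countConcurrently(workers: Int, incrementsEach: Int): Int {
    val ref = ToyRef(0)
    val threads = List(workers) {
        thread { repeat(incrementsEach) { ref.update { it + 1 } } }
    }
    threads.forEach { it.join() }
    return ref.get()
}

fun main() {
    println(countConcurrently(workers = 4, incrementsEach = 1_000)) // 4000
}
```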

A glimpse of the future: IO + Either + Reader

If you remember, at the beginning of the article we talked about modelling errors explicitly in suspend functions. Today, IO<A> models all errors as Throwable, which is not very expressive.

The most common solution is to feel a bit of pain and work with IO<Either<Throwable, A>> via the function attempt(). This means wrapping and unwrapping values in Either, paying a runtime cost.

We've been working on a refactor of IO where the Either is embedded to avoid runtime costs, so you can have IO<Error, A>.

Another improvement we're looking into is embedding a dependency injector inside IO. It'd look like a call to fun ask(): Module inside fx blocks to retrieve a dependency that's provided when an IO<Module, Error, A> is run.

Starting now we're going to add more people to the taskforce to get it out of the door! I can promise it'll be completed before the 1.0 release :)

Program structure in fx: programs are composable and fractal!

You may have caught on to this already. IO doesn't execute until bind(), unsafeRunSync() or suspended() is called on it. That means that creation is completely separated from running, which is called "lazy execution".

The main benefit of the separation between creation and running is that you can set up any inter-dependencies such as listeners and loggers before the operation starts, effectively preventing ordering and deadlock issues.

Furthermore, because all operations are lazy, we can create functions that focus on needs rather than operational workings. One example is parMapN, where a single function encompasses forking the execution, awaiting multiple results while accounting for async exceptions and cancellation, and aggregating them if both succeed, all of it completely transparent to you, the user.

So, your program is composed of pure functions that return a datatype like Either, I/O operations that are suspend and also return a datatype like Either, functions that require threaded features and error handling that return IO, and we learned how all of them can call each other!

If you were to design an entry point that is an aggregation of features with a cancellation scheme, you could make use of all of them together like this :D

val endPromise: Promise<ForIO, Unit> = 
  Promise.unsafeUncancelable(IO.async()).fix()

override fun start() {
  val program = IO.parMapN(
    IO.dispatchers().default(),
    featureOne(),
    featureTwo(),
    featureThree(),
    featureFour(),
    { a, b, c, d -> Unit }
  )
  val cancelAndHandleErrors = IO.raceN(program, endPromise.get())
    .handleErrorWith { IO { errorLogger(it) } }
  cancelAndHandleErrors.unsafeRunAsync { }
}

override fun end() {
  endPromise.complete(Unit).unsafeRunAsync { }
}

fun featureOne(): IO<Unit> = IO.fx {
  val user = effect { requestUser() }.bind()
  effect { printUser(user) }.bind()
}

fun featureTwo(): IO<Unit> = IO.tailRecM {
  IO.fx { 
    val result = listenToPort(it).bind()
    processResult(result).map(Either::right).bind()
  }.handleError { Either.left(Unit) }
}

fun featureThree(): IO<Unit> =
  IO { warmupCache() }

fun featureFour(): IO<Unit> =
  IO.parMapN(
    IO.dispatchers().io(),
    controllerOne(),
    controllerTwo(),
    controllerThree()
  ) { _, _, _ -> Unit }

...

And with the help of fx blocks, you can make multi-threaded and concurrent code look almost imperative.

IO.fx {
  continueOn(background)
  val user = requestUser("123").bind()
  val userPage = parMapN(
    background,
    effect { requestUserProfile(user) },
    effect { requestUserFriends(user) },
    ::makeUserPage
  ).handleError { make404Page() }.bind()
  continueOn(main)
  effect { renderPage(userPage) }.bind()
}.handleErrorWith {
  IO(main) { renderPage(make500Page(it)) }
}

As in the start()/end() example above, if your lifecycle is split or you have multiple asynchronous entry points, unsafeRunAsync() starts the execution without awaiting a result in the current fiber.

If you have a single entry point that requires waiting for the program to complete, such as a main function, then unsafeRunSync() blocks the current thread and suspended() suspends the current coroutine until the operation is complete. They should only be used once you've composed all your business logic from the use cases in a sequential or parallel way.

fun main() {
  IO.parMapN(
      IO.dispatchers().default(), 
      IO { requestConfiguration() }, 
      IO { requestUser() }, 
      ::toSerializedTuple
    )
    .flatMap {
      IO { dumpToTerminal(it) }
    }
    .unsafeRunSync()
}

From there on, you can implement your preferred MV* architecture, and a combination of interfaces and top-level functions to structure your program. What arrow-fx provides is the glue that binds between the layers of a ports-and-adapters program structure.

A note on testing

One of the most frequent patterns when migrating from OOP to FP is to start replacing "behavior classes" with lambdas.

class Manager(val managees: List<Element>) {
  fun visit(v: Visitor<Element>) = ...
}

We'll replace Manager + Visitor with the function traverse from arrow-core.

listOf(element0, element1).traverse {
  IO { println("Seen $it") }
}

As we've swapped the Visitor class for a simple lambda, instead of creating a mock visitor and a mock manager we can pass an IO that does what we want, like counting elements.

IO.fx {
  val testRef = Ref(0).bind()

  val program = listOf(element0, element1).parTraverse {
    testRef.update { it + 1 }
  }.bind()

  val result = testRef.get().bind()
  IO { assert(result, 2) }.bind()
}

Or another example to check failure states using attempt().

IO.fx {
  val program = listOf(userOne, userTwo).traverse { user ->
    IO.fx {
      val result = effect { runProgram(user) }.bind()
      when (result.name) {
        "Paco" -> raiseError(IllegalStateException())
        else -> just(Unit)
      }.bind()
    }
  }

  val result: Either<Throwable, Unit> = program.attempt().bind()
  result.fold({ 
    IO { assertIs<IllegalStateException>(it) }
  }, { 
    IO.unit 
  }).bind()
}

Your tests are run using unsafeRunSync() or suspended(). We don't need a complex mocking library to do effectful computations that yield test results, so product and test code are written exactly the same way!

Go out and check it!

We have built a solution that's useful to those with large, complex codebases that require careful attention to detail. It should be familiar to those who come from Rx/Reactor, while being fully compatible with suspend functions and code written in the style of kotlinx.coroutines.

If you find any behavior that's hard to understand or seems broken, do contact us on our public channels in #arrow at the Kotlinlang Slack and the Arrow Gitter.

For everyone, we hope this article about arrow-fx programs has been helpful.