Understanding Akka Streams

Actor System

The ActorSystem is the foundational component in Akka. It manages the lifecycle of actors, including their creation, supervision, and communication. When working with Akka Streams, the ActorSystem plays a crucial role in setting up the runtime environment for stream materialization and execution.

Actors are lightweight, concurrent entities that encapsulate both state and behavior. They operate asynchronously, communicating exclusively through message passing. This design avoids shared mutable state, making Akka a powerful toolkit for building highly concurrent and distributed systems.

import akka.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem("StreamSystem")
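As a minimal sketch of how the system is used in practice, the following assumes Akka 2.6 or later, where an implicit ActorSystem in scope also supplies the stream materializer. The object and method names (SystemSetup, firstGreeting) are illustrative and not part of Akka.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SystemSetup {
  // Runs a one-element stream and returns the element it produced.
  def firstGreeting(): String = {
    // Assumption: Akka 2.6+, where the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("StreamSystem")
    try {
      val result = Source.single("hello").runWith(Sink.head)
      // Blocking is for demonstration only; real code would compose the Future.
      Await.result(result, 5.seconds)
    } finally {
      // Shut the system down so its threads do not keep the JVM alive.
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(firstGreeting())
}
```

Terminating the system in a finally block is one way to make sure it is released even when the stream fails.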

Defining a Source

Akka Streams is built around three core components: Source, Flow, and Sink. A Source is a starting point that produces data with one output. A Sink represents the end of the stream and consumes data through one input. A Flow is an intermediate stage that transforms elements as they pass through, with one input and one output.

The Source companion object provides several ways to create data sources, such as:
Source.empty – creates an empty source
Source.single(element) – emits a single element
Source.future(fut) – emits the value of a Future once it completes successfully, or fails the stream if the Future fails
Source.apply(iterable) – creates a source from the given Iterable
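The four factories listed above can be tried side by side. This is a sketch assuming Akka 2.6 or later (Source.future was introduced in that line); SourceFactories and collectAll are illustrative names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SourceFactories {
  val none: Source[Int, NotUsed]  = Source.empty[Int]                    // emits nothing
  val one: Source[Int, NotUsed]   = Source.single(1)                     // emits 1
  val later: Source[Int, NotUsed] = Source.future(Future.successful(2))  // emits 2
  val many: Source[Int, NotUsed]  = Source(List(3, 4, 5))                // emits 3, 4, 5

  // Concatenates the four sources and collects everything they emit, in order.
  def collectAll(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("SourceFactories")
    val all = none.concat(one).concat(later).concat(many)
    try Await.result(all.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(collectAll())
}
```

Each source materializes to NotUsed, so concatenating them does not change the materialized type.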

import akka.NotUsed
import akka.stream.scaladsl.Source

val source: Source[String, NotUsed] = Source(
  Seq("5,10", "15,15", "78,79", "12,12", "0,0", "456,456")
)

In the example above, a Source is created from a sequence of strings, each representing two comma-separated numbers. The type Source[String, NotUsed] indicates that the source emits String elements and materializes to a value of type NotUsed.

NotUsed is Akka's way of indicating that the source produces no useful value upon materialization. In Akka Streams, materialization is the step in which a stream description is turned into a running stream; each run yields a materialized value, which can be used to monitor or control that run.

For example, a stream may return a Future[Done] to indicate when it completes, or a Cancellable to allow manual termination. These materialized values provide visibility and control over the stream lifecycle.
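Both kinds of materialized value can be seen in one small stream. The sketch below assumes Akka 2.6 or later and uses Source.tick, which materializes a Cancellable, together with Sink.ignore, which materializes a Future[Done]. MaterializedValues and tickThenCancel are illustrative names.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MaterializedValues {
  // Starts an endless ticking stream, cancels it, and reports whether it was cancelled.
  def tickThenCancel(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("MaterializedValues")
    try {
      // Keep.both returns the source's Cancellable and the sink's Future[Done] as a pair.
      val (ticks, finished) =
        Source.tick(0.millis, 50.millis, "tick")
          .toMat(Sink.ignore)(Keep.both)
          .run()

      ticks.cancel()                    // control: stop the source by hand
      Await.result(finished, 5.seconds) // visibility: wait until the stream has completed
      ticks.isCancelled
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(tickThenCancel())
}
```

Without the call to cancel(), the tick source would never complete and the Future[Done] would stay pending.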

Defining a Flow

A Flow represents a processing stage with one input and one output. You can create one with Flow[T] (shorthand for Flow.apply[T]) and shape its behavior with a variety of operators. Some common ones include:
map() – transforms each element using a provided function
log() – logs each element as it flows through the stream
take() and takeWhile() – pass only the first n elements, or only the elements before the first one that fails a predicate
filter() – allows only elements that satisfy a predicate to pass through
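The operators above can be chained on a single Flow. This is a sketch assuming Akka 2.6 or later; the flow name evensTimesTen and the input range are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object FlowOperators {
  val evensTimesTen: Flow[Int, Int, NotUsed] =
    Flow[Int]
      .filter(_ % 2 == 0)   // keep even numbers only
      .map(_ * 10)          // multiply each remaining element by ten
      .takeWhile(_ < 100)   // complete at the first element that is 100 or more
      .take(3)              // pass at most three elements
      .log("evensTimesTen") // log elements under this name (debug level by default)

  // Feeds 1 to 20 through the flow and collects what comes out.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("FlowOperators")
    try Await.result(Source(1 to 20).via(evensTimesTen).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run()) // 20, 40, 60
}
```

Operator order matters here: take(3) applies to the elements that survived the earlier stages, not to the first three inputs.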

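import akka.NotUsed
import akka.stream.scaladsl.Flow

// Splits "a,b" into a pair of Ints; throws on malformed input.
val parse: Flow[String, (Int, Int), NotUsed] =
  Flow[String]
    .map { pair =>
      val parts = pair.split(",")
      (parts(0).toInt, parts(1).toInt)
    }

// Emits true when the user's answer matches the correct answer.
val compare: Flow[(Int, Int), Boolean, NotUsed] =
  Flow[(Int, Int)]
    .map { case (userAnswer, correctAnswer) => userAnswer == correctAnswer }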

In the example above, the parse flow takes input elements of type String and splits each string on the comma to form a tuple of two Int values. It emits elements of type (Int, Int) and has a materialized value of NotUsed. Note that this version has no error handling: a string without a comma makes parts(1) throw an ArrayIndexOutOfBoundsException, and a non-numeric part makes toInt throw a NumberFormatException. Either exception fails the stream.
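One possible way to guard against malformed input is to parse into an Option and drop the lines that fail. This is a sketch, not the only approach, and it assumes Akka 2.6 or later. The helper parsePair and the flow safeParse are hypothetical names introduced for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object SafeParsing {
  // Hypothetical helper: returns None instead of throwing on malformed input.
  def parsePair(line: String): Option[(Int, Int)] =
    line.split(",") match {
      case Array(a, b) => Try((a.trim.toInt, b.trim.toInt)).toOption
      case _           => None // wrong number of fields
    }

  // Drops lines that cannot be parsed, so one bad line does not fail the stream.
  val safeParse: Flow[String, (Int, Int), NotUsed] =
    Flow[String].map(parsePair).collect { case Some(pair) => pair }

  def run(lines: List[String]): Seq[(Int, Int)] = {
    implicit val system: ActorSystem = ActorSystem("SafeParsing")
    try Await.result(Source(lines).via(safeParse).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run(List("5,10", "bad", "7,7")))
}
```

Silently dropping bad lines changes the total count, so whether this is acceptable depends on the application. Emitting an Either and handling the failures downstream would be an alternative.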

The compare flow takes these integer pairs and evaluates whether the first and second values are equal. It emits Boolean values downstream. A Sink can then be used to process these Booleans, such as counting how many comparisons returned true.

Defining a Sink

Just like Source, the Sink object offers several methods to define stream consumers. Some commonly used ones include:
Sink.fold() – accumulates stream elements and materializes a Future of the final result
Sink.head – materializes a Future of the first element; the Future fails if the stream is empty
Sink.headOption – like Sink.head, but the Future holds None if the stream is empty
Sink.last and Sink.lastOption – the same for the last element of the stream
Sink.foreach – applies a side-effecting function to each element and materializes a Future[Done] that completes when the stream ends
Sink.seq – collects all elements into a Seq, materialized as a Future
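To make the differences between these sinks concrete, the sketch below runs the same three-element source against several of them. It assumes Akka 2.6 or later; SinkVariants and the local names are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkVariants {
  // Returns (first element, head of an empty stream, last element, all elements).
  def run(): (Int, Option[Int], Int, Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("SinkVariants")
    def await[T](f: Future[T]): T = Await.result(f, 5.seconds)
    try {
      val numbers = Source(List(1, 2, 3)) // a blueprint, so it can be run several times

      val first   = numbers.runWith(Sink.head)                  // Future[Int]
      val noHead  = Source.empty[Int].runWith(Sink.headOption)  // Future[Option[Int]]
      val last    = numbers.runWith(Sink.last)                  // Future[Int]
      val all     = numbers.runWith(Sink.seq)                   // Future[Seq[Int]]
      val printed = numbers.runWith(Sink.foreach[Int](println)) // Future[Done]

      await(printed) // wait until every element has been printed
      (await(first), await(noHead), await(last), await(all))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Every call to runWith materializes a separate stream, which is why one Source value can feed five different sinks here.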

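import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Accumulator is (correctCount, total); both start at zero.
val sink: Sink[Boolean, Future[(Int, Int)]] = Sink.fold[(Int, Int), Boolean]((0, 0)) {
  case ((correctCount, total), wasCorrect) =>
    if (wasCorrect) (correctCount + 1, total + 1)
    else (correctCount, total + 1)
}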

The example above defines a sink using Sink.fold(). It processes each incoming Boolean, incrementing the correctCount only if the element is true, while always incrementing the total. This mirrors the typical fold operation on collections, but in this case, it runs asynchronously and returns a Future.
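For comparison, the same accumulator can be applied to an ordinary collection with foldLeft. This plain Scala sketch needs no Akka; the outcomes list is what comparing the six sample pairs from the source would yield, and the names FoldComparison and tally are illustrative.

```scala
object FoldComparison {
  // Same accumulation logic as the sink, written as a named method.
  def tally(acc: (Int, Int), wasCorrect: Boolean): (Int, Int) = {
    val (correctCount, total) = acc
    if (wasCorrect) (correctCount + 1, total + 1)
    else (correctCount, total + 1)
  }

  // Results for "5,10", "15,15", "78,79", "12,12", "0,0", "456,456", in order.
  val outcomes: List[Boolean] = List(false, true, false, true, true, true)

  // Synchronous fold: the result is available immediately, not wrapped in a Future.
  val summary: (Int, Int) = outcomes.foldLeft((0, 0))(tally)

  def main(args: Array[String]): Unit = println(summary) // prints (4,6)
}
```

The only difference in the streaming version is when the result becomes available: the collection fold returns it directly, while Sink.fold delivers it through a Future once the stream completes.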

import akka.actor.ActorSystem
import scala.util.{Failure, Success}

// Assumes source, parse, compare, and sink from the sections above are in scope.
object Main extends App {
  implicit val system: ActorSystem = ActorSystem("correctStream")

  source
    .via(parse)
    .via(compare)
    .runWith(sink)
    .andThen {
      case Failure(exception) => println(exception)
      case Success((correct, total)) =>
        println(s"$correct/$total correct answers")
    }(system.dispatcher)
    // Shut down the ActorSystem whether the stream succeeded or failed.
    .onComplete(_ => system.terminate())(system.dispatcher)
}

This full example ties together the stream components. The source is connected to the parse and compare flows, then run with the sink. The result is a Future[(Int, Int)], which is used to print the number of correct answers; for the sample data this is 4/6 correct answers. Once that Future completes, whether with a result or a failure, the ActorSystem is terminated.
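The call to runWith(sink) is shorthand for attaching the sink with toMat, keeping the sink's materialized value, and then calling run(). The sketch below spells that out, assuming Akka 2.6 or later. It uses a shortened three-element source and merges parsing and comparison into one flow (isCorrect) purely to keep the example small; QuizGraph, isCorrect, and graph are illustrative names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object QuizGraph {
  val source: Source[String, NotUsed] = Source(List("5,10", "15,15", "0,0"))

  // Parsing and comparison merged into one flow for brevity (no error handling).
  val isCorrect: Flow[String, Boolean, NotUsed] =
    Flow[String].map { line =>
      val parts = line.split(",")
      parts(0).toInt == parts(1).toInt
    }

  val sink: Sink[Boolean, Future[(Int, Int)]] =
    Sink.fold[(Int, Int), Boolean]((0, 0)) { case ((correct, total), ok) =>
      (if (ok) correct + 1 else correct, total + 1)
    }

  // A blueprint only: nothing is processed until run() is called on it.
  val graph: RunnableGraph[Future[(Int, Int)]] =
    source.via(isCorrect).toMat(sink)(Keep.right)

  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("QuizGraph")
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run()) // prints (2,3)
}
```

Keeping the RunnableGraph as a value can be useful when the same pipeline needs to be run more than once, since each run() produces a fresh materialization.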