Building an Akka-style actor model in C++

What is the actor model?

The actor model is a concurrency paradigm where actors are tiny, independent units of computation. Actors never share mutable state. Instead, they communicate exclusively by sending messages to one another. Each actor owns a mailbox (an inbound message queue) and processes messages one at a time, which avoids data races without heavy locking.

In an Akka-style system, actors are lightweight, so a single process can host thousands to millions of them. A dispatcher (typically backed by a thread pool) pulls work from actor mailboxes and runs the actors' message handlers.

Building the mailbox

The mailbox is the backbone of the actor system: it is where all messages sent to an actor are queued before being processed. In our C++ implementation, we design the mailbox as a thread-safe queue that supports concurrent producers (threads pushing messages) and consumers (worker threads popping messages on behalf of actors).

To achieve thread safety, we wrap a std::queue with a std::mutex for locking and use a std::condition_variable to efficiently block consumers until new messages arrive. This ensures minimal CPU usage while still providing low-latency message delivery.

Mailbox operations

push(T elem):

Acquires the lock with std::lock_guard, pushes the element, and signals the condition variable so waiting threads can wake up.

try_pop(T &elem):

Non-blocking pop. If the queue is empty, returns false. Otherwise, moves the front element into elem and returns true.

pop():

Blocking pop. Waits on the condition variable until the queue has an element, then removes and returns it.

// Mailbox.hpp
#pragma once

#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

template <typename T>
class MailBox
{
    std::queue<T> queue;
    std::mutex mutex;
    std::condition_variable cv;

public:
    // Push a new element and notify one waiting thread
    void push(T elem)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(std::move(elem));
        }
        // Notify after releasing the lock so the woken thread does not block on it
        cv.notify_one();
    }

    // Try to pop without blocking
    bool try_pop(T &elem)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (queue.empty())
        {
            return false;
        }
        elem = std::move(queue.front());
        queue.pop();
        return true;
    }

    // Blocking pop that waits until an element is available
    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex);
        // The predicate guards against spurious wakeups
        cv.wait(lock, [&] { return !queue.empty(); });
        T elem = std::move(queue.front());
        queue.pop();
        return elem;
    }
};
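
To see the mailbox operations working together, here is a minimal sketch in which several producer threads push integers and a single consumer sums them with the blocking pop(). The helper name sum_through_mailbox and its parameters are hypothetical, and the class is repeated in compact form only so that the example compiles on its own.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Compact copy of the mailbox so the sketch compiles on its own
template <typename T>
class MailBox
{
    std::queue<T> queue;
    std::mutex mutex;
    std::condition_variable cv;

public:
    void push(T elem)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(std::move(elem));
        }
        cv.notify_one();
    }

    bool try_pop(T &elem)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (queue.empty()) return false;
        elem = std::move(queue.front());
        queue.pop();
        return true;
    }

    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [&] { return !queue.empty(); });
        T elem = std::move(queue.front());
        queue.pop();
        return elem;
    }
};

// Hypothetical helper: several producers push 1..perProducer, one consumer sums
long sum_through_mailbox(int producers, int perProducer)
{
    MailBox<int> mailbox;
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        threads.emplace_back([&mailbox, perProducer] {
            for (int i = 1; i <= perProducer; ++i) mailbox.push(i);
        });
    }

    long total = 0;
    // pop() sleeps until a producer delivers the next element
    for (int n = 0; n < producers * perProducer; ++n) total += mailbox.pop();

    for (auto &t : threads) t.join();

    // Every pushed element was consumed, so a non-blocking pop finds nothing
    int leftover = 0;
    bool drained = !mailbox.try_pop(leftover);
    assert(drained);
    (void)drained;
    return total;
}
```

Calling sum_through_mailbox(4, 100) should return 20200, since each of the four producers contributes 1 + 2 + ... + 100 = 5050.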

With this mailbox implementation, we now have a thread-safe way to deliver messages between actors. In the next step, we define the abstract Actor class; the mailbox and the actor are wired together later, in ActorInstance, so that actors can send and receive messages asynchronously.

Defining our abstract Actor class

The Actor is the fundamental building block of our system. Each actor is responsible for defining how it reacts when it receives a message. To capture this idea in C++, we start by defining an abstract base class that contains a pure virtual method called receive. This makes the Actor class uninstantiable on its own, and instead serves as a contract for all derived classes. Any class that inherits from Actor must provide its own implementation of receive, which allows different actors to express their own unique behavior.

We also declare a virtual destructor to ensure proper cleanup when an actor is deleted through a pointer to the base class. This prevents undefined behavior and guarantees that destructors of derived actor classes are called correctly.

// Actor.hpp
#pragma once
#include <string>

class Actor
{
public:
    // Pure virtual function: must be implemented by derived actors
    virtual void receive(std::string message) = 0;

    // Virtual destructor: ensures correct cleanup when deleting via Actor*
    virtual ~Actor() = default;
};

// This abstract Actor defines a contract for all actors in the system.
// Each concrete actor must override 'receive' to handle messages in its own way.
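
As an illustration of the contract, the following sketch derives a concrete actor and drives it through a pointer to the base class, the way the runtime would. RecordingActor and deliver_two_messages are hypothetical names introduced only for this example, and the Actor class is repeated so the block stands alone.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

class Actor
{
public:
    virtual void receive(std::string message) = 0;
    virtual ~Actor() = default;
};

// Hypothetical actor that remembers every message it has seen
class RecordingActor : public Actor
{
    std::vector<std::string> seen_;

public:
    void receive(std::string message) override
    {
        seen_.push_back(std::move(message));
    }

    const std::vector<std::string> &seen() const { return seen_; }
};

// Delivers two messages through the abstract interface
std::vector<std::string> deliver_two_messages()
{
    auto recorder = std::make_shared<RecordingActor>();
    std::shared_ptr<Actor> actor = recorder; // upcast to the abstract contract
    assert(actor != nullptr);
    actor->receive("first");
    actor->receive("second");
    return recorder->seen();
}
```

Because receive is virtual, the call through std::shared_ptr<Actor> reaches RecordingActor::receive, and the virtual destructor makes it safe to release the object through the base pointer.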

With the base Actor class defined, the next step is to implement the Dispatcher, which will be responsible for coordinating the thread pool and scheduling actors to process their messages.

Implementing the Dispatcher

The dispatcher is responsible for managing the thread pool and executing tasks on behalf of the actors. Whenever an actor receives a message, the dispatcher assigns it to one of the available worker threads so that the message can be processed asynchronously.

Internally, the dispatcher maintains a queue of tasks, a condition variable, and a pool of worker threads. When it is constructed, it spawns a number of worker threads (by default, four). Each worker runs a small lambda function in a loop: it waits on the condition variable until either a new task is submitted or a shutdown signal is received. If there is a task, the worker pops it from the queue and executes it. If the dispatcher has been stopped and the queue is empty, the worker exits its loop and terminates.

The submit() method is the mechanism for safely adding new tasks. It first locks the task queue with a std::unique_lock, pushes the function object into the queue, and then uses cv_.notify_one() to wake up one of the sleeping worker threads. This ensures that tasks are enqueued without data races and that idle workers are immediately signaled to begin processing.

The destructor (~Dispatcher()) is responsible for cleanly shutting down the thread pool. It locks the queue, sets the stop_ flag to true, and then calls cv_.notify_all() so that all waiting threads are released from their condition variable wait. Each worker first finishes any tasks still in the queue, then notices the stop_ flag, exits its loop, and returns. Finally, the destructor joins all the worker threads, ensuring they are fully terminated before the dispatcher object is destroyed. This guarantees that no background work is left running and prevents resource leaks.

// Dispatcher.hpp
#pragma once

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class Dispatcher
{
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false; // guarded by mutex_
    std::queue<std::function<void()>> tasks_;

public:
    explicit Dispatcher(int numThreads = 4);
    void submit(std::function<void()> task);
    ~Dispatcher();
};

// Dispatcher.cpp
#include "Dispatcher.hpp"
#include <utility>

Dispatcher::Dispatcher(int numThreads)
{
    // Each worker loops: wait for a task or for shutdown, then run the task
    auto threadLambda = [this]() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return !tasks_.empty() || stop_; });
                if (stop_ && tasks_.empty()) {
                    return; // shutdown requested and nothing left to run
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task(); // run outside the lock so other workers can proceed
        }
    };

    for (int i = 0; i < numThreads; i++) {
        workers_.emplace_back(threadLambda);
    }
}

void Dispatcher::submit(std::function<void()> task)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push(std::move(task));
    }
    cv_.notify_one();
}

Dispatcher::~Dispatcher()
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        stop_ = true;
    }
    cv_.notify_all();
    for (auto &worker : workers_) {
        worker.join();
    }
}
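
The shutdown behaviour described above can be exercised with a small sketch: tasks are submitted, the dispatcher goes out of scope, and the destructor lets the workers drain the queue before joining them. The helper run_tasks is a hypothetical name, and the dispatcher is repeated in a single-file form (same logic, member functions defined inline) so the example compiles on its own.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Single-file copy of the dispatcher, defined inline for the sketch
class Dispatcher
{
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::queue<std::function<void()>> tasks_;

public:
    explicit Dispatcher(int numThreads = 4)
    {
        for (int i = 0; i < numThreads; ++i) {
            workers_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mutex_);
                        cv_.wait(lock, [this] { return !tasks_.empty() || stop_; });
                        if (stop_ && tasks_.empty()) return;
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    void submit(std::function<void()> task)
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

    ~Dispatcher()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto &worker : workers_) worker.join();
    }
};

// Hypothetical helper: submits taskCount increments and reports how many ran
int run_tasks(int taskCount)
{
    assert(taskCount >= 0);
    std::atomic<int> completed{0};
    {
        Dispatcher dispatcher(4);
        for (int i = 0; i < taskCount; ++i) {
            dispatcher.submit([&completed] { ++completed; });
        }
    } // destructor: workers finish queued tasks, then are joined
    return completed.load();
}
```

Since a worker only exits when stop_ is set and the queue is empty, run_tasks(100) should return 100 without any explicit sleep or wait in the caller.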

Implementing the ActorInstance

With the mailbox and dispatcher in place, we can now define the ActorInstance. Each ActorInstance represents a single running actor and is responsible for storing its messages, executing them using the actor's logic, and interacting with the dispatcher to schedule work across threads.

An ActorInstance is made up of several key parts. It contains a mailbox for incoming messages, which is owned directly by the instance. It also holds a shared pointer to an Actor that defines how messages are processed, along with a shared pointer to the dispatcher that runs its work on the thread pool. Finally, it maintains an atomic counter called pending_, which tracks how many enqueued messages have not been handled yet. The counter guarantees that at most one processing task per actor is in flight at any time.

The enqueue() method adds a new message to the mailbox and then increments pending_. If the counter was zero before the increment, no processing task is active, so enqueue() submits one to the dispatcher that will call process_message(). Otherwise a task is already queued or running and will pick the message up. This ensures that message processing is triggered as soon as work is available.

The process_message() method handles one message per loop iteration by calling the actor's receive() method and then decrements pending_. It keeps looping while the counter shows more work and returns once the counter reaches zero. A plain boolean flag is not enough here: after resetting the flag, the only way to re-check the mailbox with try_pop() is to remove a message, and a message removed that way can be dropped or handled concurrently with another task. Counting messages closes this gap without any locking beyond the mailbox's own mutex, so no message is lost and no duplicate task is scheduled.

Notice that the task captures the std::shared_ptr passed in as self instead of a raw this pointer. This guarantees that the ActorInstance remains alive until the submitted task finishes executing, preventing dangling references and potential segmentation faults in a multithreaded environment.

// ActorInstance.hpp
#pragma once

#include "Mailbox.hpp"
#include "Actor.hpp"
#include "Dispatcher.hpp"
#include <atomic>
#include <cstddef>
#include <memory>
#include <string>

class ActorInstance
{
    MailBox<std::string> mailbox_;
    std::shared_ptr<Actor> actor_;
    std::shared_ptr<Dispatcher> dispatcher_;
    std::atomic<std::size_t> pending_{0}; // messages enqueued but not yet handled

public:
    ActorInstance(std::shared_ptr<Dispatcher> dispatcher, std::shared_ptr<Actor> actor);
    void enqueue(const std::string &message, std::shared_ptr<ActorInstance> self);
    void process_message();
};

// ActorInstance.cpp
#include "ActorInstance.hpp"
#include <utility>

// Initializers follow the member declaration order (actor_ before dispatcher_)
ActorInstance::ActorInstance(std::shared_ptr<Dispatcher> dispatcher, std::shared_ptr<Actor> actor)
    : actor_(std::move(actor)), dispatcher_(std::move(dispatcher)) {}

void ActorInstance::enqueue(const std::string &message, std::shared_ptr<ActorInstance> self)
{
    mailbox_.push(message);
    // Only the sender that moves the counter from 0 to 1 schedules a task
    if (pending_.fetch_add(1) == 0) {
        dispatcher_->submit([self = std::move(self)]() {
            self->process_message();
        });
    }
}

void ActorInstance::process_message()
{
    std::string message;
    do {
        // A message is pushed before it is counted, so this pop finds one
        if (mailbox_.try_pop(message)) {
            actor_->receive(message);
        }
        // fetch_sub returns the previous value: 1 means this was the last one
    } while (pending_.fetch_sub(1) > 1);
}
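
The lifetime argument above can be checked in isolation. The sketch below is single-threaded and uses a hypothetical Worker type as a stand-in for ActorInstance; it only illustrates that a task holding a std::shared_ptr keeps the object alive after every other owner has let go.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for ActorInstance
struct Worker
{
    int handled = 0;
    void process() { ++handled; }
};

// Builds a task that owns a share of the worker, like the dispatcher tasks do
std::function<int()> make_task(std::shared_ptr<Worker> self)
{
    return [self = std::move(self)] {
        self->process();
        return self->handled;
    };
}

int run_after_owner_is_gone()
{
    auto worker = std::make_shared<Worker>();
    std::weak_ptr<Worker> observer = worker;

    std::function<int()> task = make_task(worker);
    worker.reset();              // the original owner lets go
    assert(!observer.expired()); // the captured copy keeps the object alive

    int result = task();         // safe: the task still owns the worker
    task = nullptr;              // destroying the task drops the last reference
    assert(observer.expired());
    return result;
}
```

Had the lambda captured a raw pointer instead, the call to task() after worker.reset() would have touched a destroyed object.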

Creating an ActorRef

The ActorRef acts as a handle or reference to an ActorInstance. Instead of exposing the internal details of the actor, it provides a clean and simple interface for other parts of the system to communicate with it.

Internally, the ActorRef holds a std::shared_ptr<ActorInstance>. This ensures that the underlying actor remains alive as long as at least one reference to it exists, preventing accidental destruction while it is still being used.

The most important method here is tell. When a message is sent using tell, it is forwarded to the actor's internal enqueue function. This allows the actor to receive messages asynchronously without the sender needing to know about its implementation.

// ActorRef.hpp
#pragma once
#include "ActorInstance.hpp"
#include <memory>
#include <string>

class ActorRef
{
    std::shared_ptr<ActorInstance> actorInstance_;

public:
    explicit ActorRef(std::shared_ptr<ActorInstance> actorInstance);
    void tell(const std::string &message);
};

// ActorRef is a reference to an ActorInstance. It provides a clean interface
// to send messages without exposing the actor's internal details.

// ActorRef.cpp
#include "ActorRef.hpp"
#include <utility>

ActorRef::ActorRef(std::shared_ptr<ActorInstance> actorInstance)
    : actorInstance_(std::move(actorInstance))
{
}

void ActorRef::tell(const std::string &message)
{
    // Pass the shared_ptr along so the scheduled task can keep the instance alive
    actorInstance_->enqueue(message, actorInstance_);
}

Implementing our ActorSystem

The ActorSystem serves as the central runtime environment that manages the lifecycle of actors. It is responsible for creating new actors, assigning them to a dispatcher, and returning an ActorRef that other components can use to communicate with them.

Internally, the system maintains a shared instance of a Dispatcher. Since multiple actors will rely on the same dispatcher to process messages, we store it as a std::shared_ptr. This ensures that the dispatcher remains alive for as long as the actor system is active and continues to serve any actor instances created within it.

The key method here is spawn, which takes a shared pointer to an Actor and wraps it into an ActorInstance. The actor instance is then connected to the dispatcher, ensuring its messages are properly scheduled and processed. Finally, an ActorRef is returned, allowing external code to interact with the actor without exposing its internal details.

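Notice that we use std::move for the actor but not for the dispatcher. The dispatcher belongs to the ActorSystem itself and should persist for the entire lifetime of the system, since it is reused to spawn many actors, so each instance receives a copy of the shared pointer. By contrast, spawn receives the actor by value, so moving that parameter into the instance is safe and avoids an extra reference-count increment. If the caller keeps its own shared pointer to the actor, ownership is shared with the caller rather than fully transferred.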

// ActorSystem.hpp
#pragma once

#include "Dispatcher.hpp"
#include "ActorRef.hpp"
#include "Actor.hpp"
#include <memory>

class ActorSystem
{
    std::shared_ptr<Dispatcher> dispatcher_; // shared_ptr because multiple actor instances share this dispatcher

public:
    ActorSystem();
    ActorRef spawn(std::shared_ptr<Actor> actor);
};

// The ActorSystem manages the creation of actors and provides ActorRefs
// for external communication. It ensures all actors share the same dispatcher.

// ActorSystem.cpp
#include "ActorSystem.hpp"
#include <memory>
#include <utility>

ActorSystem::ActorSystem()
    : dispatcher_(std::make_shared<Dispatcher>(4))
{
}

ActorRef ActorSystem::spawn(std::shared_ptr<Actor> actor)
{
    auto actorInstance = std::make_shared<ActorInstance>(dispatcher_, std::move(actor));
    return ActorRef(std::move(actorInstance));
}

// We move the by-value actor parameter into the ActorInstance to avoid an extra copy,
// but we copy dispatcher_ because the ActorSystem keeps its own reference and reuses
// it for spawning multiple actors throughout the system's lifetime.
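
The difference between copying and moving a std::shared_ptr can be made visible with use_count(). The sketch below uses empty stand-in types with the same names (only ownership matters here); Instance, OwnershipReport, and spawn_once are hypothetical names for this illustration.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Empty stand-ins: only the ownership bookkeeping is of interest
struct Dispatcher {};
struct Actor {};

struct Instance
{
    std::shared_ptr<Dispatcher> dispatcher;
    std::shared_ptr<Actor> actor;
};

struct OwnershipReport
{
    long dispatcherOwners;
    long actorOwners;
    bool sourceStillHoldsActor;
};

OwnershipReport spawn_once()
{
    auto dispatcher = std::make_shared<Dispatcher>(); // kept by the "system"
    auto actor = std::make_shared<Actor>();           // plays the by-value parameter

    // Copy the dispatcher (the system keeps its reference), move the actor
    Instance instance{dispatcher, std::move(actor)};
    assert(instance.dispatcher == dispatcher);

    // A moved-from shared_ptr is guaranteed to be empty
    return {dispatcher.use_count(), instance.actor.use_count(), actor != nullptr};
}
```

After the call, the dispatcher has two owners (the system and the instance), while the actor has exactly one, because the move handed over the reference instead of duplicating it.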

Defining the main function

Finally, we bring everything together in our main.cpp file, which serves as the entry point of the program. Here, we define a simple PrinterActor that overrides the receive method. Each time it processes a message, the actor acquires a global mutex before printing to the console. This ensures thread-safe output, preventing interleaved or corrupted text when multiple threads write at the same time.

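Inside the main function, we first create an ActorSystem. We then instantiate a PrinterActor as a shared pointer and call the system's spawn method twice, which yields two ActorRef handles. Both actor instances share the same PrinterActor object, which is acceptable here only because PrinterActor has no mutable state of its own; actors that carry state should each get their own object.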

After spawning the two references, we send distinct messages to each one. Since message handling is asynchronous, we introduce a short sleep at the end of the program to give the dispatcher enough time to process the tasks before the application exits.

// main.cpp
#include "ActorSystem.hpp"
#include "Actor.hpp"
#include "Globals.hpp"

#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Definition of the mutex declared in Globals.hpp
std::mutex globals::cout_mutex;

class PrinterActor : public Actor
{
public:
    void receive(std::string message) override
    {
        // Serialize console output across worker threads
        std::lock_guard<std::mutex> lock(globals::cout_mutex);
        std::cout << "Printer actor received " << message << std::endl;
    }
};

int main()
{
    ActorSystem actorSystem;
    auto sharedPrinter = std::make_shared<PrinterActor>();
    ActorRef printerOne = actorSystem.spawn(sharedPrinter);
    std::cout << "=== Spawning printerOne ===" << std::endl;
    ActorRef printerTwo = actorSystem.spawn(sharedPrinter);
    std::cout << "=== Spawning printerTwo ===" << std::endl;

    printerOne.tell("hello to printer one");
    printerTwo.tell("hello to printer two");

    // Give the worker threads time to handle both messages before exiting
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}
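
The listing includes Globals.hpp, which is not shown in this article. A plausible minimal version, assuming it does nothing more than declare the mutex that main.cpp defines, could look like this:

```cpp
// Globals.hpp (assumed contents; the original header is not shown)
#pragma once

#include <mutex>

namespace globals
{
    // Declaration only: main.cpp provides the single definition
    extern std::mutex cout_mutex;
}
```

The extern declaration matches the definition std::mutex globals::cout_mutex; in main.cpp, so every translation unit that includes the header refers to the same mutex.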

Configuring the Build with CMake

To build our project, we use CMakeLists.txt to define the compilation rules and generate the appropriate build files. In this setup, we first specify the C++ standard and then provide two optional build flags that allow us to enable either AddressSanitizer or ThreadSanitizer. These tools are extremely helpful for detecting memory issues and data races during development.

The configuration collects all of our .cpp and .hpp files into a single executable named ActorSystem. Depending on the sanitizer option chosen at configure time, the corresponding flags are applied to both the compiler and the linker to instrument the build correctly. If both options are switched on, only AddressSanitizer is enabled, because the two sanitizers cannot be combined in one build.

To build with sanitizers enabled, you can pass the options directly from the command line. For example, -DENABLE_ASAN=ON enables AddressSanitizer, while -DENABLE_TSAN=ON enables ThreadSanitizer.

cmake_minimum_required(VERSION 3.16)
project(ActorSystem LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17) # use C++17
set(CMAKE_CXX_STANDARD_REQUIRED ON) # fail instead of falling back to an older standard

# Boolean flags that can be passed in from the command line
option(ENABLE_ASAN "Enable AddressSanitizer" OFF) # detects memory leaks, use-after-free, etc.
option(ENABLE_TSAN "Enable ThreadSanitizer" OFF) # detects data races and threading issues

if (ENABLE_ASAN)
    message(STATUS "AddressSanitizer enabled")
    set(SANITIZER_FLAGS -fsanitize=address -fno-omit-frame-pointer)
elseif (ENABLE_TSAN)
    message(STATUS "ThreadSanitizer enabled")
    set(SANITIZER_FLAGS -fsanitize=thread -fno-omit-frame-pointer)
endif()

# std::thread needs the platform thread library on some toolchains
find_package(Threads REQUIRED)

add_executable(ActorSystem
    main.cpp
    Actor.hpp
    ActorRef.hpp ActorRef.cpp
    Mailbox.hpp
    Globals.hpp
    Dispatcher.hpp Dispatcher.cpp
    ActorInstance.hpp ActorInstance.cpp
    ActorSystem.hpp ActorSystem.cpp
)

target_link_libraries(ActorSystem PRIVATE Threads::Threads)
target_compile_options(ActorSystem PRIVATE ${SANITIZER_FLAGS})
target_link_options(ActorSystem PRIVATE ${SANITIZER_FLAGS})

# Example build commands (run from the project root):
# rm -rf build
# mkdir build && cd build
# cmake -DENABLE_ASAN=ON -DCMAKE_BUILD_TYPE=Debug ..
# cmake --build .
# For ThreadSanitizer, configure with -DENABLE_TSAN=ON instead of -DENABLE_ASAN=ON