Some C++ concurrency examples

Table of Contents

- Concurrent Queue with limited capacity
- ConcurrentHashMap
- Readers-writers problem
- H2O example
- Lock-free Queue with recycling centre

Concurrent Queue with limited capacity

1struct Request {
2    int data;
3};
4
5class ConcurrentRing {
6private:
7    std::queue<Request> queue;
8    const size_t capacity;
9
10    std::mutex mtx;
11    std::condition_variable not_empty;
12    std::counting_semaphore<> available_slots;
13
14public:
15    explicit ConcurrentRing(size_t size)
16        : capacity(size), available_slots(size) {}
17
18    void submit_request(const Request& req) {
19        available_slots.acquire(); // Wait for a slot
20
21        {
22            std::lock_guard<std::mutex> lock(mtx);
23            queue.push(req);
24        }
25        not_empty.notify_one(); // Notify one waiting consumer
26    }
27
28    Request retrieve_request() {
29        std::unique_lock<std::mutex> lock(mtx);
30        not_empty.wait(lock, [&] { return !queue.empty(); });
31
32        Request req = queue.front();
33        queue.pop();
34
35        lock.unlock();
36        available_slots.release(); 
37        return req;
38    }
39};

ConcurrentHashMap

// A hash map sharded into BUCKETS independently locked buckets.
//
// Each bucket pairs an unordered_map with its own shared_mutex, so
// operations on different buckets never contend, and readers of the
// same bucket proceed in parallel under a shared lock; writers take
// an exclusive lock on just their bucket.
template <typename K, typename V, size_t BUCKETS = 16>
class ConcurrentHashMap {
private:
    struct Bucket {
        std::unordered_map<K, V> map;
        mutable std::shared_mutex mtx; // mutable: locked in const readers
    };

    std::vector<Bucket> buckets;

    // Map a key to its bucket via the standard hash.
    Bucket& getBucket(const K& key) {
        return buckets[std::hash<K>{}(key) % BUCKETS];
    }
    // Const overload so read-only operations can be const members.
    const Bucket& getBucket(const K& key) const {
        return buckets[std::hash<K>{}(key) % BUCKETS];
    }

public:
    ConcurrentHashMap() : buckets(BUCKETS) {}

    // Insert `value` under `key`, overwriting any existing entry.
    void insert(const K& key, const V& value) {
        auto& bucket = getBucket(key);
        std::unique_lock lock(bucket.mtx);
        bucket.map[key] = value;
    }

    // Copy the value for `key` into `value`; returns false if absent.
    // Takes only a shared (reader) lock.
    bool get(const K& key, V& value) const {
        const auto& bucket = getBucket(key);
        std::shared_lock lock(bucket.mtx);
        auto it = bucket.map.find(key);
        if (it != bucket.map.end()) {
            value = it->second;
            return true;
        }
        return false;
    }

    // Remove `key` if present (no-op otherwise).
    void erase(const K& key) {
        auto& bucket = getBucket(key);
        std::unique_lock lock(bucket.mtx);
        bucket.map.erase(key);
    }
};

Readers-writers problem

// A counter safe for concurrent use, optimized for many readers.
//
// get() takes a shared (reader) lock so reads run in parallel;
// increment() and reset() take an exclusive (writer) lock.
class ThreadSafeCounter {
public:
    ThreadSafeCounter() = default;

    // Current value. Shared lock: multiple readers may hold it at once.
    unsigned int get() const {
        std::shared_lock lock(mutex);
        return value;
    }

    // Atomically increments and returns the new value.
    unsigned int increment() {
        std::unique_lock lock(mutex);
        return ++value;
    }

    // Resets the counter to zero.
    void reset() {
        std::unique_lock lock(mutex);
        value = 0;
    }

private:
    mutable std::shared_mutex mutex; // mutable: locked in const get()
    unsigned int value = 0;
};
24
25int main() {
26    ThreadSafeCounter counter;
27
28    std::vector<std::thread> threads;
29
30    for (int i = 0; i < 5; ++i) {
31        threads.emplace_back([&counter]() {
32            std::cout << "Read: " << counter.get() << std::endl;
33        });
34    }
35    for (int i = 0; i < 3; ++i) {
36        threads.emplace_back([&counter]() {
37            std::cout << "Incremented to: " << counter.increment() << std::endl;
38        });
39    }
40    for (auto& t : threads) {
41        t.join();
42    }
43    return 0;
44}

H2O example

1struct WaterFactory3 {
2  std::counting_semaphore<> oxygenSem;
3  std::counting_semaphore<> hydrogenSem;
4  std::barrier<> barrier;
5
6  WaterFactory3() : oxygenSem{1}, hydrogenSem{2}, barrier{3} {}
7
8  void oxygen(void (*bond)()) {
9    oxygenSem.acquire();      
10    barrier.arrive_and_wait(); 
11    bond();
12    oxygenSem.release();       
13  }
14
15  void hydrogen(void (*bond)()) {
16    hydrogenSem.acquire();     // Lets at most two hydrogen through
17    barrier.arrive_and_wait(); 
18    bond();
19    hydrogenSem.release();     
20  }
21};

Lock-free Queue with recycling centre

1struct Job
2{
3    int id;
4    int data;
5};
6
7class JobQueue5
8{
9    using stdmo = std::memory_order;
10
11    struct Node
12    {
13        std::atomic<Node*> next = QUEUE_END;
14        Job job;
15    };
16
17    static inline Node* const QUEUE_END = nullptr;
18    static inline Node* const STACK_END = QUEUE_END + 1;
19
20    struct alignas(16) GenNodePtr
21    {
22        Node* node;
23        uintptr_t gen;
24    };
25
26    static_assert(std::atomic<GenNodePtr>::is_always_lock_free);
27
28    alignas(64) std::atomic<Node*> m_queue_back;              // producer end
29    alignas(64) std::atomic<GenNodePtr> m_queue_front;        // consumer end
30    alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack
31
32public:
33    JobQueue5() //
34        : m_queue_back(new Node())
35        , m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
36        , m_recycled_stack_top(GenNodePtr { STACK_END, 1 })
37    {
38    }
39
40    ~JobQueue5()
41    {
42        Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
43        while(cur_node != QUEUE_END)
44        {
45            Node* next = cur_node->next;
46            delete cur_node;
47
48            cur_node = next;
49        }
50
51        cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
52        while(cur_node != STACK_END)
53        {
54            Node* next = cur_node->next;
55            delete cur_node;
56
57            cur_node = next;
58        }
59    }
60
61    // either get a node from the recycling stack if we have some,
62    // or allocate a new one if we don't.
63    Node* get_recycled_node_or_allocate_new()
64    {
65        GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
66        while(true)
67        {
68            if(old_stack_top.node == STACK_END)
69            {
70                return new Node();
71            }
72
73            // here: use **acquire**. synchronise with the release-store of
74            // node->next in add_node_to_recycling_stack
75            Node* cur_stack_next = old_stack_top.node->next.load(stdmo::acquire);
76
77            GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };
78
79            if(m_recycled_stack_top.compare_exchange_weak( //
80                   old_stack_top,                          //
81                   new_stack_top,                          //
82                   stdmo::relaxed))
83            {
84                return old_stack_top.node;
85            }
86        }
87    }
88
89    // Put node in recycling centre
90    void add_node_to_recycling_stack(Node* node)
91    {
92        GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
93        while(true)
94        {
95            // here: use **release**. synchronise with the acquire-load of
96            // node->next in get_recycled_node_or_allocate_new
97            node->next.store(old_stack_top.node, stdmo::release);
98            GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };
99
100            if(m_recycled_stack_top.compare_exchange_weak( //
101                   old_stack_top,                          //
102                   new_stack_top,                          //
103                   stdmo::relaxed))
104            {
105                break;
106            }
107        }
108    }
109
110
111public:
112    void push(Job job)
113    {
114        Node* new_dummy = get_recycled_node_or_allocate_new();
115        new_dummy->next.store(QUEUE_END, stdmo::relaxed);
116
117        Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);
118
119        work_node->job = job;
120        work_node->next.store(new_dummy, stdmo::release);
121    }
122
123    std::optional<Job> try_pop()
124    {
125        GenNodePtr old_front = m_queue_front.load(stdmo::acquire);
126        while(true)
127        {
128            Node* old_front_next = old_front.node->next.load(stdmo::acquire);
129            if(old_front_next == QUEUE_END)
130                return std::nullopt;
131
132            GenNodePtr new_front { old_front_next, old_front.gen + 1 };
133
134            if(m_queue_front.compare_exchange_weak(old_front, //
135                   new_front, stdmo::acq_rel))
136            {
137                break;
138            }
139        }
140
141        Job job = old_front.node->job;
142        add_node_to_recycling_stack(old_front.node);
143
144        return job;
145    }
146};