Some C++ concurrency examples
Concurrent Queue with limited capacity
/// A single unit of work stored in ConcurrentRing's queue.
struct Request {
    /// Payload of the request. Zero-initialised so that a default-constructed
    /// Request never carries an indeterminate value.
    int data = 0;
};
4
5class ConcurrentRing {
6private:
7 std::queue<Request> queue;
8 const size_t capacity;
9
10 std::mutex mtx;
11 std::condition_variable not_empty;
12 std::counting_semaphore<> available_slots;
13
14public:
15 explicit ConcurrentRing(size_t size)
16 : capacity(size), available_slots(size) {}
17
18 void submit_request(const Request& req) {
19 available_slots.acquire(); // Wait for a slot
20
21 {
22 std::lock_guard<std::mutex> lock(mtx);
23 queue.push(req);
24 }
25 not_empty.notify_one(); // Notify one waiting consumer
26 }
27
28 Request retrieve_request() {
29 std::unique_lock<std::mutex> lock(mtx);
30 not_empty.wait(lock, [&] { return !queue.empty(); });
31
32 Request req = queue.front();
33 queue.pop();
34
35 lock.unlock();
36 available_slots.release();
37 return req;
38 }
39};
ConcurrentHashMap
/// Hash map split into BUCKETS shards, each a std::unordered_map protected by
/// its own std::shared_mutex.
///
/// A key is routed to a shard by `std::hash<K>{}(key) % BUCKETS`; every
/// operation locks only that shard. insert() and erase() take the shard's lock
/// exclusively, get() takes it shared.
///
/// @tparam K        Key type; must be usable with std::hash and std::unordered_map.
/// @tparam V        Mapped type; must be copy-constructible and copy-assignable.
/// @tparam BUCKETS  Number of shards; must be greater than zero.
template <typename K, typename V, size_t BUCKETS = 16>
class ConcurrentHashMap {
    // The shard index is computed with `% BUCKETS`, so zero shards would be a
    // division by zero.
    static_assert(BUCKETS > 0, "ConcurrentHashMap requires at least one bucket");

private:
    struct Bucket {
        std::unordered_map<K, V> map;
        mutable std::shared_mutex mtx;  // mutable so const lookups can lock it
    };

    std::vector<Bucket> buckets;

    /// Returns the shard responsible for `key`.
    Bucket& getBucket(const K& key) {
        return buckets[std::hash<K>{}(key) % BUCKETS];
    }

    /// Const overload used by read-only operations.
    const Bucket& getBucket(const K& key) const {
        return buckets[std::hash<K>{}(key) % BUCKETS];
    }

public:
    ConcurrentHashMap() : buckets(BUCKETS) {}

    /// Inserts `value` under `key`, overwriting any existing mapping.
    void insert(const K& key, const V& value) {
        auto& bucket = getBucket(key);
        std::unique_lock lock(bucket.mtx);
        // insert_or_assign avoids default-constructing V first, so V does not
        // need a default constructor.
        bucket.map.insert_or_assign(key, value);
    }

    /// Looks up `key`.
    /// @param value  Receives a copy of the mapped value when the key is found;
    ///               left untouched otherwise.
    /// @return true if the key was present, false otherwise.
    bool get(const K& key, V& value) const {
        const auto& bucket = getBucket(key);
        std::shared_lock lock(bucket.mtx);
        auto it = bucket.map.find(key);
        if (it != bucket.map.end()) {
            value = it->second;
            return true;
        }
        return false;
    }

    /// Removes `key` if present; does nothing otherwise.
    void erase(const K& key) {
        auto& bucket = getBucket(key);
        std::unique_lock lock(bucket.mtx);
        bucket.map.erase(key);
    }
};
Readers-writers problem
/// Unsigned counter protected by a std::shared_mutex.
///
/// get() takes the lock shared; increment() and reset() take it exclusively.
class ThreadSafeCounter {
    mutable std::shared_mutex rw_lock;  // mutable so the const get() can lock it
    unsigned int count = 0;

public:
    ThreadSafeCounter() = default;

    /// Returns the current count.
    unsigned int get() const {
        std::shared_lock<std::shared_mutex> read_guard(rw_lock);
        const unsigned int snapshot = count;
        return snapshot;
    }

    /// Adds one to the count and returns the new value.
    unsigned int increment() {
        std::unique_lock<std::shared_mutex> write_guard(rw_lock);
        count += 1;
        return count;
    }

    /// Sets the count back to zero.
    void reset() {
        std::unique_lock<std::shared_mutex> write_guard(rw_lock);
        count = 0;
    }
};
24
25int main() {
26 ThreadSafeCounter counter;
27
28 std::vector<std::thread> threads;
29
30 for (int i = 0; i < 5; ++i) {
31 threads.emplace_back([&counter]() {
32 std::cout << "Read: " << counter.get() << std::endl;
33 });
34 }
35 for (int i = 0; i < 3; ++i) {
36 threads.emplace_back([&counter]() {
37 std::cout << "Incremented to: " << counter.increment() << std::endl;
38 });
39 }
40 for (auto& t : threads) {
41 t.join();
42 }
43 return 0;
44}
H2O example
1struct WaterFactory3 {
2 std::counting_semaphore<> oxygenSem;
3 std::counting_semaphore<> hydrogenSem;
4 std::barrier<> barrier;
5
6 WaterFactory3() : oxygenSem{1}, hydrogenSem{2}, barrier{3} {}
7
8 void oxygen(void (*bond)()) {
9 oxygenSem.acquire();
10 barrier.arrive_and_wait();
11 bond();
12 oxygenSem.release();
13 }
14
15 void hydrogen(void (*bond)()) {
16 hydrogenSem.acquire(); // Lets at most two hydrogen through
17 barrier.arrive_and_wait();
18 bond();
19 hydrogenSem.release();
20 }
21};
Lock-free queue with recycling centre
// Payload carried by each queue node. Both fields are zero-initialised so a
// default-constructed Job never holds indeterminate values.
struct Job
{
    int id = 0;
    int data = 0;
};
6
7class JobQueue5
8{
9 using stdmo = std::memory_order;
10
11 struct Node
12 {
13 std::atomic<Node*> next = QUEUE_END;
14 Job job;
15 };
16
17 static inline Node* const QUEUE_END = nullptr;
18 static inline Node* const STACK_END = QUEUE_END + 1;
19
20 struct alignas(16) GenNodePtr
21 {
22 Node* node;
23 uintptr_t gen;
24 };
25
26 static_assert(std::atomic<GenNodePtr>::is_always_lock_free);
27
28 alignas(64) std::atomic<Node*> m_queue_back; // producer end
29 alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end
30 alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack
31
32public:
33 JobQueue5() //
34 : m_queue_back(new Node())
35 , m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
36 , m_recycled_stack_top(GenNodePtr { STACK_END, 1 })
37 {
38 }
39
40 ~JobQueue5()
41 {
42 Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
43 while(cur_node != QUEUE_END)
44 {
45 Node* next = cur_node->next;
46 delete cur_node;
47
48 cur_node = next;
49 }
50
51 cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
52 while(cur_node != STACK_END)
53 {
54 Node* next = cur_node->next;
55 delete cur_node;
56
57 cur_node = next;
58 }
59 }
60
61 // either get a node from the recycling stack if we have some,
62 // or allocate a new one if we don't.
63 Node* get_recycled_node_or_allocate_new()
64 {
65 GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
66 while(true)
67 {
68 if(old_stack_top.node == STACK_END)
69 {
70 return new Node();
71 }
72
73 // here: use **acquire**. synchronise with the release-store of
74 // node->next in add_node_to_recycling_stack
75 Node* cur_stack_next = old_stack_top.node->next.load(stdmo::acquire);
76
77 GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };
78
79 if(m_recycled_stack_top.compare_exchange_weak( //
80 old_stack_top, //
81 new_stack_top, //
82 stdmo::relaxed))
83 {
84 return old_stack_top.node;
85 }
86 }
87 }
88
89 // Put node in recycling centre
90 void add_node_to_recycling_stack(Node* node)
91 {
92 GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
93 while(true)
94 {
95 // here: use **release**. synchronise with the acquire-load of
96 // node->next in get_recycled_node_or_allocate_new
97 node->next.store(old_stack_top.node, stdmo::release);
98 GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };
99
100 if(m_recycled_stack_top.compare_exchange_weak( //
101 old_stack_top, //
102 new_stack_top, //
103 stdmo::relaxed))
104 {
105 break;
106 }
107 }
108 }
109
110
111public:
112 void push(Job job)
113 {
114 Node* new_dummy = get_recycled_node_or_allocate_new();
115 new_dummy->next.store(QUEUE_END, stdmo::relaxed);
116
117 Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);
118
119 work_node->job = job;
120 work_node->next.store(new_dummy, stdmo::release);
121 }
122
123 std::optional<Job> try_pop()
124 {
125 GenNodePtr old_front = m_queue_front.load(stdmo::acquire);
126 while(true)
127 {
128 Node* old_front_next = old_front.node->next.load(stdmo::acquire);
129 if(old_front_next == QUEUE_END)
130 return std::nullopt;
131
132 GenNodePtr new_front { old_front_next, old_front.gen + 1 };
133
134 if(m_queue_front.compare_exchange_weak(old_front, //
135 new_front, stdmo::acq_rel))
136 {
137 break;
138 }
139 }
140
141 Job job = old_front.node->job;
142 add_node_to_recycling_stack(old_front.node);
143
144 return job;
145 }
146};