Some Rust concurrency examples
Asynchronous I/O Pipeline Server with Tokio and Message Passing
1loop {
2 let (stream, _) = listener.accept().await?;
3
4 tokio::spawn(async move {
5 loop {
6 let mut buf = Vec::new();
7 stream.read(&mut buf).await?;
8 sq_tx.send(IoOperation::Read(stream, buf)).await?;
9 }
10 });
11
12 tokio::spawn(async move {
13 while let Some(op) = sq_rx.recv().await {
14 tokio::spawn(async move {
15 let res = process(op.data).await;
16 cq_tx.send(IoOperation::Write(op.stream, res)).await?;
17 });
18 }
19 });
20
21 tokio::spawn(async move {
22 while let Some(op) = cq_rx.recv().await {
23 op.stream.write_all(op.data).await?;
24 }
25 });
26}
Concurrent Echo Server Using Tokio and Async I/O
1async fn handle_client(stream: TcpStream) -> std::io::Result<()> {
2 let mut reader = BufReader::new(stream);
3 let mut buf: Vec<u8> = Vec::new();
4 loop {
5 let size = reader.read_until(b'
6', &mut buf).await?;
7 if size == 0 || buf[size - 1] != b'
8' {
9 break;
10 }
11 reader.get_mut().write_all(&buf[..size]).await?;
12 buf.clear();
13 }
14 Ok(())
15}
16
17#[tokio::main]
18async fn main() -> std::io::Result<()> {
19 let port = std::env::args()
20 .nth(1)
21 .map(|s| s.parse().unwrap())
22 .unwrap_or(50000u16);
23 let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], port))).await?;
24 loop {
25 let (socket, _) = listener.accept().await?;
26 tokio::spawn(async move {
27 eprintln!("Accepted connection");
28 std::mem::drop(handle_client(socket).await);
29 eprintln!("Connection ended");
30 });
31 }
32}
Concurrent Water Molecule Assembly with Tokio Semaphores and Barrier
1fn bond(s: &str) {
2 println!("bond {s}!");
3}
4
5#[tokio::main]
6async fn main() {
7 let n = 100;
8 let f = Arc::new(WaterFactory::new());
9
10 let hs = (0..n * 2).map(|i| {
11 let f = f.clone();
12 tokio::spawn((|| async move {
13 f.hydrogen(|| bond(&format!("h{i}"))).await;
14 })())
15 });
16 let os = (0..n).map(|i| {
17 let f = f.clone();
18 tokio::spawn((|| async move {
19 f.oxygen(|| bond(&format!("o{i}"))).await;
20 })())
21 });
22
23 join_all(Iterator::chain(hs, os)).await;
24}
25
26struct WaterFactory {
27 o_sem: Semaphore,
28 h_sem: Semaphore,
29 barrier: Barrier,
30}
31
32impl WaterFactory {
33 fn new() -> Self {
34 Self {
35 o_sem: Semaphore::new(1),
36 h_sem: Semaphore::new(2),
37 barrier: Barrier::new(3),
38 }
39 }
40
41 async fn oxygen(&self, bond: impl FnOnce()) {
42 let _permit = self.o_sem.acquire().await.unwrap();
43 self.barrier.wait().await;
44 bond();
45 }
46
47 async fn hydrogen(&self, bond: impl FnOnce()) {
48 let _permit = self.h_sem.acquire().await.unwrap();
49 self.barrier.wait().await;
50 bond();
51 }
52}
Shared Counter with Threads Using Arc<Mutex<>>
1fn main() {
2 let counter = Arc::new(Mutex::new(0));
3
4 let t0 = {
5 let counter = counter.clone();
6 thread::spawn(move || {
7 // THE FOLLOWING IS NOT IDIOMATIC RUST
8 use std::ops::{Deref, DerefMut};
9
10 let mutex: &Mutex<i32> = counter.deref();
11 let lock_result = mutex.lock();
12 let mut lock_guard = lock_result.unwrap();
13 let counter_ref: &mut i32 = lock_guard.deref_mut();
14 *counter_ref += 1;
15 // END UNIDIOMATIC RUST
16 })
17 };
18 let t1 = {
19 let counter = counter.clone();
20 thread::spawn(move || {
21 *counter.lock().unwrap() += 1;
22 })
23 };
24
25 t0.join().unwrap();
26 t1.join().unwrap();
27
28 println!("{}", *counter.lock().unwrap());
29}
atomics in rust
1fn main() {
2 let counter = AtomicI32::new(0);
3
4 thread::scope(|s| {
5 s.spawn(|| {
6 counter.fetch_add(1, Ordering::Relaxed);
7 });
8 s.spawn(|| {
9 counter.fetch_add(1, Ordering::Relaxed);
10 });
11 });
12
13 println!("{}", counter.load(Ordering::Relaxed));
14}