Some Rust concurrency examples

Table of Contents

Asynchronous I/O Pipeline Server with Tokio and Message Passing

1loop {
2    let (stream, _) = listener.accept().await?;
3
4    tokio::spawn(async move {
5        loop {
6            let mut buf = Vec::new();
7            stream.read(&mut buf).await?;
8            sq_tx.send(IoOperation::Read(stream, buf)).await?;
9        }
10    });
11
12    tokio::spawn(async move {
13        while let Some(op) = sq_rx.recv().await {
14            tokio::spawn(async move {
15                let res = process(op.data).await;
16                cq_tx.send(IoOperation::Write(op.stream, res)).await?;
17            });
18        }
19    });
20
21    tokio::spawn(async move {
22        while let Some(op) = cq_rx.recv().await {
23            op.stream.write_all(op.data).await?;
24        }
25    });
26}

Concurrent Echo Server Using Tokio and Async I/O

1async fn handle_client(stream: TcpStream) -> std::io::Result<()> {
2    let mut reader = BufReader::new(stream);
3    let mut buf: Vec<u8> = Vec::new();
4    loop {
5        let size = reader.read_until(b'
6', &mut buf).await?;
7        if size == 0 || buf[size - 1] != b'
8' {
9            break;
10        }
11        reader.get_mut().write_all(&buf[..size]).await?;
12        buf.clear();
13    }
14    Ok(())
15}
16
17#[tokio::main]
18async fn main() -> std::io::Result<()> {
19    let port = std::env::args()
20        .nth(1)
21        .map(|s| s.parse().unwrap())
22        .unwrap_or(50000u16);
23    let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], port))).await?;
24    loop {
25        let (socket, _) = listener.accept().await?;
26        tokio::spawn(async move {
27            eprintln!("Accepted connection");
28            std::mem::drop(handle_client(socket).await);
29            eprintln!("Connection ended");
30        });
31    }
32}

Concurrent Water Molecule Assembly with Tokio Semaphores and Barrier

/// Prints a line announcing that atom `s` has bonded.
fn bond(s: &str) {
    println!("bond {s}!");
}

5#[tokio::main]
6async fn main() {
7    let n = 100;
8    let f = Arc::new(WaterFactory::new());
9
10    let hs = (0..n * 2).map(|i| {
11        let f = f.clone();
12        tokio::spawn((|| async move {
13            f.hydrogen(|| bond(&format!("h{i}"))).await;
14        })())
15    });
16    let os = (0..n).map(|i| {
17        let f = f.clone();
18        tokio::spawn((|| async move {
19            f.oxygen(|| bond(&format!("o{i}"))).await;
20        })())
21    });
22
23    join_all(Iterator::chain(hs, os)).await;
24}
25
26struct WaterFactory {
27    o_sem: Semaphore,
28    h_sem: Semaphore,
29    barrier: Barrier,
30}
31
32impl WaterFactory {
33    fn new() -> Self {
34        Self {
35            o_sem: Semaphore::new(1),
36            h_sem: Semaphore::new(2),
37            barrier: Barrier::new(3),
38        }
39    }
40
41    async fn oxygen(&self, bond: impl FnOnce()) {
42        let _permit = self.o_sem.acquire().await.unwrap();
43        self.barrier.wait().await;
44        bond();
45    }
46
47    async fn hydrogen(&self, bond: impl FnOnce()) {
48        let _permit = self.h_sem.acquire().await.unwrap();
49        self.barrier.wait().await;
50        bond();
51    }
52}

Shared Counter with Threads Using Arc<Mutex<T>>

/// Increments a shared `Arc<Mutex<i32>>` counter from two threads and prints
/// the final value.
///
/// The first thread spells out every step (deref the `Arc`, lock, unwrap,
/// deref the guard) to show what the one-line form in the second thread does.
fn main() {
    let counter = Arc::new(Mutex::new(0));

    let t0 = {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            // THE FOLLOWING IS NOT IDIOMATIC RUST
            use std::ops::{Deref, DerefMut};

            let mutex: &Mutex<i32> = counter.deref();
            let lock_result = mutex.lock();
            let mut lock_guard = lock_result.unwrap();
            let counter_ref: &mut i32 = lock_guard.deref_mut();
            *counter_ref += 1;
            // END UNIDIOMATIC RUST
        })
    };
    let t1 = {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            // Idiomatic equivalent of the block above.
            *counter.lock().unwrap() += 1;
        })
    };

    // Join both threads so both increments are done before the final read.
    t0.join().unwrap();
    t1.join().unwrap();

    println!("{}", *counter.lock().unwrap());
}

Atomics in Rust

/// Increments a shared atomic counter from two scoped threads and prints the
/// final value.
fn main() {
    let counter = AtomicI32::new(0);

    // Scoped threads can borrow `counter` directly, with no `Arc`, because
    // `thread::scope` joins every spawned thread before it returns.
    thread::scope(|s| {
        s.spawn(|| {
            counter.fetch_add(1, Ordering::Relaxed);
        });
        s.spawn(|| {
            counter.fetch_add(1, Ordering::Relaxed);
        });
    });

    // The joins at the end of the scope order both increments before this
    // load, so `Relaxed` is sufficient here.
    println!("{}", counter.load(Ordering::Relaxed));
}