From 13f58c87bd481747c7f9019d485135a6fef18707 Mon Sep 17 00:00:00 2001 From: Denis Nutiu Date: Thu, 7 Nov 2024 22:33:08 +0200 Subject: [PATCH] record progress: threads --- .rustlings-state.txt | 7 +++- exercises/20_threads/threads1.rs | 2 +- exercises/20_threads/threads2.rs | 13 +++---- exercises/20_threads/threads3.rs | 5 ++- solutions/20_threads/threads1.rs | 37 +++++++++++++++++- solutions/20_threads/threads2.rs | 43 +++++++++++++++++++-- solutions/20_threads/threads3.rs | 64 ++++++++++++++++++++++++++++++-- 7 files changed, 150 insertions(+), 21 deletions(-) diff --git a/.rustlings-state.txt b/.rustlings-state.txt index 7523b62..c976c0b 100644 --- a/.rustlings-state.txt +++ b/.rustlings-state.txt @@ -1,6 +1,6 @@ DON'T EDIT THIS FILE! -threads1 +macros1 intro1 intro2 @@ -80,4 +80,7 @@ iterators5 box1 rc1 arc1 -cow1 \ No newline at end of file +cow1 +threads1 +threads2 +threads3 \ No newline at end of file diff --git a/exercises/20_threads/threads1.rs b/exercises/20_threads/threads1.rs index 01f9ff4..525a7e1 100644 --- a/exercises/20_threads/threads1.rs +++ b/exercises/20_threads/threads1.rs @@ -22,8 +22,8 @@ fn main() { let mut results = Vec::new(); for handle in handles { - // TODO: Collect the results of all threads into the `results` vector. // Use the `JoinHandle` struct which is returned by `thread::spawn`. + results.push(handle.join().unwrap()) } if results.len() != 10 { diff --git a/exercises/20_threads/threads2.rs b/exercises/20_threads/threads2.rs index 7020cb9..3b6df09 100644 --- a/exercises/20_threads/threads2.rs +++ b/exercises/20_threads/threads2.rs @@ -3,23 +3,21 @@ // shared value: `JobStatus.jobs_done` use std::{sync::Arc, thread, time::Duration}; +use std::sync::Mutex; struct JobStatus { jobs_done: u32, } fn main() { - // TODO: `Arc` isn't enough if you want a **mutable** shared state. - let status = Arc::new(JobStatus { jobs_done: 0 }); + let status = Arc::new(Mutex::new(JobStatus { jobs_done: 0 })); let mut handles = Vec::new(); for _ in 0..10 { - let status_shared = Arc::clone(&status); + let mut status_shared = Arc::clone(&status); let handle = thread::spawn(move || { thread::sleep(Duration::from_millis(250)); - - // TODO: You must take an action before you update a shared value. - status_shared.jobs_done += 1; + status_shared.lock().unwrap().jobs_done += 1; }); handles.push(handle); } @@ -29,6 +27,5 @@ fn main() { handle.join().unwrap(); } - // TODO: Print the value of `JobStatus.jobs_done`. - println!("Jobs done: {}", todo!()); + println!("Jobs done: {}", status.lock().unwrap().jobs_done); } diff --git a/exercises/20_threads/threads3.rs b/exercises/20_threads/threads3.rs index 6d16bd9..85e4121 100644 --- a/exercises/20_threads/threads3.rs +++ b/exercises/20_threads/threads3.rs @@ -15,8 +15,9 @@ impl Queue { } fn send_tx(q: Queue, tx: mpsc::Sender) { - // TODO: We want to send `tx` to both threads. But currently, it is moved + // We want to send `tx` to both threads. But currently, it is moved // into the first thread. How could you solve this problem? + let tx2 = tx.clone(); thread::spawn(move || { for val in q.first_half { println!("Sending {val:?}"); @@ -28,7 +29,7 @@ fn send_tx(q: Queue, tx: mpsc::Sender) { thread::spawn(move || { for val in q.second_half { println!("Sending {val:?}"); - tx.send(val).unwrap(); + tx2.send(val).unwrap(); thread::sleep(Duration::from_millis(250)); } }); diff --git a/solutions/20_threads/threads1.rs b/solutions/20_threads/threads1.rs index dcf2377..7f3dd29 100644 --- a/solutions/20_threads/threads1.rs +++ b/solutions/20_threads/threads1.rs @@ -1,4 +1,37 @@ +// This program spawns multiple threads that each run for at least 250ms, and +// each thread returns how much time they took to complete. The program should +// wait until all the spawned threads have finished and should collect their +// return values into a vector. + +use std::{ + thread, + time::{Duration, Instant}, +}; + fn main() { - // DON'T EDIT THIS SOLUTION FILE! - // It will be automatically filled after you finish the exercise. + let mut handles = Vec::new(); + for i in 0..10 { + let handle = thread::spawn(move || { + let start = Instant::now(); + thread::sleep(Duration::from_millis(250)); + println!("Thread {i} done"); + start.elapsed().as_millis() + }); + handles.push(handle); + } + + let mut results = Vec::new(); + for handle in handles { + // Collect the results of all threads into the `results` vector. + results.push(handle.join().unwrap()); + } + + if results.len() != 10 { + panic!("Oh no! Some thread isn't done yet!"); + } + + println!(); + for (i, result) in results.into_iter().enumerate() { + println!("Thread {i} took {result}ms"); + } } diff --git a/solutions/20_threads/threads2.rs b/solutions/20_threads/threads2.rs index dcf2377..bc268d6 100644 --- a/solutions/20_threads/threads2.rs +++ b/solutions/20_threads/threads2.rs @@ -1,4 +1,41 @@ -fn main() { - // DON'T EDIT THIS SOLUTION FILE! - // It will be automatically filled after you finish the exercise. +// Building on the last exercise, we want all of the threads to complete their +// work. But this time, the spawned threads need to be in charge of updating a +// shared value: `JobStatus.jobs_done` + +use std::{ + sync::{Arc, Mutex}, + thread, + time::Duration, +}; + +struct JobStatus { + jobs_done: u32, +} + +fn main() { + // `Arc` isn't enough if you want a **mutable** shared state. + // We need to wrap the value with a `Mutex`. + let status = Arc::new(Mutex::new(JobStatus { jobs_done: 0 })); + // ^^^^^^^^^^^ ^ + + let mut handles = Vec::new(); + for _ in 0..10 { + let status_shared = Arc::clone(&status); + let handle = thread::spawn(move || { + thread::sleep(Duration::from_millis(250)); + + // Lock before you update a shared value. + status_shared.lock().unwrap().jobs_done += 1; + // ^^^^^^^^^^^^^^^^ + }); + handles.push(handle); + } + + // Waiting for all jobs to complete. + for handle in handles { + handle.join().unwrap(); + } + + println!("Jobs done: {}", status.lock().unwrap().jobs_done); + // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ } diff --git a/solutions/20_threads/threads3.rs b/solutions/20_threads/threads3.rs index dcf2377..7ceefea 100644 --- a/solutions/20_threads/threads3.rs +++ b/solutions/20_threads/threads3.rs @@ -1,4 +1,62 @@ -fn main() { - // DON'T EDIT THIS SOLUTION FILE! - // It will be automatically filled after you finish the exercise. +use std::{sync::mpsc, thread, time::Duration}; + +struct Queue { + first_half: Vec, + second_half: Vec, +} + +impl Queue { + fn new() -> Self { + Self { + first_half: vec![1, 2, 3, 4, 5], + second_half: vec![6, 7, 8, 9, 10], + } + } +} + +fn send_tx(q: Queue, tx: mpsc::Sender) { + // Clone the sender `tx` first. + let tx_clone = tx.clone(); + thread::spawn(move || { + for val in q.first_half { + println!("Sending {val:?}"); + // Then use the clone in the first thread. This means that + // `tx_clone` is moved to the first thread and `tx` to the second. + tx_clone.send(val).unwrap(); + thread::sleep(Duration::from_millis(250)); + } + }); + + thread::spawn(move || { + for val in q.second_half { + println!("Sending {val:?}"); + tx.send(val).unwrap(); + thread::sleep(Duration::from_millis(250)); + } + }); +} + +fn main() { + // You can optionally experiment here. +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn threads3() { + let (tx, rx) = mpsc::channel(); + let queue = Queue::new(); + + send_tx(queue, tx); + + let mut received = Vec::with_capacity(10); + for value in rx { + received.push(value); + } + + received.sort(); + assert_eq!(received, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + } }