Testcase: map-reduce
Rust робить дуже легким паралелізацію обробки даних, без багатьох головних болів, традиційно пов’язаних із такою спробою.
Стандартна бібліотека надає чудові примітиви потоків з коробки. Ці примітиви, у поєднанні з концепцією Ownership у Rust і правилами аліасингу, автоматично запобігають станам гонки даних.
Правила аліасингу (одне змінне посилання XOR багато читаючих посилань) автоматично запобігають
вам маніпулювати станом, який видимий для інших потоків. (Коли потрібна синхронізація,
існують примітиви синхронізації
такі як Mutexes або Channels.)
У цьому прикладі ми обчислимо суму всіх цифр у блоці чисел. Ми зробимо це, розподіливши шматки блоку між різними потоками. Кожен потік підсумує свій маленький блок цифр, а згодом ми підсумуємо проміжні суми, отримані кожним потоком.
Зверніть увагу, що хоча ми передаємо посилання через межі потоків, Rust розуміє, що ми
передаємо лише посилання лише для читання, і тому не може виникнути ані небезпеки, ані станів гонки даних. Також через те, що
посилання, які ми передаємо, мають часи життя 'static, Rust розуміє, що наші дані не будуть
знищені, поки ці потоки все ще працюють. (Коли вам потрібно ділити дані з не-static
часами життя між потоками, ви можете використати розумний вказівник, такий як Arc, щоб підтримувати дані живими й уникнути не-static
часів життя.)
use std::thread;
// This is the `main` thread
fn main() {
// This is our data to process.
// We will calculate the sum of all digits via a threaded map-reduce algorithm.
// Each whitespace separated chunk will be handled in a different thread.
//
// TODO: see what happens to the output if you insert spaces!
let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
// Make a vector to hold the child-threads which we will spawn.
let mut children = vec![];
/*************************************************************************
* "Map" phase
*
* Divide our data into segments, and apply initial processing
************************************************************************/
// split our data into segments for individual calculation
// each chunk will be a reference (&str) into the actual data
let chunked_data = data.split_whitespace();
// Iterate over the data segments.
// .enumerate() adds the current loop index to whatever is iterated
// the resulting tuple "(index, element)" is then immediately
// "destructured" into two variables, "i" and "data_segment" with a
// "destructuring assignment"
for (i, data_segment) in chunked_data.enumerate() {
println!("data segment {} is \"{}\"", i, data_segment);
// Process each data segment in a separate thread
//
// spawn() returns a handle to the new thread,
// which we MUST keep to access the returned value
//
// 'move || -> u32' is syntax for a closure that:
// * takes no arguments ('||')
// * takes ownership of its captured variables ('move') and
// * returns an unsigned 32-bit integer ('-> u32')
//
// Rust is smart enough to infer the '-> u32' from
// the closure itself so we could have left that out.
//
// TODO: try removing the 'move' and see what happens
children.push(thread::spawn(move || -> u32 {
// Calculate the intermediate sum of this segment:
let result = data_segment
// iterate over the characters of our segment..
.chars()
// .. convert text-characters to their number value..
.map(|c| c.to_digit(10).expect("should be a digit"))
// .. and sum the resulting iterator of numbers
.sum();
// println! locks stdout, so no text-interleaving occurs
println!("processed segment {}, result={}", i, result);
// "return" not needed, because Rust is an "expression language", the
// last evaluated expression in each block is automatically its value.
result
}));
}
/*************************************************************************
* "Reduce" phase
*
* Collect our intermediate results, and combine them into a final result
************************************************************************/
// combine each thread's intermediate results into a single final sum.
//
// we use the "turbofish" ::<> to provide sum() with a type hint.
//
// TODO: try without the turbofish, by instead explicitly
// specifying the type of final_result
let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();
println!("Final sum result: {}", final_result);
}
Завдання
Не розумно дозволяти нашій кількості потоків залежати від введених користувачем даних. Що, якщо користувач вирішить вставити багато пробілів? Чи справді ми хочемо створити 2,000 потоків? Змініть програму так, щоб дані завжди розбивалися на обмежену кількість шматків, визначену статичною константою на початку програми.
Див. також:
- Потоки
- вектори і ітератори
- замикання, семантика move і
moveclosures - деструктуризувальні присвоювання
- позначення turbofish для допомоги виведенню типів
- unwrap vs. expect
- enumerate