状態共有並行性

メッセージ受け渡しは、並行性を扱う素晴らしい方法ですが、唯一の方法ではありません。 Go言語ドキュメンテーションのスローガンのこの部分を再び考えてください: 「メモリを共有することでやり取りする。」

メモリを共有することでやり取りするとはどんな感じなのでしょうか?さらに、 なぜメッセージ受け渡しに熱狂的な人は、それを使わず、代わりに全く反対のことをするのでしょうか?

ある意味では、どんなプログラミング言語のチャンネルも単独の所有権に類似しています。 一旦チャンネルに値を転送したら、その値は最早使用することがないからです。 メモリ共有並行性は、複数の所有権に似ています: 複数のスレッドが同時に同じメモリ位置にアクセスできるのです。 第15章でスマートポインタが複数の所有権を可能にするのを目の当たりにしたように、 異なる所有者を管理する必要があるので、複数の所有権は複雑度を増させます。 Rustの型システムと所有権規則は、この管理を正しく行う大きな助けになります。 例として、メモリ共有を行うより一般的な並行性の基本型の一つであるミューテックスを見てみましょう。

ミューテックスを使用して一度に1つのスレッドからデータにアクセスすることを許可する

ミューテックスは、どんな時も1つのスレッドにしかなんらかのデータへのアクセスを許可しないというように、 "mutual exclusion"(相互排他)の省略形です。ミューテックスにあるデータにアクセスするには、 ミューテックスのロックを所望することでアクセスしたいことをまず、スレッドは通知しなければなりません。 ロックとは、現在誰がデータへの排他的アクセスを行なっているかを追跡するミューテックスの一部をなすデータ構造です。 故に、ミューテックスはロックシステム経由で保持しているデータを死守する(guarding)と解説されます。

ミューテックスは、2つの規則を覚えておく必要があるため、難しいという評判があります:

  • データを使用する前にロックの獲得を試みなければならない。
  • ミューテックスが死守しているデータの使用が終わったら、他のスレッドがロックを獲得できるように、 データをアンロックしなければならない。

ミューテックスを現実世界の物で例えるなら、マイクが1つしかない会議のパネルディスカッションを思い浮かべてください。 パネリストが発言できる前に、マイクを使用したいと申し出たり、通知しなければなりません。マイクを受け取ったら、 話したいだけ話し、それから次に発言を申し出たパネリストにマイクを手渡します。パネリストが発言し終わった時に、 マイクを手渡すのを忘れていたら、誰も他の人は発言できません。共有されているマイクの管理がうまくいかなければ、 パネルは予定通りに機能しないでしょう!

ミューテックスの管理は、正しく行うのに著しく技巧を要することがあるので、多くの人がチャンネルに熱狂的になるわけです。 しかしながら、Rustの型システムと所有権規則のおかげで、ロックとアンロックをおかしくすることはありません。

Mutex<T>のAPI

ミューテックスの使用方法の例として、ミューテックスをシングルスレッドの文脈で使うことから始めましょう。 リスト16-12のようにですね:

ファイル名: src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

リスト16-12: 簡潔性のためにMutex<T>のAPIをシングルスレッドの文脈で探究する

多くの型同様、newという関連関数を使用してMutex<T>を生成します。ミューテックス内部のデータにアクセスするには、 lockメソッドを使用してロックを獲得します。この呼び出しは、現在のスレッドをブロックするので、 ロックを得られる順番が来るまで何も作業はできません。

ロックを保持している他のスレッドがパニックしたら、lockの呼び出しは失敗するでしょう。その場合、 誰もロックを取得することは叶わないので、unwrapすると決定し、そのような状況になったら、 このスレッドをパニックさせます。

ロックを獲得した後、今回の場合、numと名付けられていますが、戻り値を中に入っているデータへの可変参照として扱うことができます。 型システムにより、mの値を使用する前にロックを獲得していることが確認されます: Mutex<i32>i32ではないので、 i32を使用できるようにするには、ロックを獲得しなければならないのです。忘れることはあり得ません; 型システムにより、それ以外の場合に内部のi32にアクセスすることは許されません。

お察しかもしれませんが、Mutex<T>はスマートポインタです。より正確を期すなら、 lockの呼び出しがMutexGuardというスマートポインタを返却します。このスマートポインタが、 内部のデータを指すDerefを実装しています; このスマートポインタはさらにMutexGuardがスコープを外れた時に、 自動的にロックを解除するDrop実装もしていて、これがリスト16-12の内部スコープの終わりで発生します。 結果として、ロックの解除が自動的に行われるので、ロックの解除を忘れ、 ミューテックスが他のスレッドで使用されるのを阻害するリスクを負いません。

ロックをドロップした後、ミューテックスの値を出力し、内部のi32の値を6に変更できたことが確かめられるのです。

複数のスレッド間でMutex<T>を共有する

さて、Mutex<T>を使って複数のスレッド間で値を共有してみましょう。10個のスレッドを立ち上げ、 各々カウンタの値を1ずつインクリメントさせるので、カウンタは0から10まで上がります。 以下の数例は、コンパイルエラーになることに注意し、そのエラーを使用してMutex<T>の使用法と、 コンパイラがそれを正しく活用する手助けをしてくれる方法について学びます。リスト16-13が最初の例です:

ファイル名: src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

リスト16-13: Mutex<T>により死守されているカウンタを10個のスレッドがそれぞれインクリメントする

リスト16-12のように、counter変数を生成してMutex<T>の内部にi32を保持しています。 次に、数値の範囲をマッピングして10個のスレッドを生成しています。thread::spawnを使用して、 全スレッドに同じクロージャを与えています。このクロージャは、スレッド内にカウンタをムーブし、 lockメソッドを呼ぶことでMutex<T>のロックを獲得し、それからミューテックスの値に1を足します。 スレッドがクロージャを実行し終わったら、numはスコープ外に出てロックを解除するので、 他のスレッドが獲得できるわけです。

メインスレッドで全てのjoinハンドルを収集します。それからリスト16-2のように、各々に対してjoinを呼び出し、 全スレッドが終了するのを確かめています。その時点で、メインスレッドはロックを獲得し、このプログラムの結果を出力します。

この例はコンパイルできないでしょうと仄めかしました。では、理由を探りましょう!

error[E0382]: capture of moved value: `counter`
(エラー: ムーブされた値をキャプチャしています: `counter`)
  --> src/main.rs:10:27
   |
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved (into closure) here
10 |             let mut num = counter.lock().unwrap();
   |                           ^^^^^^^ value captured here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error[E0382]: use of moved value: `counter`
  --> src/main.rs:21:29
   |
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved (into closure) here
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value used here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error: aborting due to 2 previous errors
(エラー: 前述の2つのエラーによりアボート)

エラーメッセージは、counter値はクロージャにムーブされ、それからlockを呼び出したときにキャプチャされていると述べています。 その説明は、所望した動作のように聞こえますが、許可されていないのです!

プログラムを単純化してこれを理解しましょう。forループで10個スレッドを生成する代わりに、 ループなしで2つのスレッドを作るだけにしてどうなるか確認しましょう。 リスト16-13の最初のforループを代わりにこのコードと置き換えてください:

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    let handle = thread::spawn(move || {
        let mut num = counter.lock().unwrap();

        *num += 1;
    });
    handles.push(handle);

    let handle2 = thread::spawn(move || {
        let mut num2 = counter.lock().unwrap();

        *num2 += 1;
    });
    handles.push(handle2);

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

2つのスレッドを生成し、2番目のスレッドの変数名をhandle2num2に変更しています。 今回このコードを走らせると、コンパイラは以下の出力をします:

error[E0382]: capture of moved value: `counter`
  --> src/main.rs:16:24
   |
8  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
16 |         let mut num2 = counter.lock().unwrap();
   |                        ^^^^^^^ value captured here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error[E0382]: use of moved value: `counter`
  --> src/main.rs:26:29
   |
8  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
26 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value used here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error: aborting due to 2 previous errors

なるほど!最初のエラーメッセージは、handleに紐づけられたスレッドのクロージャにcounterがムーブされていることを示唆しています。 そのムーブにより、それに対してlockを呼び出し、結果を2番目のスレッドのnum2に保持しようとした時に、 counterをキャプチャすることを妨げています!ゆえに、コンパイラは、counterの所有権を複数のスレッドに移すことはできないと教えてくれています。 これは、以前では確認しづらかったことです。なぜなら、スレッドはループの中にあり、 ループの違う繰り返しにある違うスレッドをコンパイラは指し示せないからです。 第15章で議論した複数所有権メソッドによりコンパイルエラーを修正しましょう。

複数のスレッドで複数の所有権

第15章で、スマートポインタのRc<T>を使用して参照カウントの値を作ることで、1つの値に複数の所有者を与えました。 同じことをここでもして、どうなるか見ましょう。リスト16-14でRc<T>Mutex<T>を包含し、 所有権をスレッドに移す前にRc<T>をクローンします。今やエラーを確認したので、 forループの使用に立ち戻り、クロージャにmoveキーワードを使用し続けます。

ファイル名: src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

リスト16-14: Rc<T>を使用して複数のスレッドにMutex<T>を所有させようとする

再三、コンパイルし……別のエラーが出ました!コンパイラはいろんなことを教えてくれています。

error[E0277]: the trait bound `std::rc::Rc<std::sync::Mutex<i32>>:
std::marker::Send` is not satisfied in `[closure@src/main.rs:11:36:
15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
(エラー: トレイト境界`std::rc::Rc<std::sync::Mutex<i32>>:
std::marker::Send`は`[closure@src/main.rs:11:36:15:10
counter:std::rc::Rc<std::sync::Mutex<i32>>]`で満たされていません)
  --> src/main.rs:11:22
   |
11 |         let handle = thread::spawn(move || {
   |                      ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>`
cannot be sent between threads safely
                          (`std::rc::Rc<std::sync::Mutex<i32>>`は、スレッド間で安全に送信できません)
   |
   = help: within `[closure@src/main.rs:11:36: 15:10
counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is
not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
     (ヘルプ: `[closure@src/main.rs:11:36 15:10
     counter:std::rc::Rc<std::sync::Mutex<i32>>]`内でトレイト`std::marker::Send`は、
     `std::rc::Rc<std::sync::Mutex<i32>>`に対して実装されていません)
   = note: required because it appears within the type
`[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
     (注釈: 型`[closure@src/main.rs:11:36 15:10
     counter:std::rc::Rc<std::sync::Mutex<i32>>]`内に出現するので必要です)
   = note: required by `std::thread::spawn`
     (注釈: `std::thread::spawn`により必要とされています)

おお、このエラーメッセージはとても長ったらしいですね!こちらが、注目すべき重要な部分です: 最初のインラインエラーは `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safelyと述べています。この理由は、エラーメッセージの次に注目すべき重要な部分にあります。 洗練されたエラーメッセージは、 the trait bound `Send` is not satisfiedと述べています。 Sendについては、次の節で語ります: スレッドとともに使用している型が並行な場面で使われることを意図したものであることを保証するトレイトの1つです。

残念ながら、Rc<T>はスレッド間で共有するには安全ではないのです。Rc<T>が参照カウントを管理する際、 cloneが呼び出されるたびにカウントを追加し、クローンがドロップされるたびにカウントを差し引きます。 しかし、並行基本型を使用してカウントの変更が別のスレッドに妨害されないことを確認していないのです。 これは間違ったカウントにつながる可能性があり、今度はメモリリークや、使用し終わる前に値がドロップされることにつながる可能性のある潜在的なバグです。 必要なのは、いかにもRc<T>のようだけれども、参照カウントへの変更をスレッドセーフに行うものです。

Arc<T>で原子的な参照カウント

幸いなことに、Arc<T>Rc<T>のような並行な状況で安全に使用できる型ですaatomicを表し、原子的に参照カウントする型を意味します。アトミックは、 ここでは詳しく講義しない並行性の別の基本型です: 詳細は、 std::sync::atomicの標準ライブラリドキュメンテーションを参照されたし。現時点では、 アトミックは、基本型のように動くけれども、スレッド間で共有しても安全なことだけ知っていれば良いです。

そうしたらあなたは、なぜ全ての基本型がアトミックでなく、標準ライブラリの型も標準でArc<T>を使って実装されていないのか疑問に思う可能性があります。 その理由は、スレッド安全性が、本当に必要な時だけ支払いたいパフォーマンスの犠牲とともに得られるものだからです。 シングルスレッドで値に処理を施すだけなら、アトミックが提供する保証を強制する必要がない方がコードはより速く走るのです。

例に回帰しましょう: Arc<T>Rc<T>のAPIは同じなので、use行とnewの呼び出しとcloneの呼び出しを変更して、 プログラムを修正します。リスト16-15は、ようやくコンパイルでき、動作します:

ファイル名: src/main.rs

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

リスト16-15: Arc<T>を使用してMutex<T>をラップし、所有権を複数のスレッド間で共有できるようにする

このコードは、以下のように出力します:

Result: 10

やりました!0から10まで数え上げました。これは、あまり印象的ではないように思えるかもしれませんが、 本当にMutex<T>とスレッド安全性についていろんなことを教えてくれました。このプログラムの構造を使用して、 カウンタをインクリメントする以上の複雑な処理を行うこともできるでしょう。この手法を使えば、 計算を独立した部分に小分けにし、その部分をスレッドに分割し、それからMutex<T>を使用して、 各スレッドに最終結果を更新させることができます。

RefCell<T>/Rc<T>Mutex<T>/Arc<T>の類似性

counterは不変なのに、その内部にある値への可変参照を得ることができたことに気付いたでしょうか; つまり、Mutex<T>は、Cell系のように内部可変性を提供するわけです。 第15章でRefCell<T>を使用してRc<T>の内容を可変化できるようにしたのと同様に、 Mutex<T>を使用してArc<T>の内容を可変化しているのです。

気付いておくべき別の詳細は、Mutex<T>を使用する際にあらゆる種類のロジックエラーからは、 コンパイラは保護してくれないということです。第15章でRc<T>は、循環参照を生成してしまうリスクを伴い、 そうすると、2つのRc<T>の値がお互いを参照し合い、メモリリークを引き起こしてしまうことを思い出してください。 同様に、Mutex<T>デッドロックを生成するリスクを伴っています。これは、処理が2つのリソースをロックする必要があり、 2つのスレッドがそれぞれにロックを1つ獲得して永久にお互いを待ちあってしまうときに起こります。 デッドロックに興味があるのなら、デッドロックのあるRustプログラムを組んでみてください; それからどんな言語でもいいので、ミューテックスに対してデッドロックを緩和する方法を調べて、 Rustで是非、それを実装してみてください。Mutex<T>MutexGuardに関する標準ライブラリのAPIドキュメンテーションは、 役に立つ情報を提供してくれます。

SendSyncトレイトと、それらを独自の型で使用する方法について語って、この章を締めくくります。

関連キーワード:  Mutex, counter, Rc, let, ロック, handle, lock, unwrap, num, 共有