メッセージ受け渡しを使ってスレッド間でデータを転送する

人気度を増してきている安全な並行性を保証する一つのアプローチがメッセージ受け渡しで、 スレッドやアクターがデータを含むメッセージを相互に送り合うことでやり取りします。 こちらが、Go言語のドキュメンテーションのスローガンにある考えです: 「メモリを共有することでやり取りするな; 代わりにやり取りすることでメモリを共有しろ」

メッセージ送信並行性を達成するためにRustに存在する一つの主な道具は、チャンネルで、 Rustの標準ライブラリが実装を提供しているプログラミング概念です。プログラミングのチャンネルは、 水の流れのように考えることができます。小川とか川ですね。アヒルのおもちゃやボートみたいなものを流れに置いたら、 水路の終端まで下流に流れていきます。

プログラミングにおけるチャンネルは、2分割できます: 転送機と受信機です。転送機はアヒルのおもちゃを川に置く上流になり、 受信機は、アヒルのおもちゃが行き着く下流になります。コードのある箇所が送信したいデータとともに転送機のメソッドを呼び出し、 別の部分がメッセージが到着していないか受信側を調べます。転送機と受信機のどちらかがドロップされると、 チャンネルは閉じられたと言います。

ここで、1つのスレッドが値を生成し、それをチャンネルに送信し、別のスレッドがその値を受け取り、 出力するプログラムに取り掛かります。チャンネルを使用してスレッド間に単純な値を送り、 機能の説明を行います。一旦、そのテクニックに慣れてしまえば、チャンネルを使用してチャットシステムや、 多くのスレッドが計算の一部を担い、結果をまとめる1つのスレッドにその部分を送るようなシステムを実装できるでしょう。

まず、リスト16-6において、チャンネルを生成するものの、何もしません。 チャンネル越しにどんな型の値を送りたいのかコンパイラがわからないため、 これはまだコンパイルできないことに注意してください。

ファイル名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(()).unwrap();
}

リスト16-6: チャンネルを生成し、2つの部品をtxrxに代入する

mpsc::channel関数で新しいチャンネルを生成しています; mpscmultiple producer, single consumerを表しています。 簡潔に言えば、Rustの標準ライブラリがチャンネルを実装している方法は、1つのチャンネルが値を生成する複数の送信側と、 その値を消費するたった1つの受信側を持つことができるということを意味します。 複数の小川が互いに合わさって1つの大きな川になるところを想像してください: どの小川を通っても、送られたものは最終的に1つの川に行き着きます。今は、1つの生成器から始めますが、 この例が動作するようになったら、複数の生成器を追加します。

mpsc::channel関数はタプルを返し、1つ目の要素は、送信側、2つ目の要素は受信側になります。 txrxという略称は、多くの分野で伝統的に転送機受信機にそれぞれ使用されているので、 変数をそのように名付けて、各終端を示します。タプルを分配するパターンを伴うlet文を使用しています; let文でパターンを使用することと分配については、第18章で議論しましょう。このようにlet文を使うと、 mpsc::channelで返ってくるタプルの部品を抽出するのが便利になります。

立ち上げたスレッドがメインスレッドとやり取りするように、転送機を立ち上げたスレッドに移動し、 1文字列を送らせましょう。リスト16-7のようにですね。川の上流にアヒルのおもちゃを置いたり、 チャットのメッセージをあるスレッドから別のスレッドに送るみたいですね。

ファイル名: src/main.rs

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

リスト16-7: txを立ち上げたスレッドに移動し、「やあ」を送る

今回も、thread::spawnを使用して新しいスレッドを生成し、それからmoveを使用して、 立ち上げたスレッドがtxを所有するようにクロージャにtxをムーブしています。立ち上げたスレッドは、 メッセージをチャンネルを通して送信できるように、チャンネルの送信側を所有する必要があります。

転送側には、送信したい値を取るsendメソッドがあります。sendメソッドはResult<T, E>型を返すので、 既に受信側がドロップされ、値を送信する場所がなければ、送信処理はエラーを返します。 この例では、エラーの場合には、パニックするようにunwrapを呼び出しています。ですが、実際のアプリケーションでは、 ちゃんと扱うでしょう: 第9章に戻ってちゃんとしたエラー処理の方法を再確認してください。

リスト16-8において、メインスレッドのチャンネルの受信側から値を得ます。 アヒルのおもちゃを川の終端で水から回収したり、チャットメッセージを取得するみたいですね。

ファイル名: src/main.rs

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    // 値は{}です
    println!("Got: {}", received);
}

リスト16-8: 「やあ」の値をメインスレッドで受け取り、出力する

チャンネルの受信側には有用なメソッドが2つあります: recvtry_recvです。 receiveの省略形であるrecvを使っています。これは、メインスレッドの実行をブロックし、 値がチャンネルを流れてくるまで待機します。一旦値が送信されたら、recvはそれをResult<T, E>に含んで返します。 チャンネルの送信側が閉じたら、recvはエラーを返し、もう値は来ないと通知します。

try_recvメソッドはブロックせず、代わりに即座にResult<T, E>を返します: メッセージがあったら、それを含むOk値、今回は何もメッセージがなければ、Err値です。 メッセージを待つ間にこのスレッドにすることが他にあれば、try_recvは有用です: try_recvを頻繁に呼び出し、メッセージがあったら処理し、それ以外の場合は、 再度チェックするまでちょっとの間、他の作業をするループを書くことができるでしょう。

この例では、簡潔性のためにrecvを使用しました; メッセージを待つこと以外にメインスレッドがすべき作業はないので、 メインスレッドをブロックするのは適切です。

リスト16-8のコードを実行したら、メインスレッドから値が出力されるところを目撃するでしょう:

Got: hi

完璧です!

チャンネルと所有権の転送

安全な並行コードを書く手助けをしてくれるので、所有権規則は、メッセージ送信で重要な役割を担っています。 並行プログラミングでエラーを回避することは、Rustプログラム全体で所有権について考える利点です。 実験をしてチャンネルと所有権がともに動いて、どう問題を回避するかをお見せしましょう: val値を立ち上げたスレッドで、チャンネルに送ったに使用を試みます。 リスト16-9のコードのコンパイルを試みて、このコードが許容されない理由を確認してください:

ファイル名: src/main.rs

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        // valは{}
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

リスト16-9: チャンネルに送信後にvalの使用を試みる

ここで、tx.send経由でチャンネルに送信後にvalを出力しようとしています。これを許可するのは、悪い考えです: 一旦、値が他のスレッドに送信されたら、再度値を使用しようとする前にそのスレッドが変更したりドロップできてしまいます。 可能性として、その別のスレッドの変更により、矛盾していたり存在しないデータのせいでエラーが発生したり、 予期しない結果になるでしょう。ですが、リスト16-9のコードのコンパイルを試みると、Rustはエラーを返します:

error[E0382]: use of moved value: `val`
  --> src/main.rs:10:31
   |
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value used here after move
   |
   = note: move occurs because `val` has type `std::string::String`, which does
not implement the `Copy` trait

並行性のミスがコンパイルエラーを招きました。send関数は引数の所有権を奪い、 値がムーブされると、受信側が所有権を得るのです。これにより、送信後に誤って再度値を使用するのを防いでくれます; 所有権システムが、万事問題ないことを確認してくれます。

複数の値を送信し、受信側が待機するのを確かめる

リスト16-8のコードはコンパイルでき、動きましたが、2つの個別のスレッドがお互いにチャンネル越しに会話していることは、 明瞭に示されませんでした。リスト16-10において、リスト16-8のコードが並行に動いていることを証明する変更を行いました: 立ち上げたスレッドは、複数のメッセージを送信し、各メッセージ間で、1秒待機します。

ファイル名: src/main.rs

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        // スレッドからやあ(hi from the thread)
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

リスト16-10: 複数のメッセージを送信し、メッセージ間で停止する

今回は、メインスレッドに送信したい文字列のベクタを立ち上げたスレッドが持っています。 それらを繰り返し、各々個別に送信し、Durationの値1秒とともにthread::sleep関数を呼び出すことで、 メッセージ間で停止します。

メインスレッドにおいて、最早recv関数を明示的に呼んではいません: 代わりに、 rxをイテレータとして扱っています。受信した値それぞれを出力します。 チャンネルが閉じられると、繰り返しも終わります。

リスト16-10のコードを走らせると、各行の間に1秒の待機をしつつ、以下のような出力を目の当たりにするはずです:

Got: hi
Got: from
Got: the
Got: thread

メインスレッドのforループには停止したり、遅れせたりするコードは何もないので、 メインスレッドが立ち上げたスレッドから値を受け取るのを待機していることがわかります。

転送機をクローンして複数の生成器を作成する

mpscは、mutiple producer, single consumerの頭字語であると前述しました。 mpscを使い、リスト16-10のコードを拡張して、全ての値を同じ受信機に送信する複数のスレッドを生成しましょう。 チャンネルの転送の片割れをクローンすることでそうすることができます。リスト16-11のようにですね:

ファイル名: src/main.rs

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
// --snip--

let (tx, rx) = mpsc::channel();

let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    // 君のためにもっとメッセージを(more messages for you)
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {}", received);
}

// --snip--
}

リスト16-11: 複数の生成器から複数のメッセージを送信する

今回、最初のスレッドを立ち上げる前に、チャンネルの送信側に対してcloneを呼び出しています。 これにより、最初に立ち上げたスレッドに渡せる新しい送信ハンドルが得られます。 元のチャンネルの送信側は、2番目に立ち上げたスレッドに渡します。これにより2つスレッドが得られ、 それぞれチャンネルの受信側に異なるメッセージを送信します。

コードを実行すると、出力は以下のようなものになるはずです:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

別の順番で値が出る可能性もあります; システム次第です。並行性が面白いと同時に難しい部分でもあります。 異なるスレッドで色々な値を与えてthread::sleepで実験をしたら、走らせるたびにより非決定的になり、 毎回異なる出力をするでしょう。

チャンネルの動作方法を見たので、他の並行性に目を向けましょう。

関連キーワード:  チャンネル, メッセージ, 送信, from, val, thread, リスト, mpsc, let, 転送