はじめに
この記事は、RxJavaを使ったリアクティブプログラミングに興味があるけれど「Observable
非同期ストリームを購読するだけでなく、特定のタイミングで「今の値だけ即座に取得したい」「つまりTをBlockingして取り出したい」という場面は頻出します。本記事では、blockingFirst()/blockingSingle()などの基本的な取り出し方から、エラーハンドリング、タイムアウト、そしてテストしやすい設計まで、実装例を交えて解説します。読み終えると、Observable
前提知識
- Java 11以上の基本的な文法が読める
- RxJava 3.xの依存をGradle/Mavenへ追加済みである(
implementation 'io.reactivex.rxjava3:rxjava:3.1.8'など) - 非同期・スレッドの基礎用語(Scheduler、SubscribeOn、ObserveOn)を聞いたことがある
Observableを取り出すことの意味と落とし穴
Observableは「データの流れ」そのものです。つまり、単一もしくは複数のTを時間軸を伴って発行します。
「Tを取り出す」とは、つまり「時系列の流れを止めて、現在到達している値を強制的に取得する」ことに等しく、ブロッキング操作になります。
もしメインスレッドで安易にblocking系オペレータを呼ぶと、アプリケーションがフリーズしかねません。
また、Observableがエラーを発行したりCompleteせずに終了しないケースでは、どんなにblockingFirst()を呼んでも戻ってこない、という地獄もあります。
このセクションでは、なぜ「取り出し」が難しいのかを理解したうえで、安全に取り出すための考え方を整理します。
blockingFirst()/blockingSingle()/blockingLast()の使い分けと実装例
ステップ1:最もシンプルに取り出す
JavaObservable<String> obs = Observable.just("A", "B", "C"); // 最初の1件を取得 String first = obs.blockingFirst(); System.out.println(first); // A // 条件にマッチする最初の1件を取得 String firstLen2 = obs.filter(s -> s.length() == 1) .blockingFirst(); System.out.println(firstLen2); // A
ポイント
- blockingFirst()はHotなObservableでも動作しますが、すでに発行済みの値を見逃すとNoSuchElementExceptionになります
- デフォルト値を渡したい場合はblockingFirst(defaultItem)を使いましょう
ステップ2:完了を保証して安全に取り出す
Javapublic Optional<User> fetchUserOnce(Long userId) { return userService.findById(userId) // Observable<User> .take(1) // 1件取得後にComplete .timeout(3, TimeUnit.SECONDS) // 3秒で放棄 .onErrorReturnItem(User.EMPTY) // エラー時のダミー .map(u -> u.equals(User.EMPTY) ? null : u) .blockingFirst() .map(Optional::ofNullable) .orElse(Optional.empty()); }
解説
1. take(1)で「1件受け取ったらComplete」を保証 → blockingFirstが確実に復帰
2. timeoutで永遠に待つリスクを排除
3. onErrorReturnItemで例外を握りつぶし、Optionalでヌル安全にしている
これにより、メインスレッドでも安全に呼び出せます。
ステップ3:複数スレッドで結果を回収する
Javapublic List<Result> parallelFetch(List<String> urls) { return Observable.fromIterable(urls) .flatMap(url -> downloadAsync(url) .subscribeOn(Schedulers.io())) .toList() // Single<List<Result>> .blockingGet(); // List<Result>に変換 }
toList()でSingleに変換後、blockingGet()を使うことで、全要素が揃ってからList
flatMapの内部でsubscribeOn(Schedulers.io())を指定することで、各ダウンロードが並列化され、ブロッキングしてもスループットが向上します。
ハマった点とエラー解決
事象
blockingFirst()を呼ぶと無限にスタックして戻ってこない。
原因
- ObservableがCompleteしていない
- もしくはSubscribeするSchedulerがメインスレッドと同じでデッドロック
解決策
1. 必ずtake(1)やsingleElement()で完了を保証
2. ブロッキング呼び出し側を専用のSchedulerでラップ
java
Schedulers.single().scheduleDirect(() -> {
T item = obs.blockingFirst();
// 後続処理
});
3. タイムアウトを必ず設定する
java
obs.timeout(800, TimeUnit.MILLISECONDS)
.blockingFirst();
まとめ
本記事では、RxJavaのObservable
blockingFirst()は最も手軽だが、Complete/エラーを考慮する必要があるtake(1)+timeout+onErrorReturnItemで無限ブロッキングを防げるtoList()→blockingGet()で複数件を一括取得できる- ブロッキング呼び出しは専用Schedulerで囲ってデッドロックを回避する
この知識があれば、リアクティブストリームの恩恵を受けつつ、必要に応じて同期的に値を抽出できるようになります。
次回は「Observable
参考資料
- RxJava 3.x公式Wiki – Blocking Operators
https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators - リアクティブ・Java (オライリー・ジャパン)
https://www.oreilly.co.jp/books/9784873119475/ - RxJava実践ガイド – Qiita
https://qiita.com/k_moto/items/c5f70c07b7a2d75915a7
