はじめに

この記事は、RxJavaを使ったリアクティブプログラミングに興味があるけれど「Observableの中身(T)をどうやって取り出せばいいのか分からない」というJavaプログラマー向けです。
非同期ストリームを購読するだけでなく、特定のタイミングで「今の値だけ即座に取得したい」「つまりTをBlockingして取り出したい」という場面は頻出します。本記事では、blockingFirst()/blockingSingle()などの基本的な取り出し方から、エラーハンドリング、タイムアウト、そしてテストしやすい設計まで、実装例を交えて解説します。読み終えると、Observableを安全かつ意図したタイミングでTへ変換できるようになります。

前提知識

  • Java 11以上の基本的な文法が読める
  • RxJava 3.xの依存をGradle/Mavenへ追加済みである(implementation 'io.reactivex.rxjava3:rxjava:3.1.8'など)
  • 非同期・スレッドの基礎用語(Scheduler、SubscribeOn、ObserveOn)を聞いたことがある

Observableを取り出すことの意味と落とし穴

Observableは「データの流れ」そのものです。つまり、単一もしくは複数のTを時間軸を伴って発行します。
「Tを取り出す」とは、つまり「時系列の流れを止めて、現在到達している値を強制的に取得する」ことに等しく、ブロッキング操作になります。
もしメインスレッドで安易にblocking系オペレータを呼ぶと、アプリケーションがフリーズしかねません。
また、Observableがエラーを発行したりCompleteせずに終了しないケースでは、どんなにblockingFirst()を呼んでも戻ってこない、という地獄もあります。
このセクションでは、なぜ「取り出し」が難しいのかを理解したうえで、安全に取り出すための考え方を整理します。

blockingFirst()/blockingSingle()/blockingLast()の使い分けと実装例

ステップ1:最もシンプルに取り出す

Java
Observable<String> obs = Observable.just("A", "B", "C"); // 最初の1件を取得 String first = obs.blockingFirst(); System.out.println(first); // A // 条件にマッチする最初の1件を取得 String firstLen2 = obs.filter(s -> s.length() == 1) .blockingFirst(); System.out.println(firstLen2); // A

ポイント
- blockingFirst()HotなObservableでも動作しますが、すでに発行済みの値を見逃すとNoSuchElementExceptionになります
- デフォルト値を渡したい場合はblockingFirst(defaultItem)を使いましょう

ステップ2:完了を保証して安全に取り出す

Java
public Optional<User> fetchUserOnce(Long userId) { return userService.findById(userId) // Observable<User> .take(1) // 1件取得後にComplete .timeout(3, TimeUnit.SECONDS) // 3秒で放棄 .onErrorReturnItem(User.EMPTY) // エラー時のダミー .map(u -> u.equals(User.EMPTY) ? null : u) .blockingFirst() .map(Optional::ofNullable) .orElse(Optional.empty()); }

解説
1. take(1)で「1件受け取ったらComplete」を保証 → blockingFirstが確実に復帰
2. timeoutで永遠に待つリスクを排除
3. onErrorReturnItemで例外を握りつぶし、Optionalでヌル安全にしている
これにより、メインスレッドでも安全に呼び出せます。

ステップ3:複数スレッドで結果を回収する

Java
public List<Result> parallelFetch(List<String> urls) { return Observable.fromIterable(urls) .flatMap(url -> downloadAsync(url) .subscribeOn(Schedulers.io())) .toList() // Single<List<Result>> .blockingGet(); // List<Result>に変換 }

toList()でSingleに変換後、blockingGet()を使うことで、全要素が揃ってからListとして取得できます。
flatMapの内部でsubscribeOn(Schedulers.io())を指定することで、各ダウンロードが並列化され、ブロッキングしてもスループットが向上します。

ハマった点とエラー解決

事象
blockingFirst()を呼ぶと無限にスタックして戻ってこない。

原因
- ObservableがCompleteしていない
- もしくはSubscribeするSchedulerがメインスレッドと同じでデッドロック

解決策
1. 必ずtake(1)singleElement()で完了を保証
2. ブロッキング呼び出し側を専用のSchedulerでラップ
java Schedulers.single().scheduleDirect(() -> { T item = obs.blockingFirst(); // 後続処理 }); 3. タイムアウトを必ず設定する
java obs.timeout(800, TimeUnit.MILLISECONDS) .blockingFirst();

まとめ

本記事では、RxJavaのObservableからTを安全に取り出す方法を解説しました。

  • blockingFirst()は最も手軽だが、Complete/エラーを考慮する必要がある
  • take(1)timeoutonErrorReturnItemで無限ブロッキングを防げる
  • toList()blockingGet()で複数件を一括取得できる
  • ブロッキング呼び出しは専用Schedulerで囲ってデッドロックを回避する

この知識があれば、リアクティブストリームの恩恵を受けつつ、必要に応じて同期的に値を抽出できるようになります。
次回は「ObservableをCompletionStageへ相互変換する」について深掘りします。お楽しみに!

参考資料

  • RxJava 3.x公式Wiki – Blocking Operators
    https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators
  • リアクティブ・Java (オライリー・ジャパン)
    https://www.oreilly.co.jp/books/9784873119475/
  • RxJava実践ガイド – Qiita
    https://qiita.com/k_moto/items/c5f70c07b7a2d75915a7