はじめに (対象読者・この記事でわかること)
この記事は、Javaでマルチスレッド・スケジューリング処理を行う開発者、特に ScheduledThreadPoolExecutor を利用して定期タスクを実装している方を対象としています。
読者は以下を習得できます。
- 同一ジョブがキューに重複して登録されると何が起きるかを把握できる
ScheduledThreadPoolExecutorに対して、ジョブ重複を検知し例外をスローさせる設計パターンが理解できる- 実装サンプルを自分のプロジェクトに組み込み、意図しない二重実行を防止できる
本記事は、過去に同一タスクが連続して走りリソース競合やデータ破壊が発生した経験をもとに執筆しました。
前提知識
この記事を読み進める上で、以下の知識があるとスムーズです。
- Java 8 以降の基本的な文法とラムダ式
java.util.concurrentパッケージの概念(特にExecutorService系列)
同一ジョブが重複する背景と問題点
ScheduledThreadPoolExecutor は scheduleAtFixedRate や scheduleWithFixedDelay を用いることで、一定間隔でタスクを自動実行できます。しかし、タスク生成ロジックが外部から呼び出される形になると、同一ジョブが短時間に複数回登録されてしまうケースがあります。
なぜ重複が問題になるのか
-
リソースの浪費
同じ処理が同時に走ることで CPU、メモリ、I/O が無駄に消費され、システム全体のスループットが低下します。 -
データ競合
共有リソース(データベースやファイル)に対して同時書き込みが発生し、整合性が崩れる恐れがあります。 -
予測不可能なエラー
例外ハンドリングが想定外の場所で発生し、アプリケーションが不安定になることがあります。
このようなリスクを回避するために、同一ジョブがすでにスケジュール済みかどうかをチェックし、重複した場合は例外を投げて即座に通知 する仕組みを導入します。
同一ジョブ重複検知と例外化の実装手順
以下では、ScheduledThreadPoolExecutor を拡張し、ジョブの識別子(例: タスク名やハッシュ)を管理することで重複登録を防止する具体的なコード例を示します。
ステップ1:ジョブ識別子を保持するコンテナを用意
まず、現在スケジュール中のジョブを管理する ConcurrentHashMap<String, ScheduledFuture<?>> を作成します。キーはタスクを一意に識別できる文字列(例: クラス名+パラメータ)です。
Javaimport java.util.concurrent.*; public class UniqueScheduledExecutor { private final ScheduledThreadPoolExecutor executor; private final ConcurrentMap<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>(); public UniqueScheduledExecutor(int poolSize) { this.executor = new ScheduledThreadPoolExecutor(poolSize); // 例外が投げられたときにタスクが自動的に除外されるように設定 this.executor.setRemoveOnCancelPolicy(true); }
ステップ2:ジョブ登録メソッドを実装
次に、ジョブを登録するメソッド scheduleUnique を実装します。重複チェックは putIfAbsent で原子的に行い、既に存在すれば IllegalStateException をスローします。
Javapublic ScheduledFuture<?> scheduleUnique( String taskId, Runnable command, long initialDelay, long period, TimeUnit unit) { // すでに同じ taskId が登録されているかチェック if (scheduledTasks.putIfAbsent(taskId, dummyFuture()) != null) { throw new IllegalStateException("Task with ID '" + taskId + "' is already scheduled."); } // 実際のタスクをスケジュール ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> { try { command.run(); } finally { // タスクが正常終了したか、例外で中断した場合にマップから削除 if (future.isCancelled() || future.isDone()) { scheduledTasks.remove(taskId); } } }, initialDelay, period, unit); // マップに正しい Future を上書き scheduledTasks.put(taskId, future); return future; } // ダミーの Future(putIfAbsent で一時的に使用) private ScheduledFuture<?> dummyFuture() { return new ScheduledFuture<Object>() { public long getDelay(TimeUnit unit) { return 0; } public int compareTo(Delayed o) { return 0; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public boolean isCancelled() { return false; } public boolean isDone() { return false; } public Object get() { return null; } public Object get(long timeout, TimeUnit unit) { return null; } }; }
ポイント解説
putIfAbsentによる原子性確保で、競合状態でも正しく重複判定が行える。- ダミー
Futureは、登録前に一時的にプレースホルダーとしてマップに入れるために使用。実際のScheduledFutureが取得できたら上書きする。 setRemoveOnCancelPolicy(true)を設定すると、タスクがキャンセルされた際に自動的にキューから除去され、メモリリークを防げる。
ステップ3:タスクキャンセルとクリーンアップ
ジョブを手動でキャンセルしたい場合は、scheduledTasks から Future を取得し cancel(true) を呼び出すと同時にマップから削除します。
Javapublic void cancelTask(String taskId) { ScheduledFuture<?> future = scheduledTasks.remove(taskId); if (future != null) { future.cancel(true); } }
ハマった点やエラー解決
1. IllegalStateException が常に出る
原因: putIfAbsent でダミー Future を入れた後、スケジューリングが失敗した(例外がスローされた)ために scheduledTasks に本来の Future が上書きされず、次回の呼び出しで「既に存在する」状態になる。
対策: タスク実行前に try-catch で例外を捕捉し、失敗したら必ず scheduledTasks.remove(taskId) を実行する。
2. ConcurrentModificationException が出た
原因: タスク内部で scheduledTasks.remove(taskId) を呼び出す際に、同じスレッドが future の状態を参照しつつ削除しようとした。
対策: future.isCancelled() || future.isDone() の判定を finally ブロックで行い、必ず単一スレッドから削除するようにした。
3. スレッドプールが停止しない
原因: executor.shutdown() を呼び出すタイミングが遅れ、残存タスクが永遠に走り続けた。
対策: アプリケーション終了時に executor.shutdownNow() を実行し、全タスクを強制的にキャンセルするロジックを追加。
完全なサンプルコード
Javapublic class Demo { public static void main(String[] args) throws InterruptedException { UniqueScheduledExecutor usExecutor = new UniqueScheduledExecutor(2); // 正常にスケジュール usExecutor.scheduleUnique( "heartbeat", () -> System.out.println("Heartbeat " + System.currentTimeMillis()), 0, 5, TimeUnit.SECONDS); // 重複登録を試みる → 例外が発生 try { usExecutor.scheduleUnique( "heartbeat", () -> System.out.println("Duplicated heartbeat"), 0, 5, TimeUnit.SECONDS); } catch (IllegalStateException e) { System.err.println(e.getMessage()); } // 10 秒待ってからキャンセル Thread.sleep(10000); usExecutor.cancelTask("heartbeat"); usExecutor.shutdown(); // Executor のクリーンアップ } }
このサンプルは、heartbeat タスクが最初に正常に登録された後、同一 ID で再登録しようとした際に例外がスローされることを示しています。さらに、10 秒経過後にタスクをキャンセルし、リソースを解放しています。
まとめ
本記事では、ScheduledThreadPoolExecutor における同一ジョブの重複登録を検知し、例外で通知する実装パターンを解説しました。
- 重複ジョブがもたらすリソース浪費・データ競合リスク を整理
ConcurrentHashMapとputIfAbsentによる原子的重複判定手法を提示- タスクキャンセル・クリーンアップ のベストプラクティスを示し、ハマりポイントと対策を共有
この手法を取り入れることで、スケジュール系アプリケーションの信頼性が向上し、予期せぬ二重実行による障害を未然に防げます。次回は、分散環境でのジョブ重複防止(例えば Redis でのロック取得)について掘り下げる予定です。
参考資料
- Java Docs – ScheduledThreadPoolExecutor
- Effective Java – 第2章 スレッド安全なクラスの設計
- ConcurrentHashMap の内部構造と使用例
