はじめに (対象読者・この記事でわかること)

この記事は、Javaでマルチスレッド・スケジューリング処理を行う開発者、特に ScheduledThreadPoolExecutor を利用して定期タスクを実装している方を対象としています。
読者は以下を習得できます。

  • 同一ジョブがキューに重複して登録されると何が起きるかを把握できる
  • ScheduledThreadPoolExecutor に対して、ジョブ重複を検知し例外をスローさせる設計パターンが理解できる
  • 実装サンプルを自分のプロジェクトに組み込み、意図しない二重実行を防止できる

本記事は、過去に同一タスクが連続して走りリソース競合やデータ破壊が発生した経験をもとに執筆しました。

前提知識

この記事を読み進める上で、以下の知識があるとスムーズです。

  • Java 8 以降の基本的な文法とラムダ式
  • java.util.concurrent パッケージの概念(特に ExecutorService 系列)

同一ジョブが重複する背景と問題点

ScheduledThreadPoolExecutorscheduleAtFixedRatescheduleWithFixedDelay を用いることで、一定間隔でタスクを自動実行できます。しかし、タスク生成ロジックが外部から呼び出される形になると、同一ジョブが短時間に複数回登録されてしまうケースがあります。

なぜ重複が問題になるのか

  1. リソースの浪費
    同じ処理が同時に走ることで CPU、メモリ、I/O が無駄に消費され、システム全体のスループットが低下します。

  2. データ競合
    共有リソース(データベースやファイル)に対して同時書き込みが発生し、整合性が崩れる恐れがあります。

  3. 予測不可能なエラー
    例外ハンドリングが想定外の場所で発生し、アプリケーションが不安定になることがあります。

このようなリスクを回避するために、同一ジョブがすでにスケジュール済みかどうかをチェックし、重複した場合は例外を投げて即座に通知 する仕組みを導入します。

同一ジョブ重複検知と例外化の実装手順

以下では、ScheduledThreadPoolExecutor を拡張し、ジョブの識別子(例: タスク名やハッシュ)を管理することで重複登録を防止する具体的なコード例を示します。

ステップ1:ジョブ識別子を保持するコンテナを用意

まず、現在スケジュール中のジョブを管理する ConcurrentHashMap<String, ScheduledFuture<?>> を作成します。キーはタスクを一意に識別できる文字列(例: クラス名+パラメータ)です。

Java
import java.util.concurrent.*; public class UniqueScheduledExecutor { private final ScheduledThreadPoolExecutor executor; private final ConcurrentMap<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>(); public UniqueScheduledExecutor(int poolSize) { this.executor = new ScheduledThreadPoolExecutor(poolSize); // 例外が投げられたときにタスクが自動的に除外されるように設定 this.executor.setRemoveOnCancelPolicy(true); }

ステップ2:ジョブ登録メソッドを実装

次に、ジョブを登録するメソッド scheduleUnique を実装します。重複チェックは putIfAbsent で原子的に行い、既に存在すれば IllegalStateException をスローします。

Java
public ScheduledFuture<?> scheduleUnique( String taskId, Runnable command, long initialDelay, long period, TimeUnit unit) { // すでに同じ taskId が登録されているかチェック if (scheduledTasks.putIfAbsent(taskId, dummyFuture()) != null) { throw new IllegalStateException("Task with ID '" + taskId + "' is already scheduled."); } // 実際のタスクをスケジュール ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> { try { command.run(); } finally { // タスクが正常終了したか、例外で中断した場合にマップから削除 if (future.isCancelled() || future.isDone()) { scheduledTasks.remove(taskId); } } }, initialDelay, period, unit); // マップに正しい Future を上書き scheduledTasks.put(taskId, future); return future; } // ダミーの Future(putIfAbsent で一時的に使用) private ScheduledFuture<?> dummyFuture() { return new ScheduledFuture<Object>() { public long getDelay(TimeUnit unit) { return 0; } public int compareTo(Delayed o) { return 0; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public boolean isCancelled() { return false; } public boolean isDone() { return false; } public Object get() { return null; } public Object get(long timeout, TimeUnit unit) { return null; } }; }

ポイント解説

  • putIfAbsent による原子性確保で、競合状態でも正しく重複判定が行える。
  • ダミー Future は、登録前に一時的にプレースホルダーとしてマップに入れるために使用。実際の ScheduledFuture が取得できたら上書きする。
  • setRemoveOnCancelPolicy(true) を設定すると、タスクがキャンセルされた際に自動的にキューから除去され、メモリリークを防げる。

ステップ3:タスクキャンセルとクリーンアップ

ジョブを手動でキャンセルしたい場合は、scheduledTasks から Future を取得し cancel(true) を呼び出すと同時にマップから削除します。

Java
public void cancelTask(String taskId) { ScheduledFuture<?> future = scheduledTasks.remove(taskId); if (future != null) { future.cancel(true); } }

ハマった点やエラー解決

1. IllegalStateException が常に出る

原因: putIfAbsent でダミー Future を入れた後、スケジューリングが失敗した(例外がスローされた)ために scheduledTasks に本来の Future が上書きされず、次回の呼び出しで「既に存在する」状態になる。
対策: タスク実行前に try-catch で例外を捕捉し、失敗したら必ず scheduledTasks.remove(taskId) を実行する。

2. ConcurrentModificationException が出た

原因: タスク内部で scheduledTasks.remove(taskId) を呼び出す際に、同じスレッドが future の状態を参照しつつ削除しようとした。
対策: future.isCancelled() || future.isDone() の判定を finally ブロックで行い、必ず単一スレッドから削除するようにした。

3. スレッドプールが停止しない

原因: executor.shutdown() を呼び出すタイミングが遅れ、残存タスクが永遠に走り続けた。
対策: アプリケーション終了時に executor.shutdownNow() を実行し、全タスクを強制的にキャンセルするロジックを追加。

完全なサンプルコード

Java
public class Demo { public static void main(String[] args) throws InterruptedException { UniqueScheduledExecutor usExecutor = new UniqueScheduledExecutor(2); // 正常にスケジュール usExecutor.scheduleUnique( "heartbeat", () -> System.out.println("Heartbeat " + System.currentTimeMillis()), 0, 5, TimeUnit.SECONDS); // 重複登録を試みる → 例外が発生 try { usExecutor.scheduleUnique( "heartbeat", () -> System.out.println("Duplicated heartbeat"), 0, 5, TimeUnit.SECONDS); } catch (IllegalStateException e) { System.err.println(e.getMessage()); } // 10 秒待ってからキャンセル Thread.sleep(10000); usExecutor.cancelTask("heartbeat"); usExecutor.shutdown(); // Executor のクリーンアップ } }

このサンプルは、heartbeat タスクが最初に正常に登録された後、同一 ID で再登録しようとした際に例外がスローされることを示しています。さらに、10 秒経過後にタスクをキャンセルし、リソースを解放しています。

まとめ

本記事では、ScheduledThreadPoolExecutor における同一ジョブの重複登録を検知し、例外で通知する実装パターンを解説しました。

  • 重複ジョブがもたらすリソース浪費・データ競合リスク を整理
  • ConcurrentHashMapputIfAbsent による原子的重複判定手法を提示
  • タスクキャンセル・クリーンアップ のベストプラクティスを示し、ハマりポイントと対策を共有

この手法を取り入れることで、スケジュール系アプリケーションの信頼性が向上し、予期せぬ二重実行による障害を未然に防げます。次回は、分散環境でのジョブ重複防止(例えば Redis でのロック取得)について掘り下げる予定です。

参考資料