Элегантное внедрение индикаторов длины очереди в ExecutorServices

Почему, а почему java.util.concurrent не предоставляет индикаторы длины очереди для своих ExecutorService s? Недавно я обнаружил, что делаю что-то вроде этого:

ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...
public void addTaskToQueue(Runnable runnable) {
 if (queueLength.get() < MAX_QUEUE_LENGTH) {
 queueLength.incrementAndGet(); // Increment queue when submitting task.
 queue.submit(new Runnable() {
 public void run() {
 runnable.run();
 queueLength.decrementAndGet(); // Decrement queue when task done.
 }
 });
 } else {
 // Trigger error: too long queue
 }
}

Что работает нормально, но... Я думаю, что это действительно должно быть реализовано как часть ExecutorService. Он немой и ошибка, склонная переносить счетчик, отделенный от фактической очереди, длина которой указывает счетчик (напоминает мне массивы C). Но ExecutorService получаются с помощью статических методов factory, поэтому нет возможности просто расширить отличный исполнитель одиночного потока и добавить счетчик очереди. Итак, что мне делать:

  • Reinvent, уже реализованный в JDK?
  • Другое умное решение?
2 ответа

Существует более прямой способ:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
// add jobs
// ...
int size = executor.getQueue().size();

Хотя вы можете не использовать удобные методы создания Executor, а скорее создать исполнителя непосредственно, чтобы избавиться от приведения, и, таким образом, убедитесь, что исполнитель всегда будет ThreadPoolExecutor, даже если реализация Executors.newSingleThreadExecutor изменится когда-нибудь.

ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<runnable>() );
</runnable>

Это прямое копирование из Executors.newSingleThreadExecutor в JDK 1.6. LinkedBlockingQueue, который передается конструктору, на самом деле является тем самым объектом, с которого вы вернетесь из getQueue.


Пока вы можете напрямую проверить размер очереди. Другой способ справиться с очередью, которая слишком длинна, - сделать внутреннюю очередь ограниченной.

public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 5000L, TimeUnit.MILLISECONDS,
 new ArrayBlockingQueue<runnable>(queueSize, true));
}
</runnable>

Это приведет к RejectedExecutionExceptions (см. this), когда вы превысите предел.

Если вы хотите избежать исключения, вызывающий поток может быть захвачен для выполнения функции. См. этот вопрос SO для решения.

Ссылки

licensed under cc by-sa 3.0 with attribution.