ExecutorService, стандартный способ избежать слишком большой очереди задач

Я использую ExecutorService для удобства параллельной многопоточной программы. Возьмите следующий код:

while(xxx)
 ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
 ... 
 Future<..> ... = exService.submit(..);
 ...
}

В моем случае проблема заключается в том, что submit() не блокируется, если заняты все NUMBER_THREADS. Следствием этого является то, что очередь задач наводняется многими задачами. Следствием этого является то, что закрытие службы выполнения с помощью ExecutorService.shutdown() занимает много времени (ExecutorService.isTerminated() будет ложным в течение длительного времени). Причина в том, что очередь задач все еще достаточно полная.

В настоящее время моим обходным решением является работа с семафорами, чтобы запретить иметь много записей внутри очереди задач ExecutorService:

...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);
while(xxx)
 ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
 ...
 semaphore.aquire(); 
 // internally the task calls a finish callback, which invokes semaphore.release()
 // -> now another task is added to queue
 Future<..> ... = exService.submit(..); 
 ...
}

Я уверен, что есть лучшее более инкапсулированное решение?

5 ответов

Хитрость заключается в использовании фиксированного размера очереди и:

new ThreadPoolExecutor.CallerRunsPolicy()

Я также рекомендую использовать Guava ListeningExecutorService. Вот пример очередей потребителей/производителей.

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 5000L, TimeUnit.MILLISECONDS,
 new ArrayBlockingQueue<runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
</runnable>

Что-нибудь лучше, и вы можете рассмотреть MQ, например, RabbitMQ или ActiveMQ, поскольку у них есть технология QoS.


Вы можете ThreadPoolExecutor.getQueue(). size(), чтобы узнать размер очереди ожидания. Вы можете выполнить действие, если очередь слишком длинная. Я предлагаю запустить задачу в текущем потоке, если очередь слишком длинная, чтобы замедлить работу производителя (если это подходит)


Вам лучше создать ThreadPoolExecutor (что и делает Executors.newXXX()).

В конструкторе вы можете передать BlockingQueue для Исполнителя для использования в качестве своей очереди задач. Если вы передадите размер BlockingQueue с ограничением размера (например, LinkedBlockingQueue), он должен достичь желаемого эффекта.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<runnable>(workQueueSize));
</runnable>


Истинная блокировка ThreadPoolExecutor была в списке желаний многих, там даже ошибка JDC открылась на нем. Я столкнулся с той же проблемой и наткнулся на это: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

Это реализация BlockingThreadPoolExecutor, реализованная с использованием RejectionPolicy, которая использует предложение для добавления задачи в очередь, ожидая, что очередь будет иметь место. Выглядит хорошо.


вы можете добавить еще одну ******** очередь с ограниченным размером, чтобы контролировать размер внутренней очереди в executorService, некоторые думают как семафор, но очень легко. перед исполнителем вы ставите(), и когда задается задача take(). take() должен находиться внутри кода задачи

licensed under cc by-sa 3.0 with attribution.