Почему localInit Func вызывается несколько раз в потоке в Parallel.ForEach

Я писал код для обработки большого количества данных, и я подумал, что было бы полезно, чтобы Parallel.ForEach создавал файл для каждого потока, который он создает, поэтому вывод не нужно синхронизировать (по крайней мере, мне).

Это выглядит примерно так:

Parallel.ForEach(vals,
 new ParallelOptions { MaxDegreeOfParallelism = 8 },
 ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
 (item, state, writer)=>
 {
 if(something)
 {
 state.Break();
 return writer;
 }
 List<result> results = new List<result>();
 foreach(var subItem in item.SubItems)
 results.Add(ProcessItem(subItem));
 if(results.Count > 0)
 {
 foreach(var result in results)
 result.Write(writer);
 }
 return writer;
 },
 (writer)=>writer.Dispose());
</result></result>

Я ожидал, что будет создано до 8 файлов и будет сохраняться в течение всего времени выполнения. Затем каждый из них будет Disposed, когда закончится весь вызов ForEach. Что действительно происходит, так это то, что localInit, кажется, вызывается один раз для каждого элемента, поэтому я получаю сотни файлов. Кроме того, авторы обрабатываются в конце каждого обрабатываемого элемента.

Это показывает то же самое:

var vals = Enumerable.Range(0, 10000000).ToArray();
 long sum = 0;
 Parallel.ForEach(vals,
 new ParallelOptions { MaxDegreeOfParallelism = 8 },
 () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
 (i, state, common) =>
 {
 Thread.Sleep(10);
 return common + i;
 },
 (common) => Interlocked.Add(ref sum, common));

Я вижу:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

Примечание: если я не использую вызов Thread.Sleep, он иногда кажется "правильно". localInit только вызывается один раз для каждого из 4 потоков, которые он решает использовать на моем компьютере. Не каждый раз, однако.

Является ли это желаемым поведением функции? Что происходит за кулисами, которые заставляют это делать это? И наконец, какой хороший способ получить желаемую функциональность, ThreadLocal?

Это, кстати, на .NET 4.5.

4 ответа

Parallel.ForEach не работает, как вы думаете. Важно отметить, что метод построен поверх классов Task и что отношение между Task и Thread не равно 1:1. Вы можете иметь, например, 10 задач, которые выполняются на 2 управляемых потоках.

Попробуйте использовать эту строку в своем тесте метода вместо текущего:

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
 Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

Вы должны увидеть, что ThreadId будет повторно использоваться во многих разных задачах, показывая их уникальные идентификаторы. Вы увидите это больше, если вы ушли или увеличили свой призыв до Thread.Sleep.

Основная идея того, как работает метод Parallel.ForEach, заключается в том, что он требует, чтобы ваш счетчик создавал серию задач, которые будут запускать разделы процесса перечисления, так как это делается, многое зависит от ввода. Существует также специальная логика, которая проверяет случай, когда задача превышает определенное количество миллисекунд без завершения. Если этот случай верен, тогда может быть создана новая задача, помогающая облегчить работу.

Если вы посмотрели документацию для функции localinit в Parallel.ForEach, вы заметите, что она говорит, что она returns the initial state of the local data for each _task_, а не каждый поток.

Вы можете спросить, почему возникает более 8 заданий. Этот ответ похож на последний, найденный в документации для ParallelOptions.MaxDegreeOfParallelism.

Изменение MaxDegreeOfParallelism по умолчанию ограничивает количество параллельных задач.

Это ограничение относится только к числу одновременных задач, а не к жесткому пределу количества задач, которые будут созданы за все время обработки. И, как я уже упоминал выше, бывают случаи, когда возникает отдельная задача, в результате ваша функция localinit вызывается несколько раз и записывает сотни файлов на диск.

Запись на диск - это операция с небольшой задержкой, особенно если вы используете синхронный ввод-вывод. Когда происходит операция с диском, он блокирует весь поток; то же самое происходит с Thread.Sleep. Если a Task делает это, он блокирует поток, в котором он выполняется в настоящий момент, и никакие другие задачи не могут выполняться на нем. Обычно в этих случаях планировщик будет создавать новый Task, чтобы помочь получить слабину.

И наконец, какой хороший способ получить желаемую функциональность, ThreadLocal?

Суть в том, что локаторы потоков не имеют смысла с Parallel.ForEach, потому что вы не имеете дело с потоками; вы имеете дело с задачами. Местный поток может быть разделен между задачами, потому что многие задачи могут использовать один и тот же поток одновременно. Кроме того, локальный поток задач может изменять среднее выполнение, поскольку планировщик может упредить его от запуска, а затем продолжить его выполнение в другом потоке, который будет иметь другой поток.

Я не уверен, что это лучший способ, но вы можете положиться на функцию localinit, чтобы передать любой ресурс, который вам нужен, только позволяя использовать ресурс в одном потоке за раз. Вы можете использовать localfinally, чтобы пометить его как больше не используемого и, следовательно, доступного для получения другой задачи. Это то, для чего были разработаны эти методы; каждый метод вызывается только один раз для каждой заданной задачи (см. раздел замечаний Parallel.ForEach документации MSDN).

Вы также можете разделить работу самостоятельно и создать свой собственный набор потоков и выполнить свою работу. Однако, по моему мнению, это меньше, поскольку класс Parallel уже делает этот тяжелый подъем для вас.


То, что вы видите, - это реализация, пытающаяся как можно быстрее выполнить вашу работу.

Чтобы сделать это, он пытается использовать различное количество задач для максимизации пропускной способности. Он захватывает определенное количество потоков из пула потоков и немного выполняет вашу работу. Затем он пытается добавить и удалить потоки, чтобы узнать, что произойдет. Он продолжает делать это, пока не будет выполнена вся ваша работа.

Алгоритм довольно глупый, поскольку он не знает, использует ли ваша работа много CPU или много ввода-вывода, или даже если есть много синхронизации и потоки блокируют друг друга. Все, что он может сделать, это добавлять и удалять потоки и измерять, насколько быстро завершается каждая единица работы.

Это означает, что он постоянно вызывает ваши функции localInit и localFinally, поскольку он вводит и удаляет потоки - вот что вы нашли.

К сожалению, нет простого способа управления этим алгоритмом. Parallel.ForEach - это высокоуровневая конструкция, которая намеренно скрывает большую часть кода управления потоком.

Использование ThreadLocal может немного помочь, но оно зависит от того, что пул потоков повторно использует те же потоки, когда Parallel.ForEach запрашивает новые. Это не гарантировано - на самом деле маловероятно, что пул потоков будет использовать ровно 8 потоков для всего вызова. Это означает, что вы снова будете создавать больше файлов, чем необходимо.

Одна вещь, гарантирующая , заключается в том, что Parallel.ForEach никогда не будет использовать больше, чем MaxDegreeOfParallelism потоков в любой момент времени.

Вы можете использовать это в своих интересах, создав "пул" фиксированного размера файлов, которые могут быть повторно использованы любыми потоками, выполняемыми в определенное время. Вы знаете, что только потоки MaxDegreeOfParallelism могут запускаться сразу, поэтому вы можете создать это количество файлов перед вызовом ForEach. Затем возьмите один в своем localInit и отпустите его в localFinally.

Конечно, вам придется писать этот пул самостоятельно, и он должен быть потокобезопасным, поскольку он будет вызываться одновременно. Однако простая стратегия блокировки должна быть достаточно хорошей, поскольку потоки не вводятся и удаляются очень быстро по сравнению со стоимостью блокировки.


В соответствии с MSDN метод localInit вызывается один раз для каждой задачи, а не для каждого потока

Делегатор localInit вызывается один раз для каждой задачи, которая участвует в выполнении цикла и возвращает начальное локальное состояние для каждой из этих задач.


localInit вызывается при создании потока. если тело занимает так много времени, оно должно создать другой поток и приостановить текущий поток, и если он создает другой поток, он вызывает localInit

также, когда вызываемый Parallel.ForEach создает потоки столько же, сколько значение MaxDegreeOfParallelism, например:

var k = Enumerable.Range(0, 1);
Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}.....

он создает 4 потока, когда он сначала называется

licensed under cc by-sa 3.0 with attribution.