Невозможно многопоточность масштабируемого метода

ОБНОВЛЕНИЕ. Чтобы помочь прояснить, что я прошу, я опубликовал небольшой код Java, который использует эту идею.

Некоторое время назад я спросил question о том, как получить алгоритм для разбивки набора чисел, идея заключалась в том, чтобы дать ему список чисел (1,2,3,4,5) и итоговый (10), и он будет вычислять все кратные каждого числа, которые будут суммироваться с суммой ('1*10' или '1*1,1*2,1*3,1*4' или '2*5' и т.д.)., Это было первое упражнение по программированию, которое я когда-либо делал, поэтому мне потребовалось некоторое время, и я получил его, но теперь я хочу попытаться выяснить, смогу ли я его масштабировать. Человек в оригинальном вопросе сказал, что он масштабируемый, но я немного смущен тому, как это сделать. Рекурсивная часть - это область, в которой я застрял при масштабировании части, которая объединяет все результаты (таблица, на которую она ссылается, не масштабируема, но применяет кэширование, я могу сделать это быстро)

У меня есть следующий алгоритм (псевдокод):

//generates table
for i = 1 to k
 for z = 0 to sum:
 for c = 1 to z / x_i:
 if T[z - c * x_i][i - 1] is true:
 set T[z][i] to true
//uses table to bring all the parts together
function RecursivelyListAllThatWork(k, sum) // Using last k variables, make sum
 /* Base case: If we've assigned all the variables correctly, list this
 * solution.
 */
 if k == 0:
 print what we have so far
 return
 /* Recursive step: Try all coefficients, but only if they work. */
 for c = 0 to sum / x_k:
 if T[sum - c * x_k][k - 1] is true:
 mark the coefficient of x_k to be c
 call RecursivelyListAllThatWork(k - 1, sum - c * x_k)
 unmark the coefficient of x_k

Я действительно в недоумении, как поточно/многопроцессорная функция RecursivelyListAllThatWork. Я знаю, если я отправлю ему меньший K (который является int из общего количества элементов в списке), он обработает это подмножество, но я не знаю, как делать те, которые объединяют результаты по подмножеству. Например, если список [1,2,3,4,5,6,7,8,9,10], и я отправляю его K = 3, тогда обрабатывается только 1,2,3, но это нормально, но что делать, если мне нужны результаты, которые включают 1 и 10? Я попытался изменить таблицу (переменную T), так что только подмножество, которое я хочу, есть, но все еще не работает, потому что, как и решение выше, оно выполняет подмножество, но не может обрабатывать ответы, требующие более широкого диапазона.

Мне не нужен код, если кто-то может объяснить, как концептуально разорвать этот рекурсивный шаг, чтобы можно было использовать другие ядра/машины.

UPDATE: я все еще не могу понять, как превратить RecursivelyListAllThatWork в runnable (я знаю, как это сделать, но я не понимаю, как изменить алгоритм RecursivelyListAllThatWork, чтобы он мог запускаться параллельно. Остальные части только здесь, чтобы сделать пример работы, мне нужно только реализовать runnable в методе RecursivelyListAllThatWork). Вот код Java:

import java.awt.Point;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class main
{
 public static void main(String[] args)
 {
 System.out.println("starting..");
 int target_sum = 100;
 int[] data = new int[] { 10, 5, 50, 20, 25, 40 };
 List T = tableGeneator(target_sum, data);
 List<integer> coeff = create_coeff(data.length);
 RecursivelyListAllThatWork(data.length, target_sum, T, coeff, data);
 }
 private static List<integer> create_coeff(int i) {
 // TODO Auto-generated method stub
 Integer[] integers = new Integer[i];
 Arrays.fill(integers, 0);
 List<integer> integerList = Arrays.asList(integers);
 return integerList;
 }
 private static void RecursivelyListAllThatWork(int k, int sum, List T, List<integer> coeff, int[] data) {
 // TODO Auto-generated method stub
 if (k == 0) {
 //# print what we have so far
 for (int i = 0; i < coeff.size(); i++) {
 System.out.println(data[i] + " = " + coeff.get(i));
 }
 System.out.println("*******************");
 return;
 }
 Integer x_k = data[k-1];
 // Recursive step: Try all coefficients, but only if they work. 
 for (int c = 0; c <= sum/x_k; c++) { //the c variable caps the percent
 if (T.contains(new Point((sum - c * x_k), (k-1))))
 {
 // mark the coefficient of x_k to be c
 coeff.set((k-1), c);
 RecursivelyListAllThatWork((k - 1), (sum - c * x_k), T, coeff, data);
 // unmark the coefficient of x_k
 coeff.set((k-1), 0);
 }
 }
 }
 public static List tableGeneator(int target_sum, int[] data) {
 List T = new ArrayList();
 T.add(new Point(0, 0));
 float max_percent = 1;
 int R = (int) (target_sum * max_percent * data.length);
 for (int i = 0; i < data.length; i++)
 {
 for (int s = -R; s < R + 1; s++)
 {
 int max_value = (int) Math.abs((target_sum * max_percent)
 / data[i]);
 for (int c = 0; c < max_value + 1; c++)
 {
 if (T.contains(new Point(s - c * data[i], i)))
 {
 Point p = new Point(s, i + 1);
 if (!T.contains(p))
 {
 T.add(p);
 }
 }
 }
 }
 }
 return T;
 }
} 
</integer></integer></integer></integer>
3 ответа

Общий ответ на многопоточность заключается в том, чтобы исключить рекурсивную реализацию благодаря стеку (LIFO или FIFO). При реализации такого алгоритма число потоков является фиксированным параметром для алгоритма (например, количество ядер).

Чтобы реализовать его, стек вызовов языков заменяется стеком, хранящим последний контекст в качестве контрольной точки, когда проверенное состояние завершает рекурсию. В вашем случае это либо значения k=0, либо coeff соответствуют целевой сумме.

После дезактивации первая реализация предназначена для запуска нескольких потоков для потребления стека, но доступ к стеку становится точкой соперничества, потому что может потребоваться синхронизация.

Лучшим масштабируемым решением является выделение стека для каждого потока, но требуется начальное создание контекстов в стеке.

Я предлагаю смешанный подход, когда первый поток работает рекурсивно для ограниченного числа k в качестве максимальной глубины рекурсии: 2 для небольшого набора данных в примере, но я рекомендую 3, если больше. Затем эта первая часть делегирует сформированные промежуточные контексты в пул потоков, которые будут обрабатывать оставшиеся k с нерекурсивной реализацией. Этот код не основан на сложном алгоритме, который вы используете, а на довольно "базовой" реализации:

import java.util.Arrays;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MixedParallel
{
 // pre-requisite: sorted values !!
 private static final int[] data = new int[] { 5, 10, 20, 25, 40, 50 };
 // Context to store intermediate computation or a solution
 static class Context {
 int k;
 int sum;
 int[] coeff;
 Context(int k, int sum, int[] coeff) {
 this.k = k;
 this.sum = sum;
 this.coeff = coeff;
 }
 }
 // Thread pool for parallel execution
 private static ExecutorService executor;
 // Queue to collect solutions
 private static Queue<context> solutions;
 static {
 final int numberOfThreads = 2;
 executor =
 new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS,
 new LinkedBlockingDeque<runnable>());
 // concurrent because of multi-threaded insertions
 solutions = new ConcurrentLinkedQueue<context>();
 }
 public static void main(String[] args)
 {
 int target_sum = 100;
 // result vector, init to 0
 int[] coeff = new int[data.length];
 Arrays.fill(coeff, 0);
 mixedPartialSum(data.length - 1, target_sum, coeff);
 executor.shutdown();
 // System.out.println("Over. Dumping results");
 while(!solutions.isEmpty()) {
 Context s = solutions.poll();
 printResult(s.coeff);
 }
 }
 private static void printResult(int[] coeff) {
 StringBuffer sb = new StringBuffer();
 for (int i = coeff.length - 1; i >= 0; i--) {
 if (coeff[i] > 0) {
 sb.append(data[i]).append(" * ").append(coeff[i]).append(" ");
 }
 }
 System.out.println(sb.append("from ").append(Thread.currentThread()));
 }
 private static void mixedPartialSum(int k, int sum, int[] coeff) {
 int x_k = data[k];
 for (int c = sum / x_k; c >= 0; c--) {
 coeff[k] = c;
 int[] newcoeff = Arrays.copyOf(coeff, coeff.length);
 if (c * x_k == sum) {
 //printResult(newcoeff);
 solutions.add(new Context(0, 0, newcoeff));
 continue;
 } else if (k > 0) {
 if (data.length - k < 2) {
 mixedPartialSum(k - 1, sum - c * x_k, newcoeff);
 // for loop on "c" goes on with previous coeff content
 } else {
 // no longer recursive. delegate to thread pool
 executor.submit(new ComputePartialSum(new Context(k - 1, sum - c * x_k, newcoeff)));
 }
 }
 }
 }
 static class ComputePartialSum implements Callable<void> {
 // queue with contexts to process
 private Queue<context> contexts;
 ComputePartialSum(Context request) {
 contexts = new ArrayDeque<context>();
 contexts.add(request);
 }
 public Void call() {
 while(!contexts.isEmpty()) {
 Context current = contexts.poll();
 int x_k = data[current.k];
 for (int c = current.sum / x_k; c >= 0; c--) {
 current.coeff[current.k] = c;
 int[] newcoeff = Arrays.copyOf(current.coeff, current.coeff.length);
 if (c * x_k == current.sum) {
 //printResult(newcoeff);
 solutions.add(new Context(0, 0, newcoeff));
 continue;
 } else if (current.k > 0) {
 contexts.add(new Context(current.k - 1, current.sum - c * x_k, newcoeff));
 }
 }
 }
 return null;
 }
 }
}
</context></context></void></context></runnable></context>

Вы можете проверить, какой поток нашел выводный результат и проверить, все ли они включены: основной поток в рекурсивном режиме и два потока из пула в режиме стека контекста.

Теперь эта реализация масштабируется, когда data.length высока:

  • максимальная глубина рекурсии ограничивается основной нитью на низком уровне
  • каждый поток из пула работает со своим собственным стеком контекста без конкуренции с другими
  • параметры для настройки теперь numberOfThreads и maxRecursionDepth

Итак, да, ваш алгоритм может быть распараллелен. Вот полностью рекурсивная реализация, основанная на вашем коде:

import java.awt.Point;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class OriginalParallel
{
 static final int numberOfThreads = 2;
 static final int maxRecursionDepth = 3;
 public static void main(String[] args)
 {
 int target_sum = 100;
 int[] data = new int[] { 50, 40, 25, 20, 10, 5 };
 List T = tableGeneator(target_sum, data);
 int[] coeff = new int[data.length];
 Arrays.fill(coeff, 0);
 RecursivelyListAllThatWork(data.length, target_sum, T, coeff, data);
 executor.shutdown();
 }
 private static void printResult(int[] coeff, int[] data) {
 StringBuffer sb = new StringBuffer();
 for (int i = coeff.length - 1; i >= 0; i--) {
 if (coeff[i] > 0) {
 sb.append(data[i]).append(" * ").append(coeff[i]).append(" ");
 }
 }
 System.out.println(sb.append("from ").append(Thread.currentThread()));
 }
 // Thread pool for parallel execution
 private static ExecutorService executor;
 static {
 executor =
 new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS,
 new LinkedBlockingDeque<runnable>());
 }
 private static void RecursivelyListAllThatWork(int k, int sum, List T, int[] coeff, int[] data) {
 if (k == 0) {
 printResult(coeff, data);
 return;
 }
 Integer x_k = data[k-1];
 // Recursive step: Try all coefficients, but only if they work. 
 for (int c = 0; c <= sum/x_k; c++) { //the c variable caps the percent
 if (T.contains(new Point((sum - c * x_k), (k-1)))) {
 // mark the coefficient of x_k to be c
 coeff[k-1] = c;
 if (data.length - k != maxRecursionDepth) {
 RecursivelyListAllThatWork((k - 1), (sum - c * x_k), T, coeff, data);
 } else {
 // delegate to thread pool when reaching depth 3
 int[] newcoeff = Arrays.copyOf(coeff, coeff.length);
 executor.submit(new RecursiveThread(k - 1, sum - c * x_k, T, newcoeff, data)); 
 }
 // unmark the coefficient of x_k
 coeff[k-1] = 0;
 }
 }
 }
 static class RecursiveThread implements Callable<void> {
 int k;
 int sum;
 int[] coeff;
 int[] data;
 List T;
 RecursiveThread(int k, int sum, List T, int[] coeff, int[] data) {
 this.k = k;
 this.sum = sum;
 this.T = T;
 this.coeff = coeff;
 this.data = data;
 System.out.println("New job for k=" + k);
 }
 public Void call() {
 RecursivelyListAllThatWork(k, sum, T, coeff, data);
 return null;
 }
 }
 public static List tableGeneator(int target_sum, int[] data) {
 List T = new ArrayList();
 T.add(new Point(0, 0));
 float max_percent = 1;
 int R = (int) (target_sum * max_percent * data.length);
 for (int i = 0; i < data.length; i++) {
 for (int s = -R; s < R + 1; s++) {
 int max_value = (int) Math.abs((target_sum * max_percent) / data[i]);
 for (int c = 0; c < max_value + 1; c++) {
 if (T.contains(new Point(s - c * data[i], i))) {
 Point p = new Point(s, i + 1);
 if (!T.contains(p)) {
 T.add(p);
 }
 }
 }
 }
 }
 return T;
 }
}
</void></runnable>


1) Вместо

if k == 0:
 print what we have so far
 return

вы можете проверить, сколько коэффициентов отличное от нуля; если этот счет больше определенного порога (3 в вашем примере), то просто не печатайте его. (Подсказка: это будет тесно связано с

mark the coefficient of x_k to be c

линии.)

2) Рекурсивные функции обычно экспоненциальны по своему характеру, а это означает, что по мере увеличения масштаба время выполнения будет резко увеличиваться.

С учетом этого вы можете применять многопоточность для вычисления таблицы и рекурсивной функции.

При рассмотрении таблицы подумайте, какие части цикла влияют друг на друга и должны выполняться последовательно; Разумеется, обратное - найти, какие части не влияют друг на друга и могут выполняться параллельно.

Что касается рекурсивной функции, лучшим вариантом будет, вероятно, применение многопоточности к ветвящейся части.


Они делают это для многопоточности, чтобы убедиться, что у вас нет ненужных глобальных структур данных, например, ваших "меток" для коэффициентов.

Скажем, у вас есть K чисел n [0]... n [K-1] в вашей таблице, а сумма, которую вы хотите достичь, равна S. Я предполагаю ниже, что массив n [] сортируется от наименьшего к наибольшему число.

Здесь приведен простой алгоритм перечисления. я - индекс в список чисел, s - это уже существующая сумма, а cs - список коэффициентов для чисел 0.. я - 1:

function enumerate(i, s, cs):
 if (s == S):
 output_solution(cs)
 else if (i == K):
 return // dead end
 else if ((S - s) < n[i]):
 return // no solution can be found
 else:
 for c in 0 .. floor((S - s) / n[i]): // note: floor(...) > 0
 enumerate(i + 1, s + c * n[i], append(cs, c))

Чтобы запустить процесс:

enumerate(0, 0, make_empty_list())

Теперь здесь нет глобальных структур данных, кроме таблицы n [] (константные данные), а "enumerate" также ничего не возвращает, поэтому вы можете изменить рекурсивный вызов для запуска в своем потоке по вашей воле. Например. вы можете создать новый поток для рекурсивного вызова enumerate(), если у вас слишком много потоков, которые уже запущены, и в этом случае вы ждете.

licensed under cc by-sa 3.0 with attribution.