Передача от родителей к родителям в многопроцессорности Python

Я пишу python script, который будет анализировать файл быстро, отправляя строки в разные процессы для обработки. В конце я хочу, чтобы родитель получал результаты от каждого дочернего процесса, а затем мог управлять этим. Вот код:

#!/usr/bin/env python
import os
import re
from datetime import datetime
from multiprocessing import Process, JoinableQueue
class LineConsumer(Process):
 def __init__(self, queue):
 self.queue = queue
 self.lines = 0
 super(LineConsumer, self).__init__( )
 def run(self):
 print "My PID is %d" % self.pid
 while True:
 line = self.queue.get( )
 print self.lines
 if ':' in line:
 self.lines += 1
 self.queue.task_done( )
class Parser(object):
 def __init__(self, filename, processes=4):
 self.filename = filename
 self.processes = processes
 def parse(self):
 queue = JoinableQueue(100)
 consumers = [ ]
 parents = [ ]
 for i in range(0, self.processes):
 lc = LineConsumer(queue)
 lc.start( )
 consumers.append(lc)
 starttime = datetime.now( )
 problem = False
 numlines = 0
 with open(self.filename, 'r') as data:
 for line in data:
 numlines += 1
 def checkAlive(p):
 if not p.is_alive( ):
 return False
 return True
 alive = map(checkAlive, consumers)
 if False in alive:
 problem = True
 print "A process died!!!"
 break
 queue.put(line)
 if not problem:
 queue.join( )
 for p in consumers:
 print p.lines( )
 p.terminate( )
 p.join( )
 endtime = datetime.now( )
 timedelta = endtime - starttime 
 lps = numlines / timedelta.total_seconds( )
 print "Processed packets at %f lps" % lps
if __name__ == "__main__":
 import sys
 if len(sys.argv) != 2:
 print "Supply a file to read"
 sys.exit(1)
 parser = Parser(sys.argv[1])
 parser.parse( )

Вот результаты:

My PID is 11578
My PID is 11579
My PID is 11580
My PID is 11581
0
1
0
2
1
3
2
1
...
555
627
564
556
628
0
0
0
0
Processed packets at 27189.771341 lps

Как вы можете видеть, каждый ребенок может сохранить свой счет строки, но когда я пытаюсь получить доступ к счету от родителя, я продолжаю получать 0. Как я могу отправить счетчик строк родительскому?

2 ответа

Вы можете передать значения в очередь результатов.

В LineConsumer:

def __init__(self, queue, result_queue):
 self.result_queue = result_queue
 # ...
def terminate(self):
 self.results_queue.put(self.lines)
 super(LineConsumer, self).terminate()

В Parser:

queue = JoinableQueue(100)
result_queue = Queue()
# ...
 lc = LineConsumer(queue, result_queue)
# ...
for p in consumers:
 p.terminate()
 p.join()
while True:
 try:
 print results.queue.get(False)
 except Queue.Empty: # need to import Queue
 break


Я предполагаю, что проблема заключается в том, что вы пытаетесь получить доступ к копии объекта, которая была фактически изменена в другом процессе (дочернем). Чтобы отправить ответ обратно родителям, вам необходимо использовать явное межпроцессное общение. То есть что-то вроде JoinableQueue, которое вы используете для передачи информации детям.

btw, я не вижу, как вы отправляете свои данные потребителям.

licensed under cc by-sa 3.0 with attribution.