Commit d1fdbadd authored by Luke Campagnola

Multiprocessing updates / fixes:

   - ForkedProcess is much more careful with inherited state -- closes file handles, removes atexit and excepthook callbacks
   - Remote processes copy sys.path from parent
   - Parallelizer has ProgressDialog support (see the sketch below)
   - Many docstring updates
   - Added some test code for remote GraphicsView rendering
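
For reference, a minimal sketch of the new ProgressDialog support (a sketch only; compute() is a hypothetical per-task function, and the full demo is in the parallelize example below):

    import pyqtgraph.multiprocess as mp
    ## passing a string for progressDialog is shorthand for {'labelText': ...}
    with mp.Parallelize(enumerate(tasks), results=results,
                        progressDialog='processing..') as tasker:
        for i, x in tasker:
            tasker.results[i] = compute(x)  ## compute() is a stand-in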
parent cc93c7ba
# -*- coding: utf-8 -*-
import initExample ## Add path to library (just for examples; you do not need this)
from pyqtgraph.Qt import QtGui, QtCore
import pyqtgraph as pg
app = pg.mkQApp()
v = pg.RemoteGraphicsView()
v.show()
QtGui = v.pg.QtGui  ## v.pg is a proxy to the pyqtgraph module imported by the remote process
rect = QtGui.QGraphicsRectItem(0,0,10,10)
rect.setPen(QtGui.QPen(QtGui.QColor(255,255,0)))
v.scene().addItem(rect)
## Start Qt event loop unless running in interactive mode or using pyside.
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
    QtGui.QApplication.instance().exec_()
# -*- coding: utf-8 -*-
import initExample ## Add path to library (just for examples; you do not need this)
import numpy as np
import pyqtgraph.multiprocess as mp
from pyqtgraph.multiprocess.parallelizer import Parallelize #, Parallelizer
import pyqtgraph as pg
import time
print "\n=================\nParallelize"
tasks = [1,2,4,8]
results = [None] * len(tasks)
size = 2000000
start = time.time()
with Parallelize(enumerate(tasks), results=results, workers=1) as tasker:
    for i, x in tasker:
        print i, x
        tot = 0
        for j in xrange(size):
            tot += j * x
        results[i] = tot
print results
print "serial:", time.time() - start
start = time.time()
with Parallelize(enumerate(tasks), results=results) as tasker:
    for i, x in tasker:
        print i, x
        tot = 0
        for j in xrange(size):
            tot += j * x
        results[i] = tot
print results
print "parallel:", time.time() - start
......
# -*- coding: utf-8 -*-
import initExample ## Add path to library (just for examples; you do not need this)
import numpy as np
import pyqtgraph.multiprocess as mp
import pyqtgraph as pg
import time
print "\n=================\nParallelize"
## Do a simple task:
## for x in range(N):
## sum([x*i for i in range(M)])
##
## We'll do this three times
## - once without Parallelize
## - once with Parallelize, but forced to use a single worker
## - once with Parallelize automatically determining how many workers to use
##
tasks = range(10)
results = [None] * len(tasks)
results2 = results[:]
results3 = results[:]
size = 2000000
pg.mkQApp()
### Purely serial processing
start = time.time()
with pg.ProgressDialog('processing serially..', maximum=len(tasks)) as dlg:
    for i, x in enumerate(tasks):
        tot = 0
        for j in xrange(size):
            tot += j * x
        results[i] = tot
        dlg += 1
        if dlg.wasCanceled():
            raise Exception('processing canceled')
print "Serial time: %0.2f" % (time.time() - start)
### Use parallelize, but force a single worker
### (this simulates the behavior seen on windows, which lacks os.fork)
start = time.time()
with mp.Parallelize(enumerate(tasks), results=results2, workers=1, progressDialog='processing serially (using Parallelizer)..') as tasker:
    for i, x in tasker:
        tot = 0
        for j in xrange(size):
            tot += j * x
        tasker.results[i] = tot
print "\nParallel time, 1 worker: %0.2f" % (time.time() - start)
print "Results match serial: ", results2 == results
### Use parallelize with multiple workers
start = time.time()
with mp.Parallelize(enumerate(tasks), results=results3, progressDialog='processing in parallel..') as tasker:
    for i, x in tasker:
        tot = 0
        for j in xrange(size):
            tot += j * x
        tasker.results[i] = tot
print "\nParallel time, %d workers: %0.2f" % (mp.Parallelize.suggestedWorkerCount(), time.time() - start)
print "Results match serial: ", results3 == results
......@@ -20,3 +20,5 @@ TODO:
"""
from processes import *
from parallelizer import Parallelize, CanceledError
from remoteproxy import proxy
\ No newline at end of file
"""For starting up remote processes"""
import sys, pickle
if __name__ == '__main__':
    name, port, authkey, targetStr, path = pickle.load(sys.stdin)
    if path is not None:
        ## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list.
        while len(sys.path) > 0:
            sys.path.pop()
        sys.path.extend(path)
    #import pyqtgraph
    #import pyqtgraph.multiprocess.processes
    target = pickle.loads(targetStr)  ## unpickling the target should import everything we need
    target(name, port, authkey)
    sys.exit(0)
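
## For reference: the parent side of this handshake is Process.__init__ in
## processes.py (below). It double-pickles the target and writes
##     pickle.dump((name+'_child', port, authkey, targetStr, sysPath), proc.stdin)
## so that this script can restore sys.path before unpickling the target.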
......@@ -2,6 +2,10 @@ import os, sys, time, multiprocessing
from processes import ForkedProcess
from remoteproxy import ExitError
class CanceledError(Exception):
    """Raised when the progress dialog is canceled during a processing operation."""
    pass


class Parallelize:
    """
    Class for ultra-simple inline parallelization on multi-core CPUs
......@@ -29,35 +33,78 @@ class Parallelize:
print results
    The only major caveat is that *result* in the example above must be picklable,
    since it is automatically sent via pipe back to the parent process.
    """
    def __init__(self, tasks, workers=None, block=True, progressDialog=None, **kwds):
"""
Args:
tasks - list of objects to be processed (Parallelize will determine how to distribute the tasks)
workers - number of worker processes or None to use number of CPUs in the system
kwds - objects to be shared by proxy with child processes
=============== ===================================================================
Arguments:
tasks list of objects to be processed (Parallelize will determine how to
distribute the tasks)
workers number of worker processes or None to use number of CPUs in the
system
progressDialog optional dict of arguments for ProgressDialog
to update while tasks are processed
kwds objects to be shared by proxy with child processes (they will
appear as attributes of the tasker)
=============== ===================================================================
"""
        self.block = block

        ## Generate progress dialog.
        ## Note that we want to avoid letting forked child processes play with progress dialogs..
        self.showProgress = False
        if progressDialog is not None:
            self.showProgress = True
            if isinstance(progressDialog, basestring):
                progressDialog = {'labelText': progressDialog}
            import pyqtgraph as pg
            self.progressDlg = pg.ProgressDialog(**progressDialog)
        if workers is None:
            workers = self.suggestedWorkerCount()
        if not hasattr(os, 'fork'):
            workers = 1
        self.workers = workers
        self.tasks = list(tasks)
        self.kwds = kwds.copy()
        self.kwds['_taskStarted'] = self._taskStarted
    def __enter__(self):
        self.proc = None
        if self.workers == 1:
            return self.runSerial()
        else:
            return self.runParallel()
    def __exit__(self, *exc_info):
        if self.proc is not None:  ## worker
            try:
                if exc_info[0] is not None:
                    sys.excepthook(*exc_info)
            finally:
                #print os.getpid(), 'exit'
                os._exit(0)
        else:  ## parent
            if self.showProgress:
                self.progressDlg.__exit__(None, None, None)

    def runSerial(self):
        if self.showProgress:
            self.progressDlg.__enter__()
            self.progressDlg.setMaximum(len(self.tasks))
        self.progress = {os.getpid(): []}
        return Tasker(None, self.tasks, self.kwds)
    def runParallel(self):
        self.childs = []

        ## break up tasks into one set per worker
        workers = self.workers
        chunks = [[] for i in xrange(workers)]
        i = 0
        for i in range(len(self.tasks)):
......@@ -72,30 +119,74 @@ class Parallelize:
            else:
                self.childs.append(proc)

        ## Keep track of the progress of each worker independently.
        self.progress = {ch.childPid: [] for ch in self.childs}
        ## for each child process, self.progress[pid] is a list
        ## of task indexes. The last index is the task currently being
        ## processed; all others are finished.

        try:
            if self.showProgress:
                self.progressDlg.__enter__()
                self.progressDlg.setMaximum(len(self.tasks))
            ## process events from workers until all have exited.
            activeChilds = self.childs[:]
            pollInterval = 0.01
            while len(activeChilds) > 0:
                waitingChildren = 0
                rem = []
                for ch in activeChilds:
                    try:
                        n = ch.processRequests()
                        if n > 0:
                            waitingChildren += 1
                    except ExitError:
                        #print ch.childPid, 'process finished'
                        rem.append(ch)
                        if self.showProgress:
                            self.progressDlg += 1
                #print "remove:", [ch.childPid for ch in rem]
                for ch in rem:
                    activeChilds.remove(ch)
                    os.waitpid(ch.childPid, 0)
                #print [ch.childPid for ch in activeChilds]

                if self.showProgress and self.progressDlg.wasCanceled():
                    for ch in activeChilds:
                        ch.kill()
                    raise CanceledError()

                ## adjust polling interval--prefer to get exactly 1 event per poll cycle.
                if waitingChildren > 1:
                    pollInterval *= 0.7
                elif waitingChildren == 0:
                    pollInterval /= 0.7
                pollInterval = max(min(pollInterval, 0.5), 0.0005)  ## but keep it within reasonable limits
                time.sleep(pollInterval)
        finally:
            if self.showProgress:
                self.progressDlg.__exit__(None, None, None)

        return []  ## no tasks for parent process.
    @staticmethod
    def suggestedWorkerCount():
        return multiprocessing.cpu_count()  ## is this really the best option?
    def _taskStarted(self, pid, i, **kwds):
        ## called remotely by tasker to indicate it has started working on task i
        #print pid, 'reported starting task', i
        if self.showProgress:
            if len(self.progress[pid]) > 0:
                self.progressDlg += 1
            if pid == os.getpid():  ## single-worker process
                if self.progressDlg.wasCanceled():
                    raise CanceledError()
        self.progress[pid].append(i)
    def wait(self):
        ## wait for all child processes to finish
        pass

class Tasker:
    def __init__(self, proc, tasks, kwds):
......@@ -106,9 +197,13 @@ class Tasker:
    def __iter__(self):
        ## we could fix this up such that tasks are retrieved from the parent process one at a time..
        for i, task in enumerate(self.tasks):
            self.index = i
            #print os.getpid(), 'starting task', i
            self._taskStarted(os.getpid(), i, _callSync='off')
            yield task
        if self.proc is not None:
            #print os.getpid(), 'no more tasks'
            self.proc.close()
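
    ## Note: in worker processes, _taskStarted is a proxy for
    ## Parallelize._taskStarted in the parent (it is injected via kwds);
    ## _callSync='off' makes the call fire-and-forget, so workers never
    ## block while reporting progress.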
......
from remoteproxy import RemoteEventHandler, ExitError, NoResultError, LocalObjectProxy, ObjectProxy
import subprocess, atexit, os, sys, time, random, socket, signal
import cPickle as pickle
import multiprocessing.connection
__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ExitError', 'NoResultError']
class Process(RemoteEventHandler):
    """
    Bases: RemoteEventHandler

    This class is used to spawn and control a new python interpreter.
    It uses subprocess.Popen to start the new process and communicates with it
    using multiprocessing.Connection objects over a network socket.

    By default, the remote process will immediately enter an event-processing
    loop that carries out requests sent from the parent process.

    Remote control works mainly through proxy objects::

        proc = Process()              ## starts process, returns handle
        rsys = proc._import('sys')    ## asks remote process to import 'sys', returns
                                      ## a proxy which references the imported module
        rsys.stdout.write('hello\n')  ## This message will be printed from the remote
                                      ## process. Proxy objects can usually be used
                                      ## exactly as regular objects are.
        proc.close()                  ## Request the remote process shut down

    Requests made via proxy objects may be synchronous or asynchronous and may
    return objects either by proxy or by value (if they are picklable). See
    ProxyObject for more information.
    """
    def __init__(self, name=None, target=None, copySysPath=True):
        """
        ============  =============================================================
        Arguments:
        name          Optional name for this process used when printing messages
                      from the remote process.
        target        Optional function to call after starting remote process.
                      By default, this is startEventLoop(), which causes the remote
                      process to process requests from the parent process until it
                      is asked to quit. If you wish to specify a different target,
                      it must be picklable (bound methods are not).
        copySysPath   If True, copy the contents of sys.path to the remote process.
        ============  =============================================================
        """
        if target is None:
            target = startEventLoop
        if name is None:
......@@ -25,8 +66,12 @@ class Process(RemoteEventHandler):
                port += 1

        ## start remote process, instruct it to run target function
        sysPath = sys.path if copySysPath else None
        bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
        self.proc = subprocess.Popen((sys.executable, bootstrap), stdin=subprocess.PIPE)

        targetStr = pickle.dumps(target)  ## double-pickle target so that child has a chance to
                                          ## set its sys.path properly before unpickling the target
        pickle.dump((name+'_child', port, authkey, targetStr, sysPath), self.proc.stdin)
        self.proc.stdin.close()

        ## open connection for remote process
......@@ -60,16 +105,29 @@ def startEventLoop(name, port, authkey):
class ForkedProcess(RemoteEventHandler):
    """
    ForkedProcess is a substitute for Process that uses os.fork() to generate a new process.
    This is much faster than starting a completely new interpreter, and child processes
    automatically have a copy of the entire program state from before the fork. This
    makes it an appealing approach when parallelizing expensive computations. (see
    also Parallelizer)

    However, fork() comes with some caveats and limitations:

    - fork() is not available on Windows.
    - It is not possible to have a QApplication in both parent and child process
      (unless both QApplications are created _after_ the call to fork()).
      Attempts by the forked process to access Qt GUI elements created by the parent
      will most likely cause the child to crash.
    - Likewise, database connections are unlikely to function correctly in a forked child.
    - Threads are not copied by fork(); the new process
      will have only one thread that starts wherever fork() was called in the parent process.
    - Forked processes are unceremoniously terminated when join() is called; they are not
      given any opportunity to clean up. (This prevents them from calling any cleanup code that
      was only intended to be used by the parent process.)
    - Normally when fork()ing, open file handles are shared with the parent process,
      which is potentially dangerous. ForkedProcess is careful to close all file handles
      that are not explicitly needed--stdout, stderr, and a single pipe to the parent
      process.
    """
    def __init__(self, name=None, target=0, preProxy=None):
......@@ -101,16 +159,46 @@ class ForkedProcess(RemoteEventHandler):
        pid = os.fork()
        if pid == 0:
            self.isParent = False
            ## We are now in the forked process; need to be extra careful what we touch while here.
            ##   - no reading/writing file handles/sockets owned by parent process (stdout is ok)
            ##   - don't touch QtGui or QApplication at all; these are landmines.
            ##   - don't let the process call exit handlers

            ## close all file handles we do not want shared with parent
            conn.close()
            sys.stdin.close()  ## otherwise we screw with interactive prompts.
            fid = remoteConn.fileno()
            os.closerange(3, fid)
            os.closerange(fid+1, 4096)  ## just guessing on the maximum descriptor count..

            ## Override any custom exception hooks
            def excepthook(*args):
                import traceback
                traceback.print_exception(*args)
            sys.excepthook = excepthook

            ## Make it harder to access QApplication instance
            if 'PyQt4.QtGui' in sys.modules:
                sys.modules['PyQt4.QtGui'].QApplication = None
            sys.modules.pop('PyQt4.QtGui', None)
            sys.modules.pop('PyQt4.QtCore', None)

            ## sabotage atexit callbacks
            atexit._exithandlers = []
            atexit.register(lambda: os._exit(0))

            RemoteEventHandler.__init__(self, remoteConn, name+'_child', pid=os.getppid())

            ppid = os.getppid()
            self.forkedProxies = {}
            for name, proxyId in proxyIDs.iteritems():
                self.forkedProxies[name] = ObjectProxy(ppid, proxyId=proxyId, typeStr=repr(preProxy[name]))

            if target is not None:
                target()
        else:
            self.isParent = True
            self.childPid = pid
......@@ -127,10 +215,11 @@ class ForkedProcess(RemoteEventHandler):
                self.processRequests()  # exception raised when the loop should exit
                time.sleep(0.01)
            except ExitError:
                break
            except:
                print "Error occurred in forked event loop:"
                sys.excepthook(*sys.exc_info())
        sys.exit(0)
    def join(self, timeout=10):
        if self.hasJoined:
......@@ -138,10 +227,19 @@ class ForkedProcess(RemoteEventHandler):
        #os.kill(pid, 9)
        try:
            self.close(callSync='sync', timeout=timeout, noCleanup=True)  ## ask the child process to exit and require that it return a confirmation.
            os.waitpid(self.childPid, 0)
        except IOError:  ## probably remote process has already quit
            pass
        self.hasJoined = True
    def kill(self):
        """Immediately kill the forked remote process.
        This is generally safe because forked processes are already
        expected to _avoid_ any cleanup at exit."""
        os.kill(self.childPid, signal.SIGKILL)
        self.hasJoined = True
## Special set of subclasses that implement a Qt event loop instead.
......@@ -165,8 +263,33 @@ class RemoteQtEventHandler(RemoteEventHandler):
#raise
class QtProcess(Process):
    """
    QtProcess is essentially the same as Process, with two major differences:

    - The remote process starts by running startQtEventLoop() which creates a
      QApplication in the remote process and uses a QTimer to trigger
      remote event processing. This allows the remote process to have its own
      GUI.
    - A QTimer is also started on the parent process which polls for requests
      from the child process. This allows Qt signals emitted within the child
      process to invoke slots on the parent process and vice-versa.

    Example::

        proc = QtProcess()
        rQtGui = proc._import('PyQt4.QtGui')
        btn = rQtGui.QPushButton('button on child process')
        btn.show()

        def slot():
            print 'slot invoked on parent process'
        btn.clicked.connect(proxy(slot))  # be sure to send a proxy of the slot
    """

    def __init__(self, **kwds):
        if 'target' not in kwds:
            kwds['target'] = startQtEventLoop
        Process.__init__(self, **kwds)
        self.startEventTimer()

    def startEventTimer(self):
......@@ -201,8 +324,3 @@ def startQtEventLoop(name, port, authkey):
    app.exec_()
......@@ -9,7 +9,26 @@ class NoResultError(Exception):
class RemoteEventHandler(object):
    """
    This class is responsible for carrying out actions on behalf of the remote process.
    Each instance holds one end of a Connection which allows python
    objects to be passed between processes.

    For the most common operations, see _import(), close(), and transfer()

    To handle and respond to incoming requests, RemoteEventHandler requires that its
    processRequests method is called repeatedly (this is usually handled by the Process
    classes defined in multiprocess.processes).
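
    A minimal hand-rolled polling loop might look like this (a sketch only; in
    practice Process / QtProcess drive processRequests for you)::

        while True:
            handler.processRequests()  ## carry out any requests from the peer
            time.sleep(0.01)           ## avoid busy-waiting between polls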