Commit 551ac780 authored by Luke Campagnola's avatar Luke Campagnola
Browse files

mp fixes

parent c84c94c1
......@@ -5,20 +5,25 @@ if __name__ == '__main__':
if hasattr(os, 'setpgrp'):
os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process
if sys.version[0] == '3':
name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin.buffer)
#name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin.buffer)
opts = pickle.load(sys.stdin.buffer)
else:
name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin)
#name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin)
opts = pickle.load(sys.stdin)
#print "key:", ' '.join([str(ord(x)) for x in authkey])
path = opts.pop('path', None)
if path is not None:
## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list.
while len(sys.path) > 0:
sys.path.pop()
sys.path.extend(path)
if pyside:
if opts.pop('pyside', False):
import PySide
#import pyqtgraph
#import pyqtgraph.multiprocess.processes
targetStr = opts.pop('targetStr')
target = pickle.loads(targetStr) ## unpickling the target should import everything we need
target(name, port, authkey, ppid)
#target(name, port, authkey, ppid)
target(**opts) ## Send all other options to the target function
sys.exit(0)
......@@ -35,7 +35,7 @@ class Process(RemoteEventHandler):
ProxyObject for more information.
"""
def __init__(self, name=None, target=None, executable=None, copySysPath=True):
def __init__(self, name=None, target=None, executable=None, copySysPath=True, debug=False):
"""
============ =============================================================
Arguments:
......@@ -46,7 +46,9 @@ class Process(RemoteEventHandler):
process to process requests from the parent process until it
is asked to quit. If you wish to specify a different target,
it must be picklable (bound methods are not).
copySysPath If true, copy the contents of sys.path to the remote process
copySysPath If True, copy the contents of sys.path to the remote process
debug If True, print detailed information about communication
with the child process.
============ =============================================================
"""
......@@ -56,6 +58,7 @@ class Process(RemoteEventHandler):
name = str(self)
if executable is None:
executable = sys.executable
self.debug = debug
## random authentication key
authkey = os.urandom(20)
......@@ -75,23 +78,46 @@ class Process(RemoteEventHandler):
## start remote process, instruct it to run target function
sysPath = sys.path if copySysPath else None
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap))
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE)
targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to
## set its sys.path properly before unpickling the target
pid = os.getpid() # we must sent pid to child because windows does not have getppid
pid = os.getpid() # we must send pid to child because windows does not have getppid
pyside = USE_PYSIDE
## Send everything the remote process needs to start correctly
pickle.dump((name+'_child', port, authkey, pid, targetStr, sysPath, pyside), self.proc.stdin)
data = dict(
name=name+'_child',
port=port,
authkey=authkey,
ppid=pid,
targetStr=targetStr,
path=sysPath,
pyside=pyside,
debug=debug
)
pickle.dump(data, self.proc.stdin)
self.proc.stdin.close()
## open connection for remote process
conn = l.accept()
RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid)
self.debugMsg('Listening for child process..')
while True:
try:
conn = l.accept()
break
except IOError as err:
if err.errno == 4: # interrupted; try again
continue
else:
raise
RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid, debug=debug)
self.debugMsg('Connected to child process.')
atexit.register(self.join)
def join(self, timeout=10):
self.debugMsg('Joining child process..')
if self.proc.poll() is None:
self.close()
start = time.time()
......@@ -99,13 +125,14 @@ class Process(RemoteEventHandler):
if timeout is not None and time.time() - start > timeout:
raise Exception('Timed out waiting for remote process to end.')
time.sleep(0.05)
self.debugMsg('Child process exited. (%d)' % self.proc.returncode)
def startEventLoop(name, port, authkey, ppid):
def startEventLoop(name, port, authkey, ppid, debug=False):
conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
global HANDLER
#ppid = 0 if not hasattr(os, 'getppid') else os.getppid()
HANDLER = RemoteEventHandler(conn, name, ppid)
HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)
while True:
try:
HANDLER.processRequests() # exception raised when the loop should exit
......@@ -329,7 +356,7 @@ class QtProcess(Process):
except ClosedError:
self.timer.stop()
def startQtEventLoop(name, port, authkey, ppid):
def startQtEventLoop(name, port, authkey, ppid, debug=False):
conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
from pyqtgraph.Qt import QtGui, QtCore
#from PyQt4 import QtGui, QtCore
......@@ -342,7 +369,7 @@ def startQtEventLoop(name, port, authkey, ppid):
global HANDLER
#ppid = 0 if not hasattr(os, 'getppid') else os.getppid()
HANDLER = RemoteQtEventHandler(conn, name, ppid)
HANDLER = RemoteQtEventHandler(conn, name, ppid, debug=debug)
HANDLER.startEventTimer()
app.exec_()
......
......@@ -42,7 +42,8 @@ class RemoteEventHandler(object):
handlers = {} ## maps {process ID : handler}. This allows unpickler to determine which process
## an object proxy belongs to
def __init__(self, connection, name, pid):
def __init__(self, connection, name, pid, debug=False):
self.debug = debug
self.conn = connection
self.name = name
self.results = {} ## reqId: (status, result); cache of request results received from the remote process
......@@ -76,6 +77,11 @@ class RemoteEventHandler(object):
print(pid, cls.handlers)
raise
def debugMsg(self, msg):
if not self.debug:
return
print("[%d] %s" % (os.getpid(), str(msg)))
def getProxyOption(self, opt):
return self.proxyOptions[opt]
......@@ -91,7 +97,9 @@ class RemoteEventHandler(object):
after no more events are immediately available. (non-blocking)
Returns the number of events processed.
"""
self.debugMsg('processRequests:')
if self.exited:
self.debugMsg(' processRequests: exited already; raise ClosedError.')
raise ClosedError()
numProcessed = 0
......@@ -100,37 +108,64 @@ class RemoteEventHandler(object):
self.handleRequest()
numProcessed += 1
except ClosedError:
self.debugMsg(' processRequests: got ClosedError from handleRequest; setting exited=True.')
self.exited = True
raise
except IOError as err:
if err.errno == 4: ## interrupted system call; try again
continue
else:
raise
#except IOError as err: ## let handleRequest take care of this.
#self.debugMsg(' got IOError from handleRequest; try again.')
#if err.errno == 4: ## interrupted system call; try again
#continue
#else:
#raise
except:
print("Error in process %s" % self.name)
sys.excepthook(*sys.exc_info())
self.debugMsg(' processRequests: finished %d requests' % numProcessed)
return numProcessed
def handleRequest(self):
"""Handle a single request from the remote process.
Blocks until a request is available."""
result = None
try:
cmd, reqId, nByteMsgs, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails
except (EOFError, IOError):
## remote process has shut down; end event loop
raise ClosedError()
#print os.getpid(), "received request:", cmd, reqId
while True:
try:
## args, kwds are double-pickled to ensure this recv() call never fails
cmd, reqId, nByteMsgs, optStr = self.conn.recv()
break
except EOFError:
self.debugMsg(' handleRequest: got EOFError from recv; raise ClosedError.')
## remote process has shut down; end event loop
raise ClosedError()
except IOError as err:
if err.errno == 4: ## interrupted system call; try again
self.debugMsg(' handleRequest: got IOError 4 from recv; try again.')
continue
else:
self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.' % (err.errno, err.strerror))
raise ClosedError()
self.debugMsg(" handleRequest: received %s %s" % (str(cmd), str(reqId)))
## read byte messages following the main request
byteData = []
if nByteMsgs > 0:
self.debugMsg(" handleRequest: reading %d byte messages" % nByteMsgs)
for i in range(nByteMsgs):
try:
byteData.append(self.conn.recv_bytes())
except (EOFError, IOError):
raise ClosedError()
while True:
try:
byteData.append(self.conn.recv_bytes())
break
except EOFError:
self.debugMsg(" handleRequest: got EOF while reading byte messages; raise ClosedError.")
raise ClosedError()
except IOError as err:
if err.errno == 4:
self.debugMsg(" handleRequest: got IOError 4 while reading byte messages; try again.")
continue
else:
self.debugMsg(" handleRequest: got IOError while reading byte messages; raise ClosedError.")
raise ClosedError()
try:
......@@ -140,6 +175,7 @@ class RemoteEventHandler(object):
## (this is already a return from a previous request)
opts = pickle.loads(optStr)
self.debugMsg(" handleRequest: id=%s opts=%s" % (str(reqId), str(opts)))
#print os.getpid(), "received request:", cmd, reqId, opts
returnType = opts.get('returnType', 'auto')
......@@ -213,6 +249,7 @@ class RemoteEventHandler(object):
if reqId is not None:
if exc is None:
self.debugMsg(" handleRequest: sending return value for %d: %s" % (reqId, str(result)))
#print "returnValue:", returnValue, result
if returnType == 'auto':
result = self.autoProxy(result, self.proxyOptions['noProxyTypes'])
......@@ -225,6 +262,7 @@ class RemoteEventHandler(object):
sys.excepthook(*sys.exc_info())
self.replyError(reqId, *sys.exc_info())
else:
self.debugMsg(" handleRequest: returning exception for %d" % reqId)
self.replyError(reqId, *exc)
elif exc is not None:
......@@ -368,13 +406,16 @@ class RemoteEventHandler(object):
## Send primary request
request = (request, reqId, nByteMsgs, optStr)
self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s' % (str(request[0]), nByteMsgs, str(reqId), str(opts)))
self.conn.send(request)
## follow up by sending byte messages
if byteData is not None:
for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages!
self.conn.send_bytes(obj)
self.debugMsg(' sent %d byte messages' % len(byteData))
self.debugMsg(' call sync: %s' % callSync)
if callSync == 'off':
return
......
from pyqtgraph.Qt import QtGui, QtCore
from pyqtgraph.Qt import QtGui, QtCore, USE_PYSIDE
import pyqtgraph.multiprocess as mp
import pyqtgraph as pg
from .GraphicsView import GraphicsView
......@@ -21,13 +21,14 @@ class RemoteGraphicsView(QtGui.QWidget):
self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView.
## without it, the widget will not compete for space against another GraphicsView.
QtGui.QWidget.__init__(self)
self._proc = mp.QtProcess()
self._proc = mp.QtProcess(debug=False)
self.pg = self._proc._import('pyqtgraph')
self.pg.setConfigOptions(**self.pg.CONFIG_OPTIONS)
rpgRemote = self._proc._import('pyqtgraph.widgets.RemoteGraphicsView')
self._view = rpgRemote.Renderer(*args, **kwds)
self._view._setProxyOptions(deferGetattr=True)
self.setFocusPolicy(QtCore.Qt.FocusPolicy(self._view.focusPolicy()))
self.setFocusPolicy(QtCore.Qt.StrongFocus)
self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding)
self.setMouseTracking(True)
self.shm = None
......@@ -114,6 +115,7 @@ class RemoteGraphicsView(QtGui.QWidget):
return self._proc
class Renderer(GraphicsView):
## Created by the remote process to handle render requests
sceneRendered = QtCore.Signal(object)
......@@ -175,7 +177,12 @@ class Renderer(GraphicsView):
address = ctypes.addressof(ctypes.c_char.from_buffer(self.shm, 0))
## render the scene directly to shared memory
self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
if USE_PYSIDE:
ch = ctypes.c_char.from_buffer(self.shm, 0)
#ch = ctypes.c_char_p(address)
self.img = QtGui.QImage(ch, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
else:
self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
self.img.fill(0xffffffff)
p = QtGui.QPainter(self.img)
self.render(p, self.viewRect(), self.rect())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment