Commit 89c04c8a authored by Luke Campagnola's avatar Luke Campagnola
Browse files

Corrected bug in multiprocess causing deadlock at exit

Multiprocess debugging messages now use one color per process
Corrected RemoteGraphicsView not setting correct pg options on remote
process

New debugging tools:
* util.cprint for printing color on terminal (based on colorama)
* debug.ThreadColor causes each thread to print in a different color
* debug.PeriodicTrace used for debugging deadlocks
* Mutex for detecting deadlocks
parent adfcfa99
......@@ -7,10 +7,12 @@ Distributed under MIT/X11 license. See license.txt for more infomation.
from __future__ import print_function
import sys, traceback, time, gc, re, types, weakref, inspect, os, cProfile
import sys, traceback, time, gc, re, types, weakref, inspect, os, cProfile, threading
from . import ptime
from numpy import ndarray
from .Qt import QtCore, QtGui
from .util.Mutex import Mutex
from .util import cprint
__ftraceDepth = 0
def ftrace(func):
......@@ -991,3 +993,75 @@ class PrintDetector(object):
def flush(self):
self.stdout.flush()
class PeriodicTrace(object):
"""
Used to debug freezing by starting a new thread that reports on the
location of the main thread periodically.
"""
class ReportThread(QtCore.QThread):
def __init__(self):
self.frame = None
self.ind = 0
self.lastInd = None
self.lock = Mutex()
QtCore.QThread.__init__(self)
def notify(self, frame):
with self.lock:
self.frame = frame
self.ind += 1
def run(self):
while True:
time.sleep(1)
with self.lock:
if self.lastInd != self.ind:
print("== Trace %d: ==" % self.ind)
traceback.print_stack(self.frame)
self.lastInd = self.ind
def __init__(self):
self.mainThread = threading.current_thread()
self.thread = PeriodicTrace.ReportThread()
self.thread.start()
sys.settrace(self.trace)
def trace(self, frame, event, arg):
if threading.current_thread() is self.mainThread: # and 'threading' not in frame.f_code.co_filename:
self.thread.notify(frame)
# print("== Trace ==", event, arg)
# traceback.print_stack(frame)
return self.trace
class ThreadColor(object):
"""
Wrapper on stdout/stderr that colors text by the current thread ID.
*stream* must be 'stdout' or 'stderr'.
"""
colors = {}
lock = Mutex()
def __init__(self, stream):
self.stream = getattr(sys, stream)
self.err = stream == 'stderr'
setattr(sys, stream, self)
def write(self, msg):
with self.lock:
cprint.cprint(self.stream, self.color(), msg, -1, stderr=self.err)
def flush(self):
with self.lock:
self.stream.flush()
def color(self):
tid = threading.current_thread()
if tid not in self.colors:
c = (len(self.colors) % 15) + 1
self.colors[tid] = c
return self.colors[tid]
from .remoteproxy import RemoteEventHandler, ClosedError, NoResultError, LocalObjectProxy, ObjectProxy
import subprocess, atexit, os, sys, time, random, socket, signal
import multiprocessing.connection
from ..Qt import USE_PYSIDE
try:
import cPickle as pickle
except ImportError:
import pickle
from .remoteproxy import RemoteEventHandler, ClosedError, NoResultError, LocalObjectProxy, ObjectProxy
from ..Qt import USE_PYSIDE
from ..util import cprint # color printing for debugging
__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ClosedError', 'NoResultError']
class Process(RemoteEventHandler):
......@@ -35,7 +37,8 @@ class Process(RemoteEventHandler):
return objects either by proxy or by value (if they are picklable). See
ProxyObject for more information.
"""
_process_count = 1 # just used for assigning colors to each process for debugging
def __init__(self, name=None, target=None, executable=None, copySysPath=True, debug=False, timeout=20, wrapStdout=None):
"""
============== =============================================================
......@@ -64,7 +67,7 @@ class Process(RemoteEventHandler):
name = str(self)
if executable is None:
executable = sys.executable
self.debug = debug
self.debug = 7 if debug is True else False # 7 causes printing in white
## random authentication key
authkey = os.urandom(20)
......@@ -82,6 +85,13 @@ class Process(RemoteEventHandler):
sysPath = sys.path if copySysPath else None
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap))
# Decide on printing color for this process
if debug:
procDebug = (Process._process_count%6) + 1 # pick a color for this process to print in
Process._process_count += 1
else:
procDebug = False
if wrapStdout is None:
wrapStdout = sys.platform.startswith('win')
......@@ -94,8 +104,8 @@ class Process(RemoteEventHandler):
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE, stdout=stdout, stderr=stderr)
## to circumvent the bug and still make the output visible, we use
## background threads to pass data from pipes to stdout/stderr
self._stdoutForwarder = FileForwarder(self.proc.stdout, "stdout")
self._stderrForwarder = FileForwarder(self.proc.stderr, "stderr")
self._stdoutForwarder = FileForwarder(self.proc.stdout, "stdout", procDebug)
self._stderrForwarder = FileForwarder(self.proc.stderr, "stderr", procDebug)
else:
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE)
......@@ -112,7 +122,7 @@ class Process(RemoteEventHandler):
targetStr=targetStr,
path=sysPath,
pyside=USE_PYSIDE,
debug=debug
debug=procDebug
)
pickle.dump(data, self.proc.stdin)
self.proc.stdin.close()
......@@ -128,8 +138,8 @@ class Process(RemoteEventHandler):
continue
else:
raise
RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid, debug=debug)
RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid, debug=self.debug)
self.debugMsg('Connected to child process.')
atexit.register(self.join)
......@@ -159,10 +169,11 @@ class Process(RemoteEventHandler):
def startEventLoop(name, port, authkey, ppid, debug=False):
if debug:
import os
print('[%d] connecting to server at port localhost:%d, authkey=%s..' % (os.getpid(), port, repr(authkey)))
cprint.cout(debug, '[%d] connecting to server at port localhost:%d, authkey=%s..\n'
% (os.getpid(), port, repr(authkey)), -1)
conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
if debug:
print('[%d] connected; starting remote proxy.' % os.getpid())
cprint.cout(debug, '[%d] connected; starting remote proxy.\n' % os.getpid(), -1)
global HANDLER
#ppid = 0 if not hasattr(os, 'getppid') else os.getppid()
HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)
......@@ -372,17 +383,17 @@ class QtProcess(Process):
def __init__(self, **kwds):
if 'target' not in kwds:
kwds['target'] = startQtEventLoop
from ..Qt import QtGui ## avoid module-level import to keep bootstrap snappy.
self._processRequests = kwds.pop('processRequests', True)
if self._processRequests and QtGui.QApplication.instance() is None:
raise Exception("Must create QApplication before starting QtProcess, or use QtProcess(processRequests=False)")
Process.__init__(self, **kwds)
self.startEventTimer()
def startEventTimer(self):
from ..Qt import QtGui, QtCore ## avoid module-level import to keep bootstrap snappy.
from ..Qt import QtCore ## avoid module-level import to keep bootstrap snappy.
self.timer = QtCore.QTimer()
if self._processRequests:
app = QtGui.QApplication.instance()
if app is None:
raise Exception("Must create QApplication before starting QtProcess, or use QtProcess(processRequests=False)")
self.startRequestProcessing()
def startRequestProcessing(self, interval=0.01):
......@@ -404,10 +415,10 @@ class QtProcess(Process):
def startQtEventLoop(name, port, authkey, ppid, debug=False):
if debug:
import os
print('[%d] connecting to server at port localhost:%d, authkey=%s..' % (os.getpid(), port, repr(authkey)))
cprint.cout(debug, '[%d] connecting to server at port localhost:%d, authkey=%s..\n' % (os.getpid(), port, repr(authkey)), -1)
conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
if debug:
print('[%d] connected; starting remote proxy.' % os.getpid())
cprint.cout(debug, '[%d] connected; starting remote proxy.\n' % os.getpid(), -1)
from ..Qt import QtGui, QtCore
#from PyQt4 import QtGui, QtCore
app = QtGui.QApplication.instance()
......@@ -437,11 +448,13 @@ class FileForwarder(threading.Thread):
which ensures that the correct behavior is achieved even if
sys.stdout/stderr are replaced at runtime.
"""
def __init__(self, input, output):
def __init__(self, input, output, color):
threading.Thread.__init__(self)
self.input = input
self.output = output
self.lock = threading.Lock()
self.daemon = True
self.color = color
self.start()
def run(self):
......@@ -449,12 +462,12 @@ class FileForwarder(threading.Thread):
while True:
line = self.input.readline()
with self.lock:
sys.stdout.write(line)
cprint.cout(self.color, line, -1)
elif self.output == 'stderr':
while True:
line = self.input.readline()
with self.lock:
sys.stderr.write(line)
cprint.cerr(self.color, line, -1)
else:
while True:
line = self.input.readline()
......
......@@ -7,6 +7,9 @@ except ImportError:
import builtins
import pickle
# color printing for debugging
from ..util import cprint
class ClosedError(Exception):
"""Raised when an event handler receives a request to close the connection
or discovers that the connection has been closed."""
......@@ -80,7 +83,7 @@ class RemoteEventHandler(object):
def debugMsg(self, msg):
if not self.debug:
return
print("[%d] %s" % (os.getpid(), str(msg)))
cprint.cout(self.debug, "%d [%d] %s\n" % (self.debug, os.getpid(), str(msg)), -1)
def getProxyOption(self, opt):
return self.proxyOptions[opt]
......
# -*- coding: utf-8 -*-
"""
Mutex.py - Stand-in extension of Qt's QMutex class
Copyright 2010 Luke Campagnola
Distributed under MIT/X11 license. See license.txt for more infomation.
"""
from PyQt4 import QtCore
import traceback
class Mutex(QtCore.QMutex):
"""Extends QMutex to provide warning messages when a mutex stays locked for a long time.
Mostly just useful for debugging purposes. Should only be used with MutexLocker, not
QMutexLocker.
"""
def __init__(self, *args, **kargs):
if kargs.get('recursive', False):
args = (QtCore.QMutex.Recursive,)
QtCore.QMutex.__init__(self, *args)
self.l = QtCore.QMutex() ## for serializing access to self.tb
self.tb = []
self.debug = False ## True to enable debugging functions
def tryLock(self, timeout=None, id=None):
if timeout is None:
locked = QtCore.QMutex.tryLock(self)
else:
locked = QtCore.QMutex.tryLock(self, timeout)
if self.debug and locked:
self.l.lock()
try:
if id is None:
self.tb.append(''.join(traceback.format_stack()[:-1]))
else:
self.tb.append(" " + str(id))
#print 'trylock', self, len(self.tb)
finally:
self.l.unlock()
return locked
def lock(self, id=None):
c = 0
waitTime = 5000 # in ms
while True:
if self.tryLock(waitTime, id):
break
c += 1
if self.debug:
self.l.lock()
try:
print "Waiting for mutex lock (%0.1f sec). Traceback follows:" % (c*waitTime/1000.)
traceback.print_stack()
if len(self.tb) > 0:
print "Mutex is currently locked from:\n", self.tb[-1]
else:
print "Mutex is currently locked from [???]"
finally:
self.l.unlock()
#print 'lock', self, len(self.tb)
def unlock(self):
QtCore.QMutex.unlock(self)
if self.debug:
self.l.lock()
try:
#print 'unlock', self, len(self.tb)
if len(self.tb) > 0:
self.tb.pop()
else:
raise Exception("Attempt to unlock mutex before it has been locked")
finally:
self.l.unlock()
def depth(self):
self.l.lock()
n = len(self.tb)
self.l.unlock()
return n
def traceback(self):
self.l.lock()
try:
ret = self.tb[:]
finally:
self.l.unlock()
return ret
def __exit__(self, *args):
self.unlock()
def __enter__(self):
self.lock()
return self
class MutexLocker:
def __init__(self, lock):
#print self, "lock on init",lock, lock.depth()
self.lock = lock
self.lock.lock()
self.unlockOnDel = True
def unlock(self):
#print self, "unlock by req",self.lock, self.lock.depth()
self.lock.unlock()
self.unlockOnDel = False
def relock(self):
#print self, "relock by req",self.lock, self.lock.depth()
self.lock.lock()
self.unlockOnDel = True
def __del__(self):
if self.unlockOnDel:
#print self, "Unlock by delete:", self.lock, self.lock.depth()
self.lock.unlock()
#else:
#print self, "Skip unlock by delete", self.lock, self.lock.depth()
def __exit__(self, *args):
if self.unlockOnDel:
self.unlock()
def __enter__(self):
return self
def mutex(self):
return self.lock
#import functools
#def methodWrapper(fn, self, *args, **kargs):
#print repr(fn), repr(self), args, kargs
#obj = self.__wrapped_object__()
#return getattr(obj, fn)(*args, **kargs)
##def WrapperClass(clsName, parents, attrs):
##for parent in parents:
##for name in dir(parent):
##attr = getattr(parent, name)
##if callable(attr) and name not in attrs:
##attrs[name] = functools.partial(funcWrapper, name)
##return type(clsName, parents, attrs)
#def WrapperClass(name, bases, attrs):
#for n in ['__getattr__', '__setattr__', '__getitem__', '__setitem__']:
#if n not in attrs:
#attrs[n] = functools.partial(methodWrapper, n)
#return type(name, bases, attrs)
#class WrapperClass(type):
#def __new__(cls, name, bases, attrs):
#fakes = []
#for n in ['__getitem__', '__setitem__']:
#if n not in attrs:
#attrs[n] = lambda self, *args: getattr(self, n)(*args)
#fakes.append(n)
#print fakes
#typ = type(name, bases, attrs)
#typ.__faked_methods__ = fakes
#return typ
#def __init__(self, name, bases, attrs):
#print self.__faked_methods__
#for n in self.__faked_methods__:
#self.n = None
#class ThreadsafeWrapper(object):
#def __init__(self, obj):
#self.__TSW_object__ = obj
#def __wrapped_object__(self):
#return self.__TSW_object__
class ThreadsafeWrapper(object):
"""Wrapper that makes access to any object thread-safe (within reasonable limits).
Mostly tested for wrapping lists, dicts, etc.
NOTE: Do not instantiate directly; use threadsafe(obj) instead.
- all method calls and attribute/item accesses are protected by mutex
- optionally, attribute/item accesses may return protected objects
- can be manually locked for extended operations
"""
def __init__(self, obj, recursive=False, reentrant=True):
"""
If recursive is True, then sub-objects accessed from obj are wrapped threadsafe as well.
If reentrant is True, then the object can be locked multiple times from the same thread."""
self.__TSOwrapped_object__ = obj
if reentrant:
self.__TSOwrap_lock__ = Mutex(QtCore.QMutex.Recursive)
else:
self.__TSOwrap_lock__ = Mutex()
self.__TSOrecursive__ = recursive
self.__TSOreentrant__ = reentrant
self.__TSOwrapped_objs__ = {}
def lock(self, id=None):
self.__TSOwrap_lock__.lock(id=id)
def tryLock(self, timeout=None, id=None):
self.__TSOwrap_lock__.tryLock(timeout=timeout, id=id)
def unlock(self):
self.__TSOwrap_lock__.unlock()
def unwrap(self):
return self.__TSOwrapped_object__
def __safe_call__(self, fn, *args, **kargs):
obj = self.__wrapped_object__()
ret = getattr(obj, fn)(*args, **kargs)
return self.__wrap_object__(ret)
def __getattr__(self, attr):
#try:
#return object.__getattribute__(self, attr)
#except AttributeError:
with self.__TSOwrap_lock__:
val = getattr(self.__wrapped_object__(), attr)
#if callable(val):
#return self.__wrap_object__(val)
return self.__wrap_object__(val)
def __setattr__(self, attr, val):
if attr[:5] == '__TSO':
#return object.__setattr__(self, attr, val)
self.__dict__[attr] = val
return
with self.__TSOwrap_lock__:
return setattr(self.__wrapped_object__(), attr, val)
def __wrap_object__(self, obj):
if not self.__TSOrecursive__:
return obj
if obj.__class__ in [int, float, str, unicode, tuple]:
return obj
if id(obj) not in self.__TSOwrapped_objs__:
self.__TSOwrapped_objs__[id(obj)] = threadsafe(obj, recursive=self.__TSOrecursive__, reentrant=self.__TSOreentrant__)
return self.__TSOwrapped_objs__[id(obj)]
def __wrapped_object__(self):
#if isinstance(self.__TSOwrapped_object__, weakref.ref):
#return self.__TSOwrapped_object__()
#else:
return self.__TSOwrapped_object__
def mkMethodWrapper(name):
return lambda self, *args, **kargs: self.__safe_call__(name, *args, **kargs)
def threadsafe(obj, *args, **kargs):
"""Return a thread-safe wrapper around obj. (see ThreadsafeWrapper)
args and kargs are passed directly to ThreadsafeWrapper.__init__()
This factory function is necessary for wrapping special methods (like __getitem__)"""
if type(obj) in [int, float, str, unicode, tuple, type(None), bool]:
return obj
clsName = 'Threadsafe_' + obj.__class__.__name__
attrs = {}
ignore = set(['__new__', '__init__', '__class__', '__hash__', '__getattribute__', '__getattr__', '__setattr__'])
for n in dir(obj):
if not n.startswith('__') or n in ignore:
continue
v = getattr(obj, n)
if callable(v):
attrs[n] = mkMethodWrapper(n)
typ = type(clsName, (ThreadsafeWrapper,), attrs)
return typ(obj, *args, **kargs)
if __name__ == '__main__':
d = {'x': 3, 'y': [1,2,3,4], 'z': {'a': 3}, 'w': (1,2,3,4)}
t = threadsafe(d, recursive=True, reentrant=False)
\ No newline at end of file
Copyright (c) 2010 Jonathan Hartley
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holders, nor those of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Download and docs:
http://pypi.python.org/pypi/colorama
Development:
http://code.google.com/p/colorama
Discussion group:
https://groups.google.com/forum/#!forum/python-colorama
Description
===========
Makes ANSI escape character sequences for producing colored terminal text and
cursor positioning work under MS Windows.
ANSI escape character sequences have long been used to produce colored terminal
text and cursor positioning on Unix and Macs. Colorama makes this work on
Windows, too, by wrapping stdout, stripping ANSI sequences it finds (which
otherwise show up as gobbledygook in your output), and converting them into the
appropriate win32 calls to modify the state of the terminal. On other platforms,
Colorama does nothing.
Colorama also provides some shortcuts to help generate ANSI sequences
but works fine in conjunction with any other ANSI sequence generation library,
such as Termcolor (http://pypi.python.org/pypi/termcolor.)
This has the upshot of providing a simple cross-platform API for printing
colored terminal text from Python, and has the happy side-effect that existing
applications or libraries which use ANSI sequences to produce colored output on
Linux or Macs can now also work on Windows, simply by calling
``colorama.init()``.
An alternative approach is to install 'ansi.sys' on Windows machines, which
provides the same behaviour for all applications running in terminals. Colorama
is intended for situations where that isn't easy (e.g. maybe your app doesn't
have an installer.)