processes.py 13.9 KB
Newer Older
1
from remoteproxy import RemoteEventHandler, ExitError, NoResultError, LocalObjectProxy, ObjectProxy
2
import subprocess, atexit, os, sys, time, random, socket, signal
3
4
5
import cPickle as pickle
import multiprocessing.connection

6
7
__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ExitError', 'NoResultError']

8
class Process(RemoteEventHandler):
    """
    Bases: RemoteEventHandler
    
    This class is used to spawn and control a new python interpreter.
    It uses subprocess.Popen to start the new process and communicates with it
    using multiprocessing.Connection objects over a network socket.
    
    By default, the remote process will immediately enter an event-processing
    loop that carries out requests sent from the parent process.
    
    Remote control works mainly through proxy objects::
    
        proc = Process()              ## starts process, returns handle
        rsys = proc._import('sys')    ## asks remote process to import 'sys', returns
                                      ## a proxy which references the imported module
        rsys.stdout.write('hello\n')  ## This message will be printed from the remote 
                                      ## process. Proxy objects can usually be used
                                      ## exactly as regular objects are.
        proc.close()                  ## Request the remote process shut down
    
    Requests made via proxy objects may be synchronous or asynchronous and may
    return objects either by proxy or by value (if they are picklable). See
    ObjectProxy for more information.
    """
    
    def __init__(self, name=None, target=None, copySysPath=True):
        """
        ============  =============================================================
        Arguments:
        name          Optional name for this process used when printing messages
                      from the remote process.
        target        Optional function to call after starting remote process. 
                      By default, this is startEventLoop(), which causes the remote
                      process to process requests from the parent process until it
                      is asked to quit. If you wish to specify a different target,
                      it must be picklable (bound methods are not).
        copySysPath   If true, copy the contents of sys.path to the remote process
        ============  =============================================================
        
        The constructor blocks until the remote process has connected back to
        the parent, then registers self.join to run at interpreter exit.
        """
        if target is None:
            target = startEventLoop
        if name is None:
            name = str(self)
        
        ## Random authentication key. This key guards a socket that executes 
        ## requests on behalf of whoever connects, so draw it from the OS 
        ## entropy source rather than the predictable 'random' module.
        authkey = os.urandom(20)
        
        ## Listen for connection from remote process. Binding to port 0 asks the
        ## OS for any free port, which avoids probing ports one at a time and
        ## avoids depending on a platform-specific "address in use" errno value.
        l = multiprocessing.connection.Listener(('localhost', 0), authkey=authkey)
        port = l.address[1]
        
        try:
            ## start remote process, instruct it to run target function
            sysPath = sys.path if copySysPath else None
            bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
            self.proc = subprocess.Popen((sys.executable, bootstrap), stdin=subprocess.PIPE)
            targetStr = pickle.dumps(target)  ## double-pickle target so that child has a chance to 
                                              ## set its sys.path properly before unpickling the target
            pickle.dump((name+'_child', port, authkey, targetStr, sysPath), self.proc.stdin)
            self.proc.stdin.close()
            
            ## open connection for remote process
            conn = l.accept()
        finally:
            ## Only one connection is ever accepted; release the listening 
            ## socket whether or not the handshake succeeded.
            l.close()
        
        RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid)
        
        atexit.register(self.join)
        
    def join(self, timeout=10):
        """
        Ask the remote process to close (if it is still running) and wait for
        it to exit.
        
        The child is polled every 50 ms. If *timeout* (in seconds) is not None
        and the child has not exited within that time, an Exception is raised.
        Does nothing if the child has already exited.
        """
        if self.proc.poll() is None:
            self.close()
            start = time.time()
            while self.proc.poll() is None:
                if timeout is not None and time.time() - start > timeout:
                    raise Exception('Timed out waiting for remote process to end.')
                time.sleep(0.05)
        
        
def startEventLoop(name, port, authkey):
    """
    Connect to the parent process on localhost:*port* (authenticating with
    *authkey*), store a RemoteEventHandler for that connection in the 
    module-level HANDLER, and service requests until ExitError is raised.
    """
    global HANDLER
    address = ('localhost', int(port))
    conn = multiprocessing.connection.Client(address, authkey=authkey)
    HANDLER = RemoteEventHandler(conn, name, os.getppid())
    
    running = True
    while running:
        try:
            HANDLER.processRequests()  ## raises ExitError when the loop should exit
        except ExitError:
            running = False
        else:
            ## brief pause between polls
            time.sleep(0.01)


class ForkedProcess(RemoteEventHandler):
    """
    ForkedProcess is a substitute for Process that uses os.fork() to generate a new process.
    This is much faster than starting a completely new interpreter and child processes
    automatically have a copy of the entire program state from before the fork. This
    makes it an appealing approach when parallelizing expensive computations. (see
    also Parallelizer)
    
    However, fork() comes with some caveats and limitations:

    - fork() is not available on Windows.
    - It is not possible to have a QApplication in both parent and child process
      (unless both QApplications are created _after_ the call to fork())
      Attempts by the forked process to access Qt GUI elements created by the parent
      will most likely cause the child to crash.
    - Likewise, database connections are unlikely to function correctly in a forked child.
    - Threads are not copied by fork(); the new process 
      will have only one thread that starts wherever fork() was called in the parent process.
    - Forked processes are unceremoniously terminated when join() is called; they are not 
      given any opportunity to clean up. (This prevents them calling any cleanup code that
      was only intended to be used by the parent process)
    - Normally when fork()ing, open file handles are shared with the parent process, 
      which is potentially dangerous. ForkedProcess is careful to close all file handles 
      that are not explicitly needed--stdout, stderr, and a single pipe to the parent 
      process.
      
    """
    
    def __init__(self, name=None, target=0, preProxy=None, randomReseed=True):
        """
        When initializing, an optional target may be given. 
        If no target is specified, self.eventLoop will be used.
        If None is given, no target will be called (and it will be up 
        to the caller to properly shut down the forked process)
        
        preProxy may be a dict of values that will appear as ObjectProxy
        in the remote process (but do not need to be sent explicitly since 
        they are available immediately before the call to fork()).
        In the child process, the proxies are available as 
        self.forkedProxies[name].
        
        If randomReseed is True, the built-in random and numpy.random generators
        will be reseeded in the child process.
        """
        self.hasJoined = False
        if target == 0:  ## 0 is the "not specified" sentinel; None means "call nothing"
            target = self.eventLoop
        if name is None:
            name = str(self)
        
        conn, remoteConn = multiprocessing.Pipe()
        
        ## Register the pre-proxied objects in the parent _before_ forking so 
        ## that the child inherits the IDs without any message being sent.
        proxyIDs = {}
        if preProxy is not None:
            for k, v in preProxy.items():
                proxyId = LocalObjectProxy.registerObject(v)
                proxyIDs[k] = proxyId
        
        pid = os.fork()
        if pid == 0:
            self.isParent = False
            ## We are now in the forked process; need to be extra careful what we touch while here.
            ##   - no reading/writing file handles/sockets owned by parent process (stdout is ok)
            ##   - don't touch QtGui or QApplication at all; these are landmines.
            ##   - don't let the process call exit handlers
            
            ## close all file handles we do not want shared with parent
            conn.close()
            sys.stdin.close()  ## otherwise we screw with interactive prompts.
            fid = remoteConn.fileno()
            os.closerange(3, fid)
            os.closerange(fid+1, 4096) ## just guessing on the maximum descriptor count..
            
            ## Override any custom exception hooks
            def excepthook(*args):
                import traceback
                traceback.print_exception(*args)
            sys.excepthook = excepthook 
            
            ## Make it harder to access QApplication instance
            if 'PyQt4.QtGui' in sys.modules:
                sys.modules['PyQt4.QtGui'].QApplication = None
            sys.modules.pop('PyQt4.QtGui', None)
            sys.modules.pop('PyQt4.QtCore', None)
            
            ## sabotage atexit callbacks: drop everything inherited from the 
            ## parent and make the only remaining handler a hard exit.
            atexit._exithandlers = []
            atexit.register(lambda: os._exit(0))
            
            if randomReseed:
                ## Otherwise every child would continue the parent's random sequence.
                if 'numpy.random' in sys.modules:
                    sys.modules['numpy.random'].seed(os.getpid() ^ int(time.time()*10000%10000))
                if 'random' in sys.modules:
                    sys.modules['random'].seed(os.getpid() ^ int(time.time()*10000%10000))
            
            ## Read the parent pid once so the handler and the proxies agree on it.
            ppid = os.getppid()
            RemoteEventHandler.__init__(self, remoteConn, name+'_child', pid=ppid)
            
            self.forkedProxies = {}
            ## (loop variable deliberately distinct from the 'name' argument)
            for proxyName, proxyId in proxyIDs.items():
                self.forkedProxies[proxyName] = ObjectProxy(ppid, proxyId=proxyId, typeStr=repr(preProxy[proxyName]))
            
            if target is not None:
                target()
                
        else:
            self.isParent = True
            self.childPid = pid
            remoteConn.close()
            RemoteEventHandler.handlers = {}  ## don't want to inherit any of this from the parent.
                                              ## NOTE(review): this reset runs in the parent branch, 
                                              ## which seems at odds with the remark above -- confirm.
            
            RemoteEventHandler.__init__(self, conn, name+'_parent', pid=pid)
            atexit.register(self.join)
        
        
    def eventLoop(self):
        """
        Default target for the forked child: process requests until ExitError
        is raised, then call sys.exit(0).
        
        Any other exception is reported through sys.excepthook and the loop 
        keeps running.
        """
        while True:
            try:
                self.processRequests()  # exception raised when the loop should exit
                time.sleep(0.01)
            except ExitError:
                break
            except:
                ## Deliberately broad: a failing request must not take down the loop.
                print("Error occurred in forked event loop:")
                sys.excepthook(*sys.exc_info())
        sys.exit(0)
        
    def join(self, timeout=10):
        """
        Ask the child process to exit, require a confirmation, and wait for it.
        
        Does nothing if the process has already been joined or killed. An 
        IOError raised while closing or waiting is ignored on the assumption
        that the child has already quit.
        """
        if self.hasJoined:
            return
        try:
            self.close(callSync='sync', timeout=timeout, noCleanup=True)  ## ask the child process to exit and require that it return a confirmation.
            os.waitpid(self.childPid, 0)
        except IOError:  ## probably remote process has already quit
            pass  
        self.hasJoined = True

    def kill(self):
        """Immediately kill the forked remote process. 
        This is generally safe because forked processes are already
        expected to _avoid_ any cleanup at exit."""
        os.kill(self.childPid, signal.SIGKILL)
        self.hasJoined = True
        
        
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273

##Special set of subclasses that implement a Qt event loop instead.
        
class RemoteQtEventHandler(RemoteEventHandler):
    """
    RemoteEventHandler whose requests are polled from a QTimer rather than
    from an explicit loop.
    """
    
    def __init__(self, *args, **kwds):
        ## Plain pass-through to the base-class constructor.
        RemoteEventHandler.__init__(self, *args, **kwds)
        
    def startEventTimer(self):
        """Create self.timer and have it invoke processRequests every 10 ms."""
        from pyqtgraph.Qt import QtGui, QtCore
        pollTimer = QtCore.QTimer()
        self.timer = pollTimer
        pollTimer.timeout.connect(self.processRequests)
        pollTimer.start(10)
    
    def processRequests(self):
        """
        Process pending requests. When the base class raises ExitError, quit 
        the QApplication and stop the polling timer instead of propagating.
        """
        try:
            RemoteEventHandler.processRequests(self)
        except ExitError:
            from pyqtgraph.Qt import QtGui, QtCore
            app = QtGui.QApplication.instance()
            app.quit()
            self.timer.stop()

class QtProcess(Process):
    """
    QtProcess is essentially the same as Process, with two major differences:
    
    - The remote process starts by running startQtEventLoop() which creates a 
      QApplication in the remote process and uses a QTimer to trigger
      remote event processing. This allows the remote process to have its own 
      GUI.
    - A QTimer is also started on the parent process which polls for requests
      from the child process. This allows Qt signals emitted within the child 
      process to invoke slots on the parent process and vice-versa.
      
    Example::
    
        proc = QtProcess()            
        rQtGui = proc._import('PyQt4.QtGui')
        btn = rQtGui.QPushButton('button on child process')
        btn.show()
        
        def slot():
            print 'slot invoked on parent process'
        btn.clicked.connect(proxy(slot))   # be sure to send a proxy of the slot
    """
    
    def __init__(self, **kwds):
        """
        Accepts the same keyword arguments as Process. If no 'target' is 
        given, startQtEventLoop is used. After the remote process has been 
        started, the parent-side polling timer is started.
        """
        kwds.setdefault('target', startQtEventLoop)
        Process.__init__(self, **kwds)
        self.startEventTimer()
        
    def startEventTimer(self):
        """
        Start a QTimer (stored as self.timer) that calls processRequests 
        every 10 ms.
        
        Raises an Exception if no QApplication instance exists.
        """
        from pyqtgraph.Qt import QtGui, QtCore  ## avoid module-level import to keep bootstrap snappy.
        ## Check for the application first so that no QTimer is constructed 
        ## (and no stray self.timer is left behind) when there is none.
        app = QtGui.QApplication.instance()
        if app is None:
            raise Exception("Must create QApplication before starting QtProcess")
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.processRequests)
        self.timer.start(10)
        
    def processRequests(self):
        """
        Process pending requests from the child; once ExitError is raised, 
        stop the polling timer instead of propagating the exception.
        """
        try:
            Process.processRequests(self)
        except ExitError:
            self.timer.stop()
    
def startQtEventLoop(name, port, authkey):
    """
    Connect to the parent process on localhost:*port* (authenticating with
    *authkey*), make sure a QApplication exists, install a 
    RemoteQtEventHandler as the module-level HANDLER and run the Qt event loop.
    """
    global HANDLER
    address = ('localhost', int(port))
    conn = multiprocessing.connection.Client(address, authkey=authkey)
    
    from pyqtgraph.Qt import QtGui, QtCore
    app = QtGui.QApplication.instance()
    if app is None:
        ## No application yet: create one. Generally we want the event loop to 
        ## stay open until it is explicitly closed by the parent process.
        app = QtGui.QApplication([])
        app.setQuitOnLastWindowClosed(False)
    
    handler = RemoteQtEventHandler(conn, name, os.getppid())
    HANDLER = handler
    handler.startEventTimer()
    app.exec_()