opt
/
alt
/
python310
/
lib64
/
python3.10
/
multiprocessing
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
[DIR] dummy
N/A
__init__.py
916 bytes
Rename
Delete
connection.py
30.94 KB
Rename
Delete
context.py
11.33 KB
Rename
Delete
forkserver.py
11.86 KB
Rename
Delete
heap.py
11.35 KB
Rename
Delete
managers.py
46.39 KB
Rename
Delete
pool.py
31.99 KB
Rename
Delete
popen_fork.py
2.32 KB
Rename
Delete
popen_forkserver.py
2.18 KB
Rename
Delete
popen_spawn_posix.py
1.98 KB
Rename
Delete
popen_spawn_win32.py
3.92 KB
Rename
Delete
process.py
11.81 KB
Rename
Delete
queues.py
11.74 KB
Rename
Delete
reduction.py
9.29 KB
Rename
Delete
resource_sharer.py
5.01 KB
Rename
Delete
resource_tracker.py
8.76 KB
Rename
Delete
shared_memory.py
18.03 KB
Rename
Delete
sharedctypes.py
6.16 KB
Rename
Delete
spawn.py
9.08 KB
Rename
Delete
synchronize.py
11.34 KB
Rename
Delete
util.py
13.70 KB
Rename
Delete
import errno
import os
import selectors
import signal
import socket
import struct
import sys
import threading
import warnings

from . import connection
from . import process
from .context import reduction
from . import resource_tracker
from . import spawn
from . import util

__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']

#
#
#

# Upper bound on the number of fds carried by a single fork request.
MAXFDS_TO_SEND = 256
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t

#
# Forkserver class
#

class ForkServer(object):
    '''Client-side handle used to start and talk to the fork server process.

    The instance records the server's socket address, the write end of the
    "alive" pipe, the server pid, and the list of module names the server
    should try to import at startup.
    '''

    def __init__(self):
        self._forkserver_address = None
        self._forkserver_alive_fd = None
        self._forkserver_pid = None
        self._inherited_fds = None
        self._lock = threading.Lock()
        self._preload_modules = ['__main__']

    def _stop(self):
        # Method used by unit tests to stop the server
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the server and reset the recorded state.

        The caller is expected to hold self._lock (see _stop()).
        Does nothing if no server pid is recorded.
        '''
        if self._forkserver_pid is None:
            return

        # close the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # Only a filesystem-backed socket leaves a path behind to remove.
        if not util.is_abstract_socket_namespace(self._forkserver_address):
            os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Raises TypeError if any element of modules_names is not exactly
        of type str.
        '''
        # Validate the *argument*; checking the currently stored list
        # would let any invalid value through unchecked.
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError if len(fds) + 4 reaches MAXFDS_TO_SEND.
        '''
        self.ensure_running()
        # Four extra fds are always sent ahead of the caller's fds.
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            allfds = [child_r, child_w, self._forkserver_alive_fd,
                      resource_tracker.getfd()]
            allfds += fds
            try:
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except:
                # On any failure the caller never sees the parent ends,
                # so close them here before re-raising.
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child ends are closed in this process in every case.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # Forward only the keys that main() accepts as keywords.
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                if not util.is_abstract_socket_namespace(address):
                    os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except:
                    os.close(alive_w)
                    raise
                finally:
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid

#
#
#

def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd is the fd of the listening AF_UNIX socket, alive_r the
    read end of the "alive" pipe, and preload the module names to try to
    import before serving.  main_path is passed to
    spawn.import_main_path() when '__main__' is in preload.  sys_path is
    accepted but not used in this function.
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            try:
                __import__(modname)
            except ImportError:
                # Preloading is best effort: a missing module is ignored.
                pass

    util._close_stdin()

    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    handlers = {
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        signal.SIGCHLD: sigchld_handler,
        # protect the process from ^C
        signal.SIGINT: signal.SIG_IGN,
        }
    # Remember the previous handlers so children can restore them.
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                # Wait until at least one registered object is readable.
                while True:
                    rfds = [key.fileobj for (key, events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            break
                        if pid == 0:
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            returncode = os.waitstatus_to_exitcode(sts)
                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(fds)))
                        child_r, child_w, *fds = fds
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # The child always exits here and never
                                # returns into the server loop.
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                if e.errno != errno.ECONNABORTED:
                    raise


def _serve_one(child_r, fds, unused_fds, handlers):
    '''Run one process object in a freshly forked child.

    Resets the wakeup fd, reinstalls the given signal handlers, closes
    unused_fds, then unpacks fds as (alive fd, resource tracker fd,
    *inherited fds) before calling spawn._main().  Returns the value
    returned by spawn._main().
    '''
    # close unnecessary stuff and reset signal handlers
    signal.set_wakeup_fd(-1)
    for sig, val in handlers.items():
        signal.signal(sig, val)
    for fd in unused_fds:
        os.close(fd)

    (_forkserver._forkserver_alive_fd,
     resource_tracker._resource_tracker._fd,
     *_forkserver._inherited_fds) = fds

    # Run process object received over pipe
    parent_sentinel = os.dup(child_r)
    code = spawn._main(child_r, parent_sentinel)

    return code


#
# Read and write signed numbers
#

def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from fd and return it.

    Raises EOFError if fd reaches end of file before a full value is read.
    '''
    data = b''
    length = SIGNED_STRUCT.size
    while len(data) < length:
        s = os.read(fd, length - len(data))
        if not s:
            raise EOFError('unexpected EOF')
        data += s
    return SIGNED_STRUCT.unpack(data)[0]


def write_signed(fd, n):
    '''Write integer n to fd in SIGNED_STRUCT encoding.

    Repeats os.write() until all bytes are written; raises RuntimeError
    if a write reports zero bytes.
    '''
    msg = SIGNED_STRUCT.pack(n)
    while msg:
        nbytes = os.write(fd, msg)
        if nbytes == 0:
            raise RuntimeError('should not get here')
        msg = msg[nbytes:]

#
#
#

_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload
Save