codepad
[
create a new paste
]
login
|
about
Language:
C
C++
D
Haskell
Lua
OCaml
PHP
Perl
Plain Text
Python
Ruby
Scheme
Tcl
#!/usr/bin/python -i
# Example script: wrap a child process in a circuits Component.
#
# The ``-i`` flag in the shebang leaves the interpreter open after the script
# has run, so the started manager ``m`` can be used from the prompt.

from subprocess import Popen, PIPE

# Choose an in-memory buffer class for the captured output.
try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO  # NOQA
    except ImportError:
        # Python 3 has neither module above.  The pipes created by Popen
        # below are opened without text mode and therefore carry bytes, so a
        # bytes buffer is the matching replacement.
        from io import BytesIO as StringIO  # NOQA

from circuits.io import File, Write
from circuits import handler, Component, Debugger, Event, Manager


class Hello(Event):
    """Hello Event"""


class Process(Component):
    """Component that runs a child process and captures its output.

    After ``init`` has run, the instance exposes:

    * ``p`` -- the ``subprocess.Popen`` object of the child.
    * ``stdout`` / ``stderr`` -- in-memory buffers that the read handlers
      append to.
    """

    channel = "process"

    def init(self, args, cwd=None, shell=False):
        """Spawn the child process and hook its three pipes into circuits.

        NOTE(review): presumably circuits calls ``init`` with the arguments
        given to the constructor -- confirm against the circuits version in
        use.

        :param args: Command to run; passed unchanged to ``Popen``.
        :param cwd: Working directory for the child; passed to ``Popen``.
        :param shell: Whether to run through the shell; passed to ``Popen``.
        """
        self.p = Popen(
            args,
            cwd=cwd,
            shell=shell,
            stdin=PIPE,
            stderr=PIPE,
            stdout=PIPE,
        )

        self.stderr = StringIO()
        self.stdout = StringIO()

        # Channel names are prefixed with the child's pid, which keeps the
        # channels of different Process instances distinct.
        pid = self.p.pid
        stdin_channel = "{0:d}.stdin".format(pid)
        stderr_channel = "{0:d}.stderr".format(pid)
        stdout_channel = "{0:d}.stdout".format(pid)

        # One File component per pipe, registered in the same order as
        # before: stdin, stderr, stdout.
        File(self.p.stdin, channel=stdin_channel).register(self)
        File(self.p.stderr, channel=stderr_channel).register(self)
        File(self.p.stdout, channel=stdout_channel).register(self)

        # Attach "read" handlers for the two output channels.  The functions
        # are fetched from the class (not the instance) so that the plain
        # function objects are what gets wrapped by ``handler``.
        self.addHandler(
            handler("read", channel=stderr_channel)(
                self.__class__._on_stderr_read
            )
        )
        self.addHandler(
            handler("read", channel=stdout_channel)(
                self.__class__._on_stdout_read
            )
        )

    # The two handlers below are staticmethods that still take ``self``
    # explicitly: accessing them through the class yields the undecorated
    # function, which ``init`` wraps with ``handler`` and hands to
    # ``addHandler``.
    # NOTE(review): presumably ``addHandler`` binds the function to this
    # instance before calling it -- confirm.

    @staticmethod
    def _on_stderr_read(self, data):
        """Append ``data`` read from the child's stderr to ``self.stderr``."""
        self.stderr.write(data)

    @staticmethod
    def _on_stdout_read(self, data):
        """Append ``data`` read from the child's stdout to ``self.stdout``."""
        self.stdout.write(data)

    def write(self, data):
        """Fire a ``Write`` event carrying ``data`` on the stdin channel."""
        self.fire(Write(data), "{0:d}.stdin".format(self.p.pid))

    def kill(self):
        """Kill the child process (``Popen.kill``)."""
        self.p.kill()

    def terminate(self):
        """Terminate the child process (``Popen.terminate``)."""
        self.p.terminate()

    def signal(self, signal):
        """Send ``signal`` to the child process (``Popen.send_signal``)."""
        self.p.send_signal(signal)

    def wait(self):
        """Block until the child exits and return its exit status."""
        return self.p.wait()


# Build a manager with a Debugger attached and start it.  ``m`` is kept at
# module level so it can be used from the interactive prompt.
m = Manager() + Debugger()
m.start()
Private
[
?
]
Run code
Submit