[ create a new paste ] login | about

Link: http://codepad.org/VvCWAl0O    [ raw code | fork ]

Python, pasted on Jan 3:
#!/usr/bin/python -i

from subprocess import Popen, PIPE

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO  # NOQA

from circuits.io import File, Write
from circuits import handler, Component, Debugger, Event, Manager


class Hello(Event):
    """Hello Event"""


class Process(Component):

    channel = "process"

    def init(self, args, cwd=None, shell=False):
        self.p = Popen(
            args,
            cwd=cwd,
            shell=shell,
            stdin=PIPE,
            stderr=PIPE,
            stdout=PIPE
        )

        self.stderr = StringIO()
        self.stdout = StringIO()

        File(
            self.p.stdin,
            channel="{0:d}.stdin".format(self.p.pid)
        ).register(self)

        File(
            self.p.stderr,
            channel="{0:d}.stderr".format(self.p.pid)
        ).register(self)

        File(
            self.p.stdout,
            channel="{0:d}.stdout".format(self.p.pid)
        ).register(self)

        self.addHandler(
            handler("read", channel="{0:d}.stderr".format(self.p.pid))(
                self.__class__._on_stderr_read
            )
        )

        self.addHandler(
            handler("read", channel="{0:d}.stdout".format(self.p.pid))(
                self.__class__._on_stdout_read
            )
        )

    @staticmethod
    def _on_stderr_read(self, data):
        self.stderr.write(data)

    @staticmethod
    def _on_stdout_read(self, data):
        self.stdout.write(data)

    def write(self, data):
        self.fire(Write(data), "{0:d}.stdin".format(self.p.pid))

    def kill(self):
        self.p.kill()

    def terminate(self):
        self.p.terminate()

    def signal(self, signal):
        self.p.send_signal(signal)

    def wait(self):
        return self.p.wait()


m = Manager() + Debugger()
m.start()


Create a new paste based on this one


Comments: