[ create a new paste ] login | about

Link: http://codepad.org/TWa68adr    [ raw code | fork ]

Python, pasted on Feb 5:
#  socketlibevent.py - MIT License
#  phoenix@burninglabs.com
#
#  Non-blocking socket I/O for Stackless Python using libevent, via pyevent.
#
#  Usage:
#      import sys, socketlibevent; sys.modules['socket'] = socketlibevent
#
#  Based on Richard Tew's stacklesssocket module.
#  Uses Dug Song's pyevent.
#
#  Thanks a Heap !


import stackless
import sys
import errno
import weakref
import signal
import socket as stdsocket

try:
    import event
except:
    try:
        import rel; rel.override()
        import event
    except:
        print "please install libevent and pyevent"
                # http://code.google.com/p/pyevent/
        print "(or 'stackless ez_setup.py rel' for quick testing)"
                # http://code.google.com/p/registeredeventlistener/
        sys.exit()

# For SSL support, this module uses the 'ssl' module (built in from 2.6 up):
#               ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/)
try:
    import ssl as ssl_
    ssl_enabled = True
except:
    ssl_enabled = False

# import everything from the regular socket module
if __name__ == '__main__':
    globals().update(stdsocket.__dict__)
    __name__ = '__main__'
else:
    globals().update(stdsocket.__dict__)

_GLOBAL_DEFAULT_TIMEOUT = 2
setdefaulttimeout(_GLOBAL_DEFAULT_TIMEOUT)

class _fileobject(stdsocket._fileobject):  
    def close(self):
        try:
            if self._sock:
                self.flush()
        finally:
            if self._close:
                self._sock.close()
            self._sock = None

# simple decorator to run a function in a tasklet
def tasklet(task):
    def run(*args, **kwargs):
        stackless.tasklet(task)(*args, **kwargs)
    return run


# Event Loop Management
loop_running = False
sockets = weakref.WeakValueDictionary()

@tasklet
def eventLoop():
    global loop_running
    global event_errors
    
    while len(sockets) is not 0:
        # If there are other tasklets scheduled:
        #     use the nonblocking loop
        # else: use the blocking loop
        event.loop( stackless.getruncount() > 2 ) # main tasklet + this one
        stackless.schedule()
        
    loop_running = False

def abort():
    print '\nKeyboardInterrupt'
    sys.exit()
    
def runEventLoop():
    global loop_running
    if not loop_running:
        event.init()
        event.signal(signal.SIGINT, abort)
        event.signal(signal.SIGQUIT, abort)
        eventLoop()
        loop_running = True


# Replacement Socket Module Functions
def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
    return evsocket(stdsocket.socket(family, type, proto))

def create_connection(address, timeout=None):
    if timeout is None:
        timeout = getdefaulttimeout()
    s = socket()
    s.connect(address, timeout)
    return s

def ssl(sock, keyfile=None, certfile=None):
    if ssl_enabled:
        return evsocketssl(sock, keyfile, certfile)
    else:
        raise RuntimeError(\
            "SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'")
    

# Socket Proxy Class
class evsocket():
    # XXX Not all socketobject methods are implemented!
    # XXX Currently, the sockets are using the default, blocking mode.
    
    def __init__(self, sock):
        self.sock = sock
        sock.setblocking(0)
        self.accepting = False
        self.connected = False
        self.remote_addr = None
        self.fileobject = None
        self.read_channel = stackless.channel()
        self.write_channel = stackless.channel()
        self.accept_channel = None
        self.connect_channel = None
        global sockets
        sockets[id(self)] = self
        runEventLoop()
    
    def __getattr__(self, attr):
        return getattr(self.sock, attr)

    def listen(self, backlog=5):
        self.accepting = True
        return self.sock.listen(backlog)

    def accept(self):
        if not self.accept_channel:
            self.accept_channel = stackless.channel()
            event.event(self.handle_accept, handle=self.sock,
                        evtype=event.EV_READ | event.EV_PERSIST).add()
        return self.accept_channel.receive()

    @tasklet
    def handle_accept(self, ev, sock, event_type, *arg):
        try:
            s, a = self.sock.accept()
            s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
        except Exception, e:
            self.accept_channel.send_exception(type(e), *e.args)
        else:
            s = evsocket(s)
            self.accept_channel.send((s,a))

    @tasklet
    def handle_connect(self, ev, sock, event_type, *arg):
        if event_type == event.EV_TIMEOUT:
            self.connect_channel.send_exception(
                stdsocket.error, errno.ETIMEDOUT, errno.errorcode[errno.ETIMEDOUT])
        else:
            self.connect_channel.send(None)
            
    def connect(self, address, timeout=None):
        if timeout is None:
            timeout = getdefaulttimeout()
        err = self.sock.connect_ex(address)
        
        if err == 0:
            self.connected = True
            self.remote_addr = address
        elif err == errno.EINPROGRESS:
            if self.connect_channel is None:
                self.connect_channel = stackless.channel()
            event.event(self.handle_connect, handle=self.sock,
                        evtype=event.EV_WRITE).add(timeout)
            self.connect_channel.receive()
        else:
            raise stdsocket.error(err, errno.errorcode[err])

    def send(self, data, *args):
        event.write(self.sock, self.handle_send, data)
        return self.write_channel.receive()

    @tasklet
    def handle_send(self, data):
        if self.write_channel.balance < 0:
            self.write_channel.send(self.sock.send(data))

    def sendall(self, data, *args):
        while data:
            sent = self.send(data)
            data = data[sent:]

    def recv(self, bytes, *args):
        event.read(self.sock, self.handle_recv, bytes)
        return self.read_channel.receive()
    
    @tasklet
    def handle_recv(self, bytes):
        print 'recv check'
        if self.read_channel.balance < 0:
            print 'recv start'
            print self.read_channel.balance
            self.read_channel.send(self.sock.recv(bytes))
            print 'recv end'
    
    # TODO
    def recvfrom(self, bytes, *args):
        event.read(self.sock, self.handle_recv, bytes)
        return self.read_channel.receive()

    # TODO
    @tasklet
    def handle_recvfrom(self, bytes):
        self.read_channel.send(self.sock.recvfrom(bytes))

    def makefile(self, mode='r', bufsize=-1):
        if self.fileobject is None:
            self.close = lambda : None
        self.fileobject = _fileobject(self, mode, bufsize)
        return self.fileobject

    def close(self):
        self.sock.close()
        
# SSL Proxy Class
class evsocketssl(evsocket):
    def __init__(self, sock, keyfile=None, certfile=None):
        if certfile:
            server_side = True
        else:
            server_side = False
        
        # XXX This currently performs a BLOCKING handshake operation
        # TODO Implement a non-blocking handshake
        self.sock = ssl_.wrap_socket(sock, keyfile, certfile, server_side)

    @tasklet
    def handle_accept(self, ev, sock, event_type, *arg):
        s, a = self.sock.accept()
        s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
        s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1)
        s = evsocketssl(s)
        self.accept_channel.send((s,a))


if __name__ == "__main__":
    sys.modules["socket"] = __import__(__name__)

    import urllib2
    import gc
        
    @tasklet
    def test(url):
        print "url read", url
        print urllib2.urlopen(url).read(32)
    
    test('http://localhost')
    test('http://yahoo.com')
    
    stackless.run()
    #gc.get_count()
    


Create a new paste based on this one


Comments: