# socketlibevent.py - MIT License
# phoenix@burninglabs.com
#
# Non-blocking socket I/O for Stackless Python using libevent, via pyevent.
#
# Usage:
# import sys, socketlibevent; sys.modules['socket'] = socketlibevent
#
# Based on Richard Tew's stacklesssocket module.
# Uses Dug Song's pyevent.
#
# Thanks a Heap !
import stackless
import sys
import errno
import weakref
import signal
import socket as stdsocket
try:
import event
except:
try:
import rel; rel.override()
import event
except:
print "please install libevent and pyevent"
# http://code.google.com/p/pyevent/
print "(or 'stackless ez_setup.py rel' for quick testing)"
# http://code.google.com/p/registeredeventlistener/
sys.exit()
# For SSL support, this module uses the 'ssl' module (built in from 2.6 up):
# ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/)
try:
import ssl as ssl_
ssl_enabled = True
except:
ssl_enabled = False
# import everything from the regular socket module
if __name__ == '__main__':
globals().update(stdsocket.__dict__)
__name__ = '__main__'
else:
globals().update(stdsocket.__dict__)
_GLOBAL_DEFAULT_TIMEOUT = 2
setdefaulttimeout(_GLOBAL_DEFAULT_TIMEOUT)
class _fileobject(stdsocket._fileobject):
def close(self):
try:
if self._sock:
self.flush()
finally:
if self._close:
self._sock.close()
self._sock = None
# simple decorator to run a function in a tasklet
def tasklet(task):
def run(*args, **kwargs):
stackless.tasklet(task)(*args, **kwargs)
return run
# Event Loop Management
loop_running = False
sockets = weakref.WeakValueDictionary()
@tasklet
def eventLoop():
global loop_running
global event_errors
while len(sockets) is not 0:
# If there are other tasklets scheduled:
# use the nonblocking loop
# else: use the blocking loop
event.loop( stackless.getruncount() > 2 ) # main tasklet + this one
stackless.schedule()
loop_running = False
def abort():
print '\nKeyboardInterrupt'
sys.exit()
def runEventLoop():
global loop_running
if not loop_running:
event.init()
event.signal(signal.SIGINT, abort)
event.signal(signal.SIGQUIT, abort)
eventLoop()
loop_running = True
# Replacement Socket Module Functions
def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
return evsocket(stdsocket.socket(family, type, proto))
def create_connection(address, timeout=None):
if timeout is None:
timeout = getdefaulttimeout()
s = socket()
s.connect(address, timeout)
return s
def ssl(sock, keyfile=None, certfile=None):
if ssl_enabled:
return evsocketssl(sock, keyfile, certfile)
else:
raise RuntimeError(\
"SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'")
# Socket Proxy Class
class evsocket():
# XXX Not all socketobject methods are implemented!
# XXX Currently, the sockets are using the default, blocking mode.
def __init__(self, sock):
self.sock = sock
sock.setblocking(0)
self.accepting = False
self.connected = False
self.remote_addr = None
self.fileobject = None
self.read_channel = stackless.channel()
self.write_channel = stackless.channel()
self.accept_channel = None
self.connect_channel = None
global sockets
sockets[id(self)] = self
runEventLoop()
def __getattr__(self, attr):
return getattr(self.sock, attr)
def listen(self, backlog=5):
self.accepting = True
return self.sock.listen(backlog)
def accept(self):
if not self.accept_channel:
self.accept_channel = stackless.channel()
event.event(self.handle_accept, handle=self.sock,
evtype=event.EV_READ | event.EV_PERSIST).add()
return self.accept_channel.receive()
@tasklet
def handle_accept(self, ev, sock, event_type, *arg):
try:
s, a = self.sock.accept()
s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
except Exception, e:
self.accept_channel.send_exception(type(e), *e.args)
else:
s = evsocket(s)
self.accept_channel.send((s,a))
@tasklet
def handle_connect(self, ev, sock, event_type, *arg):
if event_type == event.EV_TIMEOUT:
self.connect_channel.send_exception(
stdsocket.error, errno.ETIMEDOUT, errno.errorcode[errno.ETIMEDOUT])
else:
self.connect_channel.send(None)
def connect(self, address, timeout=None):
if timeout is None:
timeout = getdefaulttimeout()
err = self.sock.connect_ex(address)
if err == 0:
self.connected = True
self.remote_addr = address
elif err == errno.EINPROGRESS:
if self.connect_channel is None:
self.connect_channel = stackless.channel()
event.event(self.handle_connect, handle=self.sock,
evtype=event.EV_WRITE).add(timeout)
self.connect_channel.receive()
else:
raise stdsocket.error(err, errno.errorcode[err])
def send(self, data, *args):
event.write(self.sock, self.handle_send, data)
return self.write_channel.receive()
@tasklet
def handle_send(self, data):
if self.write_channel.balance < 0:
self.write_channel.send(self.sock.send(data))
def sendall(self, data, *args):
while data:
sent = self.send(data)
data = data[sent:]
def recv(self, bytes, *args):
event.read(self.sock, self.handle_recv, bytes)
return self.read_channel.receive()
@tasklet
def handle_recv(self, bytes):
print 'recv check'
if self.read_channel.balance < 0:
print 'recv start'
print self.read_channel.balance
self.read_channel.send(self.sock.recv(bytes))
print 'recv end'
# TODO
def recvfrom(self, bytes, *args):
event.read(self.sock, self.handle_recv, bytes)
return self.read_channel.receive()
# TODO
@tasklet
def handle_recvfrom(self, bytes):
self.read_channel.send(self.sock.recvfrom(bytes))
def makefile(self, mode='r', bufsize=-1):
if self.fileobject is None:
self.close = lambda : None
self.fileobject = _fileobject(self, mode, bufsize)
return self.fileobject
def close(self):
self.sock.close()
# SSL Proxy Class
class evsocketssl(evsocket):
def __init__(self, sock, keyfile=None, certfile=None):
if certfile:
server_side = True
else:
server_side = False
# XXX This currently performs a BLOCKING handshake operation
# TODO Implement a non-blocking handshake
self.sock = ssl_.wrap_socket(sock, keyfile, certfile, server_side)
@tasklet
def handle_accept(self, ev, sock, event_type, *arg):
s, a = self.sock.accept()
s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1)
s = evsocketssl(s)
self.accept_channel.send((s,a))
if __name__ == "__main__":
sys.modules["socket"] = __import__(__name__)
import urllib2
import gc
@tasklet
def test(url):
print "url read", url
print urllib2.urlopen(url).read(32)
test('http://localhost')
test('http://yahoo.com')
stackless.run()
#gc.get_count()