Initial commit

master
Sven Slootweg 12 years ago
commit 150197513d

1
.gitignore vendored

@ -0,0 +1 @@
*.pyc

@ -0,0 +1,4 @@
from reactor import Reactor
from baseclient import BaseClient
from server import Server
from filelike import FileLike

@ -0,0 +1,144 @@
import socket, ssl, msgpack, tempfile, os
from collections import deque
from bitstring import BitArray
from filelike import FileLike
class BaseClient:
_tempbuff = b""
_read_left = 0
_read_buff = b""
_tempfiles = {}
_last_datastream_id = 10
_active_datastreams = {}
_filelike_counter = 0
max_mem = 32 * 1024 * 1024
reactor = None
def __init__(self, host=None, port=None, use_ssl=False, allowed_certs=None, conn=None, source=None, **kwargs):
self.objtype = "client"
self.sendq = deque([])
if (host is None or port is None) and (conn is None or source is None):
raise Exception("You must specify either a connection and source address, or a hostname and port.")
if host is not None:
# Treat this as a new client
self.host = host
self.port = port
self.ssl = use_ssl
self.spawned = False
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.ssl == True:
self.stream = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=allowed_certs)
else:
self.stream = sock
self.stream.connect((self.host, self.port))
self.event_connected()
elif conn is not None:
# Treat this as a client spawned by a server
self.host = source[0]
self.port = source[1]
self.stream = conn
self.spawned = True
self.event_connected()
def _send_chunk(self, chunk):
self.stream.send(chunk)
def _encode_header(self, chunktype, size, channel):
header_type = BitArray(uint=chunktype, length=7)
header_size = BitArray(uint=size, length=25)
header_channel = BitArray(uint=channel, length=24)
header = header_type + header_size + header_channel
return header.bytes
def _pack(self, data):
return msgpack.packb(data, default=self._encode_pack)
def _encode_pack(self, obj):
if hasattr(obj, "read"):
datastream_id = self._create_datastream(obj)
# Determine the total size of the file
current_pos = obj.tell()
obj.seek(0, os.SEEK_END)
total_size = obj.tell()
obj.seek(current_pos)
obj = {"__type__": "file", "__id__": datastream_id, "__size__": total_size}
return obj
def _unpack(self, data):
return msgpack.unpackb(data, object_hook=self._decode_unpack)
def _decode_unpack(self, obj):
if "__type__" in obj:
if obj['__type__'] == "file":
# TODO: Validate item
datastream_id = obj['__id__']
size = obj['__size__']
self._create_tempfile(datastream_id)
obj = self._tempfiles[datastream_id]
obj._total_size = size
return obj
def _create_datastream(self, obj):
datastream_id = self._get_datastream_id()
self._active_datastreams[datastream_id] = obj
#print "Datastream created on ID %d." % datastream_id
return datastream_id
def _get_datastream_id(self):
self._last_datastream_id += 1
if self._last_datastream_id > 10000:
self._last_datastream_id = 10
if self._last_datastream_id in self._active_datastreams:
return self._get_datastream_id()
return self._last_datastream_id
def _create_tempfile(self, datastream_id):
# This creates a temporary file for the specified datastream if it does not already exist.
if datastream_id not in self._tempfiles:
self._filelike_counter += 1
self._tempfiles[datastream_id] = FileLike(tempfile.SpooledTemporaryFile(max_size=self.max_mem), self._filelike_counter)
def _receive_datastream(self, datastream_id, data):
self._create_tempfile(datastream_id)
obj = self._tempfiles[datastream_id]
obj.write(data)
obj._bytes_finished += len(data)
self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size)
def _send_system_message(self, data):
encoded = self._pack(data)
header = self._encode_header(3, len(encoded), 1)
self.sendq.append(header + encoded)
def send(self, data):
encoded = self._pack(data)
header = self._encode_header(1, len(encoded), 1)
self.sendq.append(header + encoded)
def event_connected(self):
pass
def event_disconnected(self):
pass
def event_receive(self, data):
pass
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
pass
def event_datastream_finished(self, stream):
pass

@ -0,0 +1,23 @@
import os
class FileLike:
_pos = 0
_total_size = 0
_bytes_finished = 0
def __init__(self, original, file_id):
self._file = original
self.id = file_id
def write(self, str):
return self._file.write(str)
def read(self, size=-1):
self._file.seek(self._pos)
data = self._file.read(size)
self._pos = self._file.tell()
self._file.seek(0, os.SEEK_END)
return data
def tell(self):
return self._file.tell()

@ -0,0 +1,145 @@
from bitstring import BitArray
import select, msgpack
class ReactorException(Exception):
pass
class Reactor:
queue = []
objmap = {}
abort_flag = False
recv_size = 1024
#chunk_size = 512 * 1024
chunk_size = 512
def __init__(self):
pass
def _process_chunk(self, obj, chunktype, channel, data):
if chunktype == 1:
# Client message
obj.event_receive(obj._unpack(data))
elif chunktype == 2:
# Datastream chunk
obj._receive_datastream(channel, data)
elif chunktype == 3:
# System message
self._process_message(obj, msgpack.unpackb(data))
def _process_message(self, client, data):
if data['type'] == "datastream_finished":
datastream_id = data['id']
#print "Successfully finished receiving datastream %d." % datastream_id
#print client._tempfiles[datastream_id].read()
client.event_datastream_finished(client._tempfiles[datastream_id])
del client._tempfiles[datastream_id]
def add(self, obj):
try:
self.queue.append(obj.stream)
self.objmap[obj.stream.fileno()] = obj
except AttributeError, e:
raise ReactorException("The provided object has no valid stream attached to it.")
obj.reactor = self
def cycle(self):
readable, writable, error = select.select(self.queue, self.queue, self.queue)
for stream in readable:
fileno = stream.fileno()
obj = self.objmap[fileno]
if obj.objtype == "server":
sock, addr = obj.stream.accept()
new_client = obj.spawn_client(sock, addr)
self.add(new_client)
print "Found new client from %s:%d" % addr
elif obj.objtype == "client":
while True:
try:
buff = stream.recv(self.recv_size)
break
except ssl.SSLError, err:
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
select.select([sock], [], [])
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
select.select([], [sock], [])
else:
raise
if not buff:
# The client has ceased to exist - most likely it has closed the connection.
del self.objmap[fileno]
self.queue.remove(stream)
print "Client disconnected: %s:%d" % (obj.host, obj.port)
buff = obj._tempbuff + buff
while buff != b"":
if obj._read_left > 0:
# Continue reading a chunk in progress
if obj._read_left < self.recv_size:
# All the data we need is in the buffer.
obj._read_buff += buff[:obj._read_left]
buff = buff[obj._read_left:]
obj._read_left = 0
self._process_chunk(obj, obj._chunktype, obj._channel, obj._read_buff)
obj._read_buff = b""
else:
# We need to read more data than is in the buffer.
obj._read_buff += buff
obj._read_left -= len(buff)
buff = b""
elif len(buff) >= 7:
# Start reading a new chunk
header = BitArray(bytes=buff[:7]) # 7 byte chunk header
chunktype = header[:7].uint # Bits 0-6 inc
chunksize = header[7:32].uint # Bits 7-31 inc
channel = header[32:56].uint # Bits 32-55 inc
buff = buff[7:]
obj._read_left = chunksize
obj._chunksize = chunksize
obj._chunktype = chunktype
obj._channel = channel
else:
# We need more data to do anything meaningful
obj._tempbuff = buff
break
for stream in writable:
fileno = stream.fileno()
obj = self.objmap[fileno]
if obj.objtype == "client":
if len(obj.sendq) > 0:
item = obj.sendq.popleft()
obj._send_chunk(item)
if len(obj._active_datastreams) > 0:
to_delete = []
for datastream_id, datastream in obj._active_datastreams.iteritems():
data = datastream.read(self.chunk_size)
if data:
header = obj._encode_header(2, len(data), datastream_id)
stream.send(header + data)
else:
# Done with this datastream.
obj._send_system_message({"type": "datastream_finished", "id": datastream_id})
to_delete.append(datastream_id)
for datastream_id in to_delete:
del obj._active_datastreams[datastream_id]
def loop(self):
while self.abort_flag == False:
self.cycle()
def stop(self):
self.abort_flag = True

@ -0,0 +1,35 @@
import socket, ssl
class Server:
reactor = None
def __init__(self, interface, port, client_class, use_ssl=False, **kwargs):
self.interface = interface
self.port = port
self.objtype = "server"
self.ssl = use_ssl
self.client_class = client_class
if self.ssl == True and (kwargs.haskey('certfile') == False or kwargs.hasfile('keyfile') == False):
raise Exception("SSL mode requires both a certificate and a keyfile.")
try:
self.certificate = kwargs['certfile']
self.keyfile = kwargs['keyfile']
except KeyError, e:
pass
self.stream = socket.socket()
self.stream.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.stream.bind((self.interface, self.port))
self.stream.listen(5)
def spawn_client(self, connection, source):
if self.ssl == True:
new_socket = ssl.wrap_socket(connection, server_side=True, certfile=self.certificate, keyfile=self.keyfile, ssl_version=ssl.PROTOCOL_TLSv1)
else:
new_socket = connection
return self.client_class(conn=connection, source=source)

@ -0,0 +1,27 @@
import pyreactor
class TestClient(pyreactor.BaseClient):
def event_receive(self, data):
print "Received message: %s" % repr(data)
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
print "Completed %d of %d bytes for stream %d." % (finished_bytes, total_bytes, stream.id)
def event_datastream_finished(self, stream):
print "Completed stream %d! Data follows..." % stream.id
print "============================================"
print stream.read()
print "============================================"
print "Exiting..."
self.reactor.stop()
s = pyreactor.Server("127.0.0.1", 4006, TestClient)
c = TestClient(host="127.0.0.1", port=4006)
c.send({"test": "just sending some test data...", "number": 41, "file": open("test.py", "r")})
reactor = pyreactor.Reactor()
reactor.add(s)
reactor.add(c)
reactor.loop()
Loading…
Cancel
Save