commit 150197513deb05021c160209eefe9346ed3c4a81 Author: Sven Slootweg Date: Fri Dec 21 22:50:58 2012 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/pyreactor/__init__.py b/pyreactor/__init__.py new file mode 100644 index 0000000..3b14c67 --- /dev/null +++ b/pyreactor/__init__.py @@ -0,0 +1,4 @@ +from reactor import Reactor +from baseclient import BaseClient +from server import Server +from filelike import FileLike diff --git a/pyreactor/baseclient.py b/pyreactor/baseclient.py new file mode 100644 index 0000000..d9a792d --- /dev/null +++ b/pyreactor/baseclient.py @@ -0,0 +1,144 @@ +import socket, ssl, msgpack, tempfile, os +from collections import deque +from bitstring import BitArray +from filelike import FileLike + +class BaseClient: + _tempbuff = b"" + _read_left = 0 + _read_buff = b"" + _tempfiles = {} + _last_datastream_id = 10 + _active_datastreams = {} + _filelike_counter = 0 + max_mem = 32 * 1024 * 1024 + reactor = None + + def __init__(self, host=None, port=None, use_ssl=False, allowed_certs=None, conn=None, source=None, **kwargs): + self.objtype = "client" + self.sendq = deque([]) + + if (host is None or port is None) and (conn is None or source is None): + raise Exception("You must specify either a connection and source address, or a hostname and port.") + + if host is not None: + # Treat this as a new client + self.host = host + self.port = port + self.ssl = use_ssl + self.spawned = False + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + if self.ssl == True: + self.stream = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=allowed_certs) + else: + self.stream = sock + + self.stream.connect((self.host, self.port)) + self.event_connected() + + elif conn is not None: + # Treat this as a client spawned by a server + self.host = source[0] + self.port = source[1] + self.stream = conn + self.spawned = True + self.event_connected() + + def _send_chunk(self, chunk): + self.stream.send(chunk) + + def _encode_header(self, chunktype, size, channel): + header_type = BitArray(uint=chunktype, length=7) + header_size = BitArray(uint=size, length=25) + header_channel = BitArray(uint=channel, length=24) + header = header_type + header_size + header_channel + return header.bytes + + def _pack(self, data): + return msgpack.packb(data, default=self._encode_pack) + + def _encode_pack(self, obj): + if hasattr(obj, "read"): + datastream_id = self._create_datastream(obj) + + # Determine the total size of the file + current_pos = obj.tell() + obj.seek(0, os.SEEK_END) + total_size = obj.tell() + obj.seek(current_pos) + + obj = {"__type__": "file", "__id__": datastream_id, "__size__": total_size} + + return obj + + def _unpack(self, data): + return msgpack.unpackb(data, object_hook=self._decode_unpack) + + def _decode_unpack(self, obj): + if "__type__" in obj: + if obj['__type__'] == "file": + # TODO: Validate item + datastream_id = obj['__id__'] + size = obj['__size__'] + self._create_tempfile(datastream_id) + obj = self._tempfiles[datastream_id] + obj._total_size = size + + return obj + + def _create_datastream(self, obj): + datastream_id = self._get_datastream_id() + self._active_datastreams[datastream_id] = obj + #print "Datastream created on ID %d." % datastream_id + return datastream_id + + def _get_datastream_id(self): + self._last_datastream_id += 1 + + if self._last_datastream_id > 10000: + self._last_datastream_id = 10 + + if self._last_datastream_id in self._active_datastreams: + return self._get_datastream_id() + + return self._last_datastream_id + + def _create_tempfile(self, datastream_id): + # This creates a temporary file for the specified datastream if it does not already exist. + if datastream_id not in self._tempfiles: + self._filelike_counter += 1 + self._tempfiles[datastream_id] = FileLike(tempfile.SpooledTemporaryFile(max_size=self.max_mem), self._filelike_counter) + + def _receive_datastream(self, datastream_id, data): + self._create_tempfile(datastream_id) + obj = self._tempfiles[datastream_id] + obj.write(data) + obj._bytes_finished += len(data) + self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size) + + def _send_system_message(self, data): + encoded = self._pack(data) + header = self._encode_header(3, len(encoded), 1) + self.sendq.append(header + encoded) + + def send(self, data): + encoded = self._pack(data) + header = self._encode_header(1, len(encoded), 1) + self.sendq.append(header + encoded) + + def event_connected(self): + pass + + def event_disconnected(self): + pass + + def event_receive(self, data): + pass + + def event_datastream_progress(self, stream, finished_bytes, total_bytes): + pass + + def event_datastream_finished(self, stream): + pass diff --git a/pyreactor/filelike.py b/pyreactor/filelike.py new file mode 100644 index 0000000..d350c9b --- /dev/null +++ b/pyreactor/filelike.py @@ -0,0 +1,23 @@ +import os + +class FileLike: + _pos = 0 + _total_size = 0 + _bytes_finished = 0 + + def __init__(self, original, file_id): + self._file = original + self.id = file_id + + def write(self, str): + return self._file.write(str) + + def read(self, size=-1): + self._file.seek(self._pos) + data = self._file.read(size) + self._pos = self._file.tell() + self._file.seek(0, os.SEEK_END) + return data + + def tell(self): + return self._file.tell() diff --git a/pyreactor/reactor.py b/pyreactor/reactor.py new file mode 100644 index 0000000..f3013ce --- /dev/null +++ b/pyreactor/reactor.py @@ -0,0 +1,145 @@ +from bitstring import BitArray +import select, msgpack + +class ReactorException(Exception): + pass + +class Reactor: + queue = [] + objmap = {} + abort_flag = False + + recv_size = 1024 + #chunk_size = 512 * 1024 + chunk_size = 512 + + def __init__(self): + pass + + def _process_chunk(self, obj, chunktype, channel, data): + if chunktype == 1: + # Client message + obj.event_receive(obj._unpack(data)) + elif chunktype == 2: + # Datastream chunk + obj._receive_datastream(channel, data) + elif chunktype == 3: + # System message + self._process_message(obj, msgpack.unpackb(data)) + + def _process_message(self, client, data): + if data['type'] == "datastream_finished": + datastream_id = data['id'] + #print "Successfully finished receiving datastream %d." % datastream_id + #print client._tempfiles[datastream_id].read() + client.event_datastream_finished(client._tempfiles[datastream_id]) + del client._tempfiles[datastream_id] + + def add(self, obj): + try: + self.queue.append(obj.stream) + self.objmap[obj.stream.fileno()] = obj + except AttributeError, e: + raise ReactorException("The provided object has no valid stream attached to it.") + + obj.reactor = self + + def cycle(self): + readable, writable, error = select.select(self.queue, self.queue, self.queue) + + for stream in readable: + fileno = stream.fileno() + obj = self.objmap[fileno] + + if obj.objtype == "server": + sock, addr = obj.stream.accept() + new_client = obj.spawn_client(sock, addr) + self.add(new_client) + print "Found new client from %s:%d" % addr + elif obj.objtype == "client": + while True: + try: + buff = stream.recv(self.recv_size) + break + except ssl.SSLError, err: + if err.args[0] == ssl.SSL_ERROR_WANT_READ: + select.select([sock], [], []) + elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: + select.select([], [sock], []) + else: + raise + + if not buff: + # The client has ceased to exist - most likely it has closed the connection. + del self.objmap[fileno] + self.queue.remove(stream) + print "Client disconnected: %s:%d" % (obj.host, obj.port) + + buff = obj._tempbuff + buff + + while buff != b"": + if obj._read_left > 0: + # Continue reading a chunk in progress + if obj._read_left < self.recv_size: + # All the data we need is in the buffer. + obj._read_buff += buff[:obj._read_left] + buff = buff[obj._read_left:] + obj._read_left = 0 + + self._process_chunk(obj, obj._chunktype, obj._channel, obj._read_buff) + + obj._read_buff = b"" + else: + # We need to read more data than is in the buffer. + obj._read_buff += buff + obj._read_left -= len(buff) + buff = b"" + elif len(buff) >= 7: + # Start reading a new chunk + header = BitArray(bytes=buff[:7]) # 7 byte chunk header + chunktype = header[:7].uint # Bits 0-6 inc + chunksize = header[7:32].uint # Bits 7-31 inc + channel = header[32:56].uint # Bits 32-55 inc + + buff = buff[7:] + obj._read_left = chunksize + obj._chunksize = chunksize + obj._chunktype = chunktype + obj._channel = channel + else: + # We need more data to do anything meaningful + obj._tempbuff = buff + break + + for stream in writable: + fileno = stream.fileno() + obj = self.objmap[fileno] + + if obj.objtype == "client": + if len(obj.sendq) > 0: + item = obj.sendq.popleft() + obj._send_chunk(item) + + if len(obj._active_datastreams) > 0: + to_delete = [] + + for datastream_id, datastream in obj._active_datastreams.iteritems(): + data = datastream.read(self.chunk_size) + + if data: + header = obj._encode_header(2, len(data), datastream_id) + stream.send(header + data) + else: + # Done with this datastream. + obj._send_system_message({"type": "datastream_finished", "id": datastream_id}) + to_delete.append(datastream_id) + + for datastream_id in to_delete: + del obj._active_datastreams[datastream_id] + + def loop(self): + while self.abort_flag == False: + self.cycle() + + def stop(self): + self.abort_flag = True diff --git a/pyreactor/server.py b/pyreactor/server.py new file mode 100644 index 0000000..b2e79de --- /dev/null +++ b/pyreactor/server.py @@ -0,0 +1,35 @@ +import socket, ssl + +class Server: + reactor = None + + def __init__(self, interface, port, client_class, use_ssl=False, **kwargs): + self.interface = interface + self.port = port + self.objtype = "server" + self.ssl = use_ssl + self.client_class = client_class + + if self.ssl == True and (kwargs.haskey('certfile') == False or kwargs.hasfile('keyfile') == False): + raise Exception("SSL mode requires both a certificate and a keyfile.") + + try: + self.certificate = kwargs['certfile'] + self.keyfile = kwargs['keyfile'] + except KeyError, e: + pass + + self.stream = socket.socket() + self.stream.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.stream.bind((self.interface, self.port)) + self.stream.listen(5) + + def spawn_client(self, connection, source): + if self.ssl == True: + new_socket = ssl.wrap_socket(connection, server_side=True, certfile=self.certificate, keyfile=self.keyfile, ssl_version=ssl.PROTOCOL_TLSv1) + else: + new_socket = connection + + return self.client_class(conn=connection, source=source) + + diff --git a/test.py b/test.py new file mode 100644 index 0000000..f28d1f6 --- /dev/null +++ b/test.py @@ -0,0 +1,27 @@ +import pyreactor + +class TestClient(pyreactor.BaseClient): + def event_receive(self, data): + print "Received message: %s" % repr(data) + + def event_datastream_progress(self, stream, finished_bytes, total_bytes): + print "Completed %d of %d bytes for stream %d." % (finished_bytes, total_bytes, stream.id) + + def event_datastream_finished(self, stream): + print "Completed stream %d! Data follows..." % stream.id + print "============================================" + print stream.read() + + print "============================================" + print "Exiting..." + self.reactor.stop() + +s = pyreactor.Server("127.0.0.1", 4006, TestClient) +c = TestClient(host="127.0.0.1", port=4006) + +c.send({"test": "just sending some test data...", "number": 41, "file": open("test.py", "r")}) + +reactor = pyreactor.Reactor() +reactor.add(s) +reactor.add(c) +reactor.loop()