Compare commits

...

7 Commits

1
.gitignore vendored

@ -1 +1,2 @@
*.pyc
test.dat

@ -1,20 +1,27 @@
import socket, ssl, msgpack, tempfile, os
from collections import deque
from bitstring import BitArray
from filelike import FileLike
class BaseClient:
_tempbuff = b""
_read_left = 0
_read_buff = b""
_tempfiles = {}
_last_datastream_id = 10
_active_datastreams = {}
_filelike_counter = 0
max_mem = 32 * 1024 * 1024
reactor = None
# Overridable settings
max_mem = 32 * 1024 * 1024 # Maximum amount of memory per RAM-based temp file
chunk_size = 1024 # Size per chunk of raw datastream
recv_size = 1024 # Amount of data to receive at once
def __init__(self, host=None, port=None, use_ssl=False, allowed_certs=None, conn=None, source=None, **kwargs):
# Internal variables
self._tempbuff = b""
self._read_left = 0
self._read_buff = b""
self._tempfiles = {}
self._last_datastream_id = 10
self._active_datastreams = {}
self._filelike_counter = 0
self._datastream_started = []
# Parent reactor
self.reactor = None
self.objtype = "client"
self.sendq = deque([])
@ -36,6 +43,7 @@ class BaseClient:
self.stream = sock
self.stream.connect((self.host, self.port))
self.event_connected()
elif conn is not None:
@ -50,12 +58,30 @@ class BaseClient:
self.stream.send(chunk)
def _encode_header(self, chunktype, size, channel):
header_type = BitArray(uint=chunktype, length=7)
header_size = BitArray(uint=size, length=25)
header_channel = BitArray(uint=channel, length=24)
header = header_type + header_size + header_channel
return header.bytes
bits = ""
bits += bin(chunktype)[2:].zfill(7)
bits += bin(size)[2:].zfill(25)
bits += bin(channel)[2:].zfill(24)
header = b""
for i in xrange(0, 7):
header += chr(int(bits[i*8:(i+1)*8], 2))
return header
def _decode_header(self, header):
bits = ""
for i in xrange(0, len(header)):
bits += bin(ord(header[i]))[2:].zfill(8)
chunktype = int(bits[:7], 2) # Bits 0-6 inc
chunksize = int(bits[7:32], 2) # Bits 7-31 inc
channel = int(bits[32:56], 2) # Bits 32-55 inc
return (chunktype, chunksize, channel)
def _pack(self, data):
return msgpack.packb(data, default=self._encode_pack)
@ -85,13 +111,15 @@ class BaseClient:
self._create_tempfile(datastream_id)
obj = self._tempfiles[datastream_id]
obj._total_size = size
self._datastream_started.append(datastream_id)
self.event_datastream_start(obj, size)
return obj
def _create_datastream(self, obj):
datastream_id = self._get_datastream_id()
self._active_datastreams[datastream_id] = obj
#print "Datastream created on ID %d." % datastream_id
return datastream_id
def _get_datastream_id(self):
@ -116,16 +144,118 @@ class BaseClient:
obj = self._tempfiles[datastream_id]
obj.write(data)
obj._bytes_finished += len(data)
self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size)
if datastream_id in self._datastream_started:
self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size)
def _send_system_message(self, data):
encoded = self._pack(data)
header = self._encode_header(3, len(encoded), 1)
self.sendq.append(header + encoded)
def _read_cycle(self):
fileno = self.stream.fileno()
while True:
try:
buff = self.stream.recv(self.recv_size)
break
except ssl.SSLError, err:
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
select.select([self.stream], [], [])
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
select.select([], [self.stream], [])
else:
raise
if not buff:
# The client has ceased to exist - most likely it has closed the connection.
del self.reactor.objmap[fileno]
self.reactor.queue.remove(self.stream)
self.event_disconnected()
buff = self._tempbuff + buff
self._tempbuff = b""
while buff != b"":
if self._read_left > 0:
# Continue reading a chunk in progress
if self._read_left <= len(buff):
# All the data we need is in the buffer.
self._read_buff += buff[:self._read_left]
buff = buff[self._read_left:]
self._read_left = 0
self._process_chunk(self._chunktype, self._channel, self._read_buff)
self._read_buff = b""
else:
# We need to read more data than is in the buffer.
self._read_buff += buff
self._read_left -= len(buff)
buff = b""
elif len(buff) >= 7:
# Start reading a new chunk
header = buff[:7]
chunktype, chunksize, channel = self._decode_header(header)
buff = buff[7:]
self._read_left = chunksize
self._chunksize = chunksize
self._chunktype = chunktype
self._channel = channel
else:
# We need more data to do anything meaningful
self._tempbuff = buff
buff = b""
def _process_chunk(self, chunktype, channel, data):
if chunktype == 1:
# Client message
self.event_receive(self._unpack(data))
elif chunktype == 2:
# Datastream chunk
self._receive_datastream(channel, data)
elif chunktype == 3:
# System message
self._process_system_message(msgpack.unpackb(data))
def _process_system_message(self, data):
if data['type'] == "datastream_finished":
datastream_id = data['id']
self.event_datastream_finished(self._tempfiles[datastream_id])
self._datastream_started.remove(datastream_id)
del self._tempfiles[datastream_id]
def _write_cycle(self):
if len(self.sendq) > 0:
item = self.sendq.popleft()
self._send_chunk(item)
if len(self._active_datastreams) > 0:
to_delete = []
for datastream_id, datastream in self._active_datastreams.iteritems():
data = datastream.read(self.chunk_size)
if data:
header = self._encode_header(2, len(data), datastream_id)
self._send_chunk(header + data)
else:
# Done with this datastream.
self._send_system_message({"type": "datastream_finished", "id": datastream_id})
to_delete.append(datastream_id)
for datastream_id in to_delete:
del self._active_datastreams[datastream_id]
def send(self, data):
encoded = self._pack(data)
header = self._encode_header(1, len(encoded), 1)
header = self._encode_header(1, len(encoded), 0)
self.sendq.append(header + encoded)
def event_connected(self):
@ -137,6 +267,9 @@ class BaseClient:
def event_receive(self, data):
pass
def event_datastream_start(self, stream, total_bytes):
pass
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
pass

@ -1,4 +1,4 @@
import os
import os, shutil
class FileLike:
_pos = 0
@ -21,3 +21,6 @@ class FileLike:
def tell(self):
return self._file.tell()
def copy(self, destination):
shutil.copyfileobj(self, open(destination, "wb"), 4096)

@ -1,4 +1,3 @@
from bitstring import BitArray
import select, msgpack
class ReactorException(Exception):
@ -7,139 +6,52 @@ class ReactorException(Exception):
class Reactor:
queue = []
objmap = {}
abort_flag = False
recv_size = 1024
#chunk_size = 512 * 1024
chunk_size = 512
_abort_flag = False
def __init__(self):
pass
def _process_chunk(self, obj, chunktype, channel, data):
if chunktype == 1:
# Client message
obj.event_receive(obj._unpack(data))
elif chunktype == 2:
# Datastream chunk
obj._receive_datastream(channel, data)
elif chunktype == 3:
# System message
self._process_message(obj, msgpack.unpackb(data))
def _process_message(self, client, data):
if data['type'] == "datastream_finished":
datastream_id = data['id']
#print "Successfully finished receiving datastream %d." % datastream_id
#print client._tempfiles[datastream_id].read()
client.event_datastream_finished(client._tempfiles[datastream_id])
del client._tempfiles[datastream_id]
def add(self, obj):
try:
self.queue.append(obj.stream)
self.objmap[obj.stream.fileno()] = obj
except AttributeError, e:
raise ReactorException("The provided object has no valid stream attached to it.")
obj.reactor = self
def cycle(self):
def _cycle(self):
readable, writable, error = select.select(self.queue, self.queue, self.queue)
for stream in readable:
fileno = stream.fileno()
obj = self.objmap[fileno]
#print "Data is available to read on %s with id %d" % (repr(stream), stream.fileno())
if obj.objtype == "server":
sock, addr = obj.stream.accept()
new_client = obj.spawn_client(sock, addr)
self.add(new_client)
print "Found new client from %s:%d" % addr
elif obj.objtype == "client":
while True:
try:
buff = stream.recv(self.recv_size)
break
except ssl.SSLError, err:
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
select.select([sock], [], [])
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
select.select([], [sock], [])
else:
raise
if not buff:
# The client has ceased to exist - most likely it has closed the connection.
del self.objmap[fileno]
self.queue.remove(stream)
print "Client disconnected: %s:%d" % (obj.host, obj.port)
buff = obj._tempbuff + buff
while buff != b"":
if obj._read_left > 0:
# Continue reading a chunk in progress
if obj._read_left < self.recv_size:
# All the data we need is in the buffer.
obj._read_buff += buff[:obj._read_left]
buff = buff[obj._read_left:]
obj._read_left = 0
self._process_chunk(obj, obj._chunktype, obj._channel, obj._read_buff)
obj._read_buff = b""
else:
# We need to read more data than is in the buffer.
obj._read_buff += buff
obj._read_left -= len(buff)
buff = b""
elif len(buff) >= 7:
# Start reading a new chunk
header = BitArray(bytes=buff[:7]) # 7 byte chunk header
chunktype = header[:7].uint # Bits 0-6 inc
chunksize = header[7:32].uint # Bits 7-31 inc
channel = header[32:56].uint # Bits 32-55 inc
buff = buff[7:]
obj._read_left = chunksize
obj._chunksize = chunksize
obj._chunktype = chunktype
obj._channel = channel
else:
# We need more data to do anything meaningful
obj._tempbuff = buff
break
obj._read_cycle()
for stream in writable:
fileno = stream.fileno()
obj = self.objmap[fileno]
try:
obj = self.objmap[fileno]
except KeyError, e:
# The client has disconnected. Skip to the next stream.
continue
if obj.objtype == "client":
if len(obj.sendq) > 0:
item = obj.sendq.popleft()
obj._send_chunk(item)
if len(obj._active_datastreams) > 0:
to_delete = []
for datastream_id, datastream in obj._active_datastreams.iteritems():
data = datastream.read(self.chunk_size)
if data:
header = obj._encode_header(2, len(data), datastream_id)
stream.send(header + data)
else:
# Done with this datastream.
obj._send_system_message({"type": "datastream_finished", "id": datastream_id})
to_delete.append(datastream_id)
for datastream_id in to_delete:
del obj._active_datastreams[datastream_id]
obj._write_cycle()
def add(self, obj):
try:
self.queue.append(obj.stream)
self.objmap[obj.stream.fileno()] = obj
except AttributeError, e:
raise ReactorException("The provided object has no valid stream attached to it.")
obj.reactor = self
def loop(self):
while self.abort_flag == False:
self.cycle()
while self._abort_flag == False:
self._cycle()
def stop(self):
self.abort_flag = True
self._abort_flag = True

@ -0,0 +1,12 @@
from setuptools import setup
setup(name='pyreactor',
version='0.1',
description='Simple evented networking library, designed for custom protocols.',
author='Sven Slootweg',
author_email='pyreactor@cryto.net',
url='http://cryto.net/pyreactor',
packages=['pyreactor'],
provides=['pyreactor'],
install_requires=['msgpack-python']
)

@ -1,27 +1,16 @@
import pyreactor
import pyreactor, time
from testclient import TestClient
import cProfile as profile
class TestClient(pyreactor.BaseClient):
def event_receive(self, data):
print "Received message: %s" % repr(data)
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
print "Completed %d of %d bytes for stream %d." % (finished_bytes, total_bytes, stream.id)
def event_datastream_finished(self, stream):
print "Completed stream %d! Data follows..." % stream.id
print "============================================"
print stream.read()
print "============================================"
print "Exiting..."
self.reactor.stop()
def main():
s = pyreactor.Server("127.0.0.1", 4006, TestClient)
c = TestClient(host="127.0.0.1", port=4006)
s = pyreactor.Server("127.0.0.1", 4006, TestClient)
c = TestClient(host="127.0.0.1", port=4006)
c.send({"test": "just sending some test data...", "number": 41, "file": open("testdata.dat", "rb")})
c.send({"test": "just sending some test data...", "number": 41, "file": open("test.py", "r")})
reactor = pyreactor.Reactor()
reactor.add(s)
reactor.add(c)
reactor.loop()
reactor = pyreactor.Reactor()
reactor.add(s)
reactor.add(c)
reactor.loop()
profile.run("main()", None, "time")

@ -0,0 +1,10 @@
import pyreactor, time
from testclient import TestClient
c = TestClient(host="kerpia.cryto.net", port=4006)
c.send({"test": "just sending some test data...", "number": 41, "file": open("testdata.dat", "rb")})
reactor = pyreactor.Reactor()
reactor.add(c)
reactor.loop()

@ -0,0 +1,8 @@
import pyreactor, time
from testclient import TestClient
s = pyreactor.Server("0.0.0.0", 4006, TestClient)
reactor = pyreactor.Reactor()
reactor.add(s)
reactor.loop()

@ -0,0 +1,34 @@
import pyreactor, time
class TestClient(pyreactor.BaseClient):
chunk_size = 8192
def event_receive(self, data):
print "Received message: %s" % repr(data)
def event_datastream_start(self, stream, size):
print "Starting transfer of file with size %d..." % size
stream.start_time = time.time()
stream.last_measure_time = stream.start_time
stream.last_measure_bytes = 0
stream.last_measure_speed = 0
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
cur_time = time.time()
if int(cur_time) > int(stream.last_measure_time):
speed = (finished_bytes - stream.last_measure_bytes) / (cur_time - stream.last_measure_time)
stream.last_measure_time = cur_time
stream.last_measure_bytes = finished_bytes
stream.last_measure_speed = speed
print "Completed %d of %d bytes for stream %d. (%.2fMB/sec)" % (finished_bytes, total_bytes, stream.id, speed / 1024 / 1024)
def event_datastream_finished(self, stream):
print "Completed stream %d in %.2f seconds! Saved to file test.dat." % (stream.id, time.time() - stream.start_time)
stream.copy("test.dat")
print "Exiting..."
self.reactor.stop()
def event_disconnected(self):
print "Client disconnected."

Binary file not shown.
Loading…
Cancel
Save