Compare commits

..

7 Commits

1
.gitignore vendored

@ -1 +1,2 @@
*.pyc *.pyc
test.dat

@ -1,20 +1,27 @@
import socket, ssl, msgpack, tempfile, os import socket, ssl, msgpack, tempfile, os
from collections import deque from collections import deque
from bitstring import BitArray
from filelike import FileLike from filelike import FileLike
class BaseClient: class BaseClient:
_tempbuff = b"" # Overridable settings
_read_left = 0 max_mem = 32 * 1024 * 1024 # Maximum amount of memory per RAM-based temp file
_read_buff = b"" chunk_size = 1024 # Size per chunk of raw datastream
_tempfiles = {} recv_size = 1024 # Amount of data to receive at once
_last_datastream_id = 10
_active_datastreams = {}
_filelike_counter = 0
max_mem = 32 * 1024 * 1024
reactor = None
def __init__(self, host=None, port=None, use_ssl=False, allowed_certs=None, conn=None, source=None, **kwargs): def __init__(self, host=None, port=None, use_ssl=False, allowed_certs=None, conn=None, source=None, **kwargs):
# Internal variables
self._tempbuff = b""
self._read_left = 0
self._read_buff = b""
self._tempfiles = {}
self._last_datastream_id = 10
self._active_datastreams = {}
self._filelike_counter = 0
self._datastream_started = []
# Parent reactor
self.reactor = None
self.objtype = "client" self.objtype = "client"
self.sendq = deque([]) self.sendq = deque([])
@ -36,6 +43,7 @@ class BaseClient:
self.stream = sock self.stream = sock
self.stream.connect((self.host, self.port)) self.stream.connect((self.host, self.port))
self.event_connected() self.event_connected()
elif conn is not None: elif conn is not None:
@ -50,12 +58,30 @@ class BaseClient:
self.stream.send(chunk) self.stream.send(chunk)
def _encode_header(self, chunktype, size, channel): def _encode_header(self, chunktype, size, channel):
header_type = BitArray(uint=chunktype, length=7) bits = ""
header_size = BitArray(uint=size, length=25) bits += bin(chunktype)[2:].zfill(7)
header_channel = BitArray(uint=channel, length=24) bits += bin(size)[2:].zfill(25)
header = header_type + header_size + header_channel bits += bin(channel)[2:].zfill(24)
return header.bytes
header = b""
for i in xrange(0, 7):
header += chr(int(bits[i*8:(i+1)*8], 2))
return header
def _decode_header(self, header):
bits = ""
for i in xrange(0, len(header)):
bits += bin(ord(header[i]))[2:].zfill(8)
chunktype = int(bits[:7], 2) # Bits 0-6 inc
chunksize = int(bits[7:32], 2) # Bits 7-31 inc
channel = int(bits[32:56], 2) # Bits 32-55 inc
return (chunktype, chunksize, channel)
def _pack(self, data): def _pack(self, data):
return msgpack.packb(data, default=self._encode_pack) return msgpack.packb(data, default=self._encode_pack)
@ -85,13 +111,15 @@ class BaseClient:
self._create_tempfile(datastream_id) self._create_tempfile(datastream_id)
obj = self._tempfiles[datastream_id] obj = self._tempfiles[datastream_id]
obj._total_size = size obj._total_size = size
self._datastream_started.append(datastream_id)
self.event_datastream_start(obj, size)
return obj return obj
def _create_datastream(self, obj): def _create_datastream(self, obj):
datastream_id = self._get_datastream_id() datastream_id = self._get_datastream_id()
self._active_datastreams[datastream_id] = obj self._active_datastreams[datastream_id] = obj
#print "Datastream created on ID %d." % datastream_id
return datastream_id return datastream_id
def _get_datastream_id(self): def _get_datastream_id(self):
@ -116,16 +144,118 @@ class BaseClient:
obj = self._tempfiles[datastream_id] obj = self._tempfiles[datastream_id]
obj.write(data) obj.write(data)
obj._bytes_finished += len(data) obj._bytes_finished += len(data)
self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size)
if datastream_id in self._datastream_started:
self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size)
def _send_system_message(self, data): def _send_system_message(self, data):
encoded = self._pack(data) encoded = self._pack(data)
header = self._encode_header(3, len(encoded), 1) header = self._encode_header(3, len(encoded), 1)
self.sendq.append(header + encoded) self.sendq.append(header + encoded)
def _read_cycle(self):
fileno = self.stream.fileno()
while True:
try:
buff = self.stream.recv(self.recv_size)
break
except ssl.SSLError, err:
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
select.select([self.stream], [], [])
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
select.select([], [self.stream], [])
else:
raise
if not buff:
# The client has ceased to exist - most likely it has closed the connection.
del self.reactor.objmap[fileno]
self.reactor.queue.remove(self.stream)
self.event_disconnected()
buff = self._tempbuff + buff
self._tempbuff = b""
while buff != b"":
if self._read_left > 0:
# Continue reading a chunk in progress
if self._read_left <= len(buff):
# All the data we need is in the buffer.
self._read_buff += buff[:self._read_left]
buff = buff[self._read_left:]
self._read_left = 0
self._process_chunk(self._chunktype, self._channel, self._read_buff)
self._read_buff = b""
else:
# We need to read more data than is in the buffer.
self._read_buff += buff
self._read_left -= len(buff)
buff = b""
elif len(buff) >= 7:
# Start reading a new chunk
header = buff[:7]
chunktype, chunksize, channel = self._decode_header(header)
buff = buff[7:]
self._read_left = chunksize
self._chunksize = chunksize
self._chunktype = chunktype
self._channel = channel
else:
# We need more data to do anything meaningful
self._tempbuff = buff
buff = b""
def _process_chunk(self, chunktype, channel, data):
if chunktype == 1:
# Client message
self.event_receive(self._unpack(data))
elif chunktype == 2:
# Datastream chunk
self._receive_datastream(channel, data)
elif chunktype == 3:
# System message
self._process_system_message(msgpack.unpackb(data))
def _process_system_message(self, data):
if data['type'] == "datastream_finished":
datastream_id = data['id']
self.event_datastream_finished(self._tempfiles[datastream_id])
self._datastream_started.remove(datastream_id)
del self._tempfiles[datastream_id]
def _write_cycle(self):
if len(self.sendq) > 0:
item = self.sendq.popleft()
self._send_chunk(item)
if len(self._active_datastreams) > 0:
to_delete = []
for datastream_id, datastream in self._active_datastreams.iteritems():
data = datastream.read(self.chunk_size)
if data:
header = self._encode_header(2, len(data), datastream_id)
self._send_chunk(header + data)
else:
# Done with this datastream.
self._send_system_message({"type": "datastream_finished", "id": datastream_id})
to_delete.append(datastream_id)
for datastream_id in to_delete:
del self._active_datastreams[datastream_id]
def send(self, data): def send(self, data):
encoded = self._pack(data) encoded = self._pack(data)
header = self._encode_header(1, len(encoded), 1) header = self._encode_header(1, len(encoded), 0)
self.sendq.append(header + encoded) self.sendq.append(header + encoded)
def event_connected(self): def event_connected(self):
@ -137,6 +267,9 @@ class BaseClient:
def event_receive(self, data): def event_receive(self, data):
pass pass
def event_datastream_start(self, stream, total_bytes):
pass
def event_datastream_progress(self, stream, finished_bytes, total_bytes): def event_datastream_progress(self, stream, finished_bytes, total_bytes):
pass pass

@ -1,4 +1,4 @@
import os import os, shutil
class FileLike: class FileLike:
_pos = 0 _pos = 0
@ -21,3 +21,6 @@ class FileLike:
def tell(self): def tell(self):
return self._file.tell() return self._file.tell()
def copy(self, destination):
shutil.copyfileobj(self, open(destination, "wb"), 4096)

@ -1,4 +1,3 @@
from bitstring import BitArray
import select, msgpack import select, msgpack
class ReactorException(Exception): class ReactorException(Exception):
@ -7,139 +6,52 @@ class ReactorException(Exception):
class Reactor: class Reactor:
queue = [] queue = []
objmap = {} objmap = {}
abort_flag = False _abort_flag = False
recv_size = 1024
#chunk_size = 512 * 1024
chunk_size = 512
def __init__(self): def __init__(self):
pass pass
def _process_chunk(self, obj, chunktype, channel, data): def _cycle(self):
if chunktype == 1:
# Client message
obj.event_receive(obj._unpack(data))
elif chunktype == 2:
# Datastream chunk
obj._receive_datastream(channel, data)
elif chunktype == 3:
# System message
self._process_message(obj, msgpack.unpackb(data))
def _process_message(self, client, data):
if data['type'] == "datastream_finished":
datastream_id = data['id']
#print "Successfully finished receiving datastream %d." % datastream_id
#print client._tempfiles[datastream_id].read()
client.event_datastream_finished(client._tempfiles[datastream_id])
del client._tempfiles[datastream_id]
def add(self, obj):
try:
self.queue.append(obj.stream)
self.objmap[obj.stream.fileno()] = obj
except AttributeError, e:
raise ReactorException("The provided object has no valid stream attached to it.")
obj.reactor = self
def cycle(self):
readable, writable, error = select.select(self.queue, self.queue, self.queue) readable, writable, error = select.select(self.queue, self.queue, self.queue)
for stream in readable: for stream in readable:
fileno = stream.fileno() fileno = stream.fileno()
obj = self.objmap[fileno] obj = self.objmap[fileno]
#print "Data is available to read on %s with id %d" % (repr(stream), stream.fileno())
if obj.objtype == "server": if obj.objtype == "server":
sock, addr = obj.stream.accept() sock, addr = obj.stream.accept()
new_client = obj.spawn_client(sock, addr) new_client = obj.spawn_client(sock, addr)
self.add(new_client) self.add(new_client)
print "Found new client from %s:%d" % addr print "Found new client from %s:%d" % addr
elif obj.objtype == "client": elif obj.objtype == "client":
while True: obj._read_cycle()
try:
buff = stream.recv(self.recv_size)
break
except ssl.SSLError, err:
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
select.select([sock], [], [])
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
select.select([], [sock], [])
else:
raise
if not buff:
# The client has ceased to exist - most likely it has closed the connection.
del self.objmap[fileno]
self.queue.remove(stream)
print "Client disconnected: %s:%d" % (obj.host, obj.port)
buff = obj._tempbuff + buff
while buff != b"":
if obj._read_left > 0:
# Continue reading a chunk in progress
if obj._read_left < self.recv_size:
# All the data we need is in the buffer.
obj._read_buff += buff[:obj._read_left]
buff = buff[obj._read_left:]
obj._read_left = 0
self._process_chunk(obj, obj._chunktype, obj._channel, obj._read_buff)
obj._read_buff = b""
else:
# We need to read more data than is in the buffer.
obj._read_buff += buff
obj._read_left -= len(buff)
buff = b""
elif len(buff) >= 7:
# Start reading a new chunk
header = BitArray(bytes=buff[:7]) # 7 byte chunk header
chunktype = header[:7].uint # Bits 0-6 inc
chunksize = header[7:32].uint # Bits 7-31 inc
channel = header[32:56].uint # Bits 32-55 inc
buff = buff[7:]
obj._read_left = chunksize
obj._chunksize = chunksize
obj._chunktype = chunktype
obj._channel = channel
else:
# We need more data to do anything meaningful
obj._tempbuff = buff
break
for stream in writable: for stream in writable:
fileno = stream.fileno() fileno = stream.fileno()
obj = self.objmap[fileno]
try:
obj = self.objmap[fileno]
except KeyError, e:
# The client has disconnected. Skip to the next stream.
continue
if obj.objtype == "client": if obj.objtype == "client":
if len(obj.sendq) > 0: obj._write_cycle()
item = obj.sendq.popleft()
obj._send_chunk(item) def add(self, obj):
try:
if len(obj._active_datastreams) > 0: self.queue.append(obj.stream)
to_delete = [] self.objmap[obj.stream.fileno()] = obj
except AttributeError, e:
for datastream_id, datastream in obj._active_datastreams.iteritems(): raise ReactorException("The provided object has no valid stream attached to it.")
data = datastream.read(self.chunk_size)
obj.reactor = self
if data:
header = obj._encode_header(2, len(data), datastream_id)
stream.send(header + data)
else:
# Done with this datastream.
obj._send_system_message({"type": "datastream_finished", "id": datastream_id})
to_delete.append(datastream_id)
for datastream_id in to_delete:
del obj._active_datastreams[datastream_id]
def loop(self): def loop(self):
while self.abort_flag == False: while self._abort_flag == False:
self.cycle() self._cycle()
def stop(self): def stop(self):
self.abort_flag = True self._abort_flag = True

@ -0,0 +1,12 @@
from setuptools import setup
setup(name='pyreactor',
version='0.1',
description='Simple evented networking library, designed for custom protocols.',
author='Sven Slootweg',
author_email='pyreactor@cryto.net',
url='http://cryto.net/pyreactor',
packages=['pyreactor'],
provides=['pyreactor'],
install_requires=['msgpack-python']
)

@ -1,27 +1,16 @@
import pyreactor import pyreactor, time
from testclient import TestClient
import cProfile as profile
class TestClient(pyreactor.BaseClient): def main():
def event_receive(self, data): s = pyreactor.Server("127.0.0.1", 4006, TestClient)
print "Received message: %s" % repr(data) c = TestClient(host="127.0.0.1", port=4006)
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
print "Completed %d of %d bytes for stream %d." % (finished_bytes, total_bytes, stream.id)
def event_datastream_finished(self, stream):
print "Completed stream %d! Data follows..." % stream.id
print "============================================"
print stream.read()
print "============================================"
print "Exiting..."
self.reactor.stop()
s = pyreactor.Server("127.0.0.1", 4006, TestClient) c.send({"test": "just sending some test data...", "number": 41, "file": open("testdata.dat", "rb")})
c = TestClient(host="127.0.0.1", port=4006)
c.send({"test": "just sending some test data...", "number": 41, "file": open("test.py", "r")}) reactor = pyreactor.Reactor()
reactor.add(s)
reactor.add(c)
reactor.loop()
reactor = pyreactor.Reactor() profile.run("main()", None, "time")
reactor.add(s)
reactor.add(c)
reactor.loop()

@ -0,0 +1,10 @@
import pyreactor, time
from testclient import TestClient
c = TestClient(host="kerpia.cryto.net", port=4006)
c.send({"test": "just sending some test data...", "number": 41, "file": open("testdata.dat", "rb")})
reactor = pyreactor.Reactor()
reactor.add(c)
reactor.loop()

@ -0,0 +1,8 @@
import pyreactor, time
from testclient import TestClient
s = pyreactor.Server("0.0.0.0", 4006, TestClient)
reactor = pyreactor.Reactor()
reactor.add(s)
reactor.loop()

@ -0,0 +1,34 @@
import pyreactor, time
class TestClient(pyreactor.BaseClient):
chunk_size = 8192
def event_receive(self, data):
print "Received message: %s" % repr(data)
def event_datastream_start(self, stream, size):
print "Starting transfer of file with size %d..." % size
stream.start_time = time.time()
stream.last_measure_time = stream.start_time
stream.last_measure_bytes = 0
stream.last_measure_speed = 0
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
cur_time = time.time()
if int(cur_time) > int(stream.last_measure_time):
speed = (finished_bytes - stream.last_measure_bytes) / (cur_time - stream.last_measure_time)
stream.last_measure_time = cur_time
stream.last_measure_bytes = finished_bytes
stream.last_measure_speed = speed
print "Completed %d of %d bytes for stream %d. (%.2fMB/sec)" % (finished_bytes, total_bytes, stream.id, speed / 1024 / 1024)
def event_datastream_finished(self, stream):
print "Completed stream %d in %.2f seconds! Saved to file test.dat." % (stream.id, time.time() - stream.start_time)
stream.copy("test.dat")
print "Exiting..."
self.reactor.stop()
def event_disconnected(self):
print "Client disconnected."

Binary file not shown.
Loading…
Cancel
Save