|
|
@ -1,20 +1,27 @@
|
|
|
|
import socket, ssl, msgpack, tempfile, os
|
|
|
|
import socket, ssl, msgpack, tempfile, os
|
|
|
|
from collections import deque
|
|
|
|
from collections import deque
|
|
|
|
from bitstring import BitArray
|
|
|
|
|
|
|
|
from filelike import FileLike
|
|
|
|
from filelike import FileLike
|
|
|
|
|
|
|
|
|
|
|
|
class BaseClient:
|
|
|
|
class BaseClient:
|
|
|
|
_tempbuff = b""
|
|
|
|
# Overridable settings
|
|
|
|
_read_left = 0
|
|
|
|
max_mem = 32 * 1024 * 1024 # Maximum amount of memory per RAM-based temp file
|
|
|
|
_read_buff = b""
|
|
|
|
chunk_size = 1024 # Size per chunk of raw datastream
|
|
|
|
_tempfiles = {}
|
|
|
|
recv_size = 1024 # Amount of data to receive at once
|
|
|
|
_last_datastream_id = 10
|
|
|
|
|
|
|
|
_active_datastreams = {}
|
|
|
|
|
|
|
|
_filelike_counter = 0
|
|
|
|
|
|
|
|
max_mem = 32 * 1024 * 1024
|
|
|
|
|
|
|
|
reactor = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, host=None, port=None, use_ssl=False, allowed_certs=None, conn=None, source=None, **kwargs):
|
|
|
|
def __init__(self, host=None, port=None, use_ssl=False, allowed_certs=None, conn=None, source=None, **kwargs):
|
|
|
|
|
|
|
|
# Internal variables
|
|
|
|
|
|
|
|
self._tempbuff = b""
|
|
|
|
|
|
|
|
self._read_left = 0
|
|
|
|
|
|
|
|
self._read_buff = b""
|
|
|
|
|
|
|
|
self._tempfiles = {}
|
|
|
|
|
|
|
|
self._last_datastream_id = 10
|
|
|
|
|
|
|
|
self._active_datastreams = {}
|
|
|
|
|
|
|
|
self._filelike_counter = 0
|
|
|
|
|
|
|
|
self._datastream_started = []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Parent reactor
|
|
|
|
|
|
|
|
self.reactor = None
|
|
|
|
|
|
|
|
|
|
|
|
self.objtype = "client"
|
|
|
|
self.objtype = "client"
|
|
|
|
self.sendq = deque([])
|
|
|
|
self.sendq = deque([])
|
|
|
|
|
|
|
|
|
|
|
@ -36,6 +43,7 @@ class BaseClient:
|
|
|
|
self.stream = sock
|
|
|
|
self.stream = sock
|
|
|
|
|
|
|
|
|
|
|
|
self.stream.connect((self.host, self.port))
|
|
|
|
self.stream.connect((self.host, self.port))
|
|
|
|
|
|
|
|
|
|
|
|
self.event_connected()
|
|
|
|
self.event_connected()
|
|
|
|
|
|
|
|
|
|
|
|
elif conn is not None:
|
|
|
|
elif conn is not None:
|
|
|
@ -50,11 +58,29 @@ class BaseClient:
|
|
|
|
self.stream.send(chunk)
|
|
|
|
self.stream.send(chunk)
|
|
|
|
|
|
|
|
|
|
|
|
def _encode_header(self, chunktype, size, channel):
|
|
|
|
def _encode_header(self, chunktype, size, channel):
|
|
|
|
header_type = BitArray(uint=chunktype, length=7)
|
|
|
|
bits = ""
|
|
|
|
header_size = BitArray(uint=size, length=25)
|
|
|
|
bits += bin(chunktype)[2:].zfill(7)
|
|
|
|
header_channel = BitArray(uint=channel, length=24)
|
|
|
|
bits += bin(size)[2:].zfill(25)
|
|
|
|
header = header_type + header_size + header_channel
|
|
|
|
bits += bin(channel)[2:].zfill(24)
|
|
|
|
return header.bytes
|
|
|
|
|
|
|
|
|
|
|
|
header = b""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for i in xrange(0, 7):
|
|
|
|
|
|
|
|
header += chr(int(bits[i*8:(i+1)*8], 2))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return header
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _decode_header(self, header):
|
|
|
|
|
|
|
|
bits = ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for i in xrange(0, len(header)):
|
|
|
|
|
|
|
|
bits += bin(ord(header[i]))[2:].zfill(8)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
chunktype = int(bits[:7], 2) # Bits 0-6 inc
|
|
|
|
|
|
|
|
chunksize = int(bits[7:32], 2) # Bits 7-31 inc
|
|
|
|
|
|
|
|
channel = int(bits[32:56], 2) # Bits 32-55 inc
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return (chunktype, chunksize, channel)
|
|
|
|
|
|
|
|
|
|
|
|
def _pack(self, data):
|
|
|
|
def _pack(self, data):
|
|
|
|
return msgpack.packb(data, default=self._encode_pack)
|
|
|
|
return msgpack.packb(data, default=self._encode_pack)
|
|
|
@ -86,12 +112,14 @@ class BaseClient:
|
|
|
|
obj = self._tempfiles[datastream_id]
|
|
|
|
obj = self._tempfiles[datastream_id]
|
|
|
|
obj._total_size = size
|
|
|
|
obj._total_size = size
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._datastream_started.append(datastream_id)
|
|
|
|
|
|
|
|
self.event_datastream_start(obj, size)
|
|
|
|
|
|
|
|
|
|
|
|
return obj
|
|
|
|
return obj
|
|
|
|
|
|
|
|
|
|
|
|
def _create_datastream(self, obj):
|
|
|
|
def _create_datastream(self, obj):
|
|
|
|
datastream_id = self._get_datastream_id()
|
|
|
|
datastream_id = self._get_datastream_id()
|
|
|
|
self._active_datastreams[datastream_id] = obj
|
|
|
|
self._active_datastreams[datastream_id] = obj
|
|
|
|
#print "Datastream created on ID %d." % datastream_id
|
|
|
|
|
|
|
|
return datastream_id
|
|
|
|
return datastream_id
|
|
|
|
|
|
|
|
|
|
|
|
def _get_datastream_id(self):
|
|
|
|
def _get_datastream_id(self):
|
|
|
@ -116,6 +144,8 @@ class BaseClient:
|
|
|
|
obj = self._tempfiles[datastream_id]
|
|
|
|
obj = self._tempfiles[datastream_id]
|
|
|
|
obj.write(data)
|
|
|
|
obj.write(data)
|
|
|
|
obj._bytes_finished += len(data)
|
|
|
|
obj._bytes_finished += len(data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if datastream_id in self._datastream_started:
|
|
|
|
self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size)
|
|
|
|
self.event_datastream_progress(obj, obj._bytes_finished, obj._total_size)
|
|
|
|
|
|
|
|
|
|
|
|
def _send_system_message(self, data):
|
|
|
|
def _send_system_message(self, data):
|
|
|
@ -123,9 +153,109 @@ class BaseClient:
|
|
|
|
header = self._encode_header(3, len(encoded), 1)
|
|
|
|
header = self._encode_header(3, len(encoded), 1)
|
|
|
|
self.sendq.append(header + encoded)
|
|
|
|
self.sendq.append(header + encoded)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _read_cycle(self):
|
|
|
|
|
|
|
|
fileno = self.stream.fileno()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
buff = self.stream.recv(self.recv_size)
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
except ssl.SSLError, err:
|
|
|
|
|
|
|
|
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
|
|
|
|
|
|
|
|
select.select([self.stream], [], [])
|
|
|
|
|
|
|
|
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
|
|
|
|
|
|
|
|
select.select([], [self.stream], [])
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not buff:
|
|
|
|
|
|
|
|
# The client has ceased to exist - most likely it has closed the connection.
|
|
|
|
|
|
|
|
del self.reactor.objmap[fileno]
|
|
|
|
|
|
|
|
self.reactor.queue.remove(self.stream)
|
|
|
|
|
|
|
|
self.event_disconnected()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
buff = self._tempbuff + buff
|
|
|
|
|
|
|
|
self._tempbuff = b""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while buff != b"":
|
|
|
|
|
|
|
|
if self._read_left > 0:
|
|
|
|
|
|
|
|
# Continue reading a chunk in progress
|
|
|
|
|
|
|
|
if self._read_left <= len(buff):
|
|
|
|
|
|
|
|
# All the data we need is in the buffer.
|
|
|
|
|
|
|
|
self._read_buff += buff[:self._read_left]
|
|
|
|
|
|
|
|
buff = buff[self._read_left:]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._read_left = 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._process_chunk(self._chunktype, self._channel, self._read_buff)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._read_buff = b""
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
# We need to read more data than is in the buffer.
|
|
|
|
|
|
|
|
self._read_buff += buff
|
|
|
|
|
|
|
|
self._read_left -= len(buff)
|
|
|
|
|
|
|
|
buff = b""
|
|
|
|
|
|
|
|
elif len(buff) >= 7:
|
|
|
|
|
|
|
|
# Start reading a new chunk
|
|
|
|
|
|
|
|
header = buff[:7]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
chunktype, chunksize, channel = self._decode_header(header)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
buff = buff[7:]
|
|
|
|
|
|
|
|
self._read_left = chunksize
|
|
|
|
|
|
|
|
self._chunksize = chunksize
|
|
|
|
|
|
|
|
self._chunktype = chunktype
|
|
|
|
|
|
|
|
self._channel = channel
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
# We need more data to do anything meaningful
|
|
|
|
|
|
|
|
self._tempbuff = buff
|
|
|
|
|
|
|
|
buff = b""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _process_chunk(self, chunktype, channel, data):
|
|
|
|
|
|
|
|
if chunktype == 1:
|
|
|
|
|
|
|
|
# Client message
|
|
|
|
|
|
|
|
self.event_receive(self._unpack(data))
|
|
|
|
|
|
|
|
elif chunktype == 2:
|
|
|
|
|
|
|
|
# Datastream chunk
|
|
|
|
|
|
|
|
self._receive_datastream(channel, data)
|
|
|
|
|
|
|
|
elif chunktype == 3:
|
|
|
|
|
|
|
|
# System message
|
|
|
|
|
|
|
|
self._process_system_message(msgpack.unpackb(data))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _process_system_message(self, data):
|
|
|
|
|
|
|
|
if data['type'] == "datastream_finished":
|
|
|
|
|
|
|
|
datastream_id = data['id']
|
|
|
|
|
|
|
|
self.event_datastream_finished(self._tempfiles[datastream_id])
|
|
|
|
|
|
|
|
self._datastream_started.remove(datastream_id)
|
|
|
|
|
|
|
|
del self._tempfiles[datastream_id]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _write_cycle(self):
|
|
|
|
|
|
|
|
if len(self.sendq) > 0:
|
|
|
|
|
|
|
|
item = self.sendq.popleft()
|
|
|
|
|
|
|
|
self._send_chunk(item)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if len(self._active_datastreams) > 0:
|
|
|
|
|
|
|
|
to_delete = []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for datastream_id, datastream in self._active_datastreams.iteritems():
|
|
|
|
|
|
|
|
data = datastream.read(self.chunk_size)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if data:
|
|
|
|
|
|
|
|
header = self._encode_header(2, len(data), datastream_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._send_chunk(header + data)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
# Done with this datastream.
|
|
|
|
|
|
|
|
self._send_system_message({"type": "datastream_finished", "id": datastream_id})
|
|
|
|
|
|
|
|
to_delete.append(datastream_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for datastream_id in to_delete:
|
|
|
|
|
|
|
|
del self._active_datastreams[datastream_id]
|
|
|
|
|
|
|
|
|
|
|
|
def send(self, data):
|
|
|
|
def send(self, data):
|
|
|
|
encoded = self._pack(data)
|
|
|
|
encoded = self._pack(data)
|
|
|
|
header = self._encode_header(1, len(encoded), 1)
|
|
|
|
header = self._encode_header(1, len(encoded), 0)
|
|
|
|
self.sendq.append(header + encoded)
|
|
|
|
self.sendq.append(header + encoded)
|
|
|
|
|
|
|
|
|
|
|
|
def event_connected(self):
|
|
|
|
def event_connected(self):
|
|
|
@ -137,6 +267,9 @@ class BaseClient:
|
|
|
|
def event_receive(self, data):
|
|
|
|
def event_receive(self, data):
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def event_datastream_start(self, stream, total_bytes):
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
|
|
|
|
def event_datastream_progress(self, stream, finished_bytes, total_bytes):
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|