You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
105 lines
2.6 KiB
Python
105 lines
2.6 KiB
Python
#!/usr/bin/env python2
|
|
|
|
import zmq, msgpack, yaml, zmqtimer, binascii, nacl, sys, socket
|
|
from nacl.public import PublicKey, PrivateKey, Box
|
|
|
|
# Single ZeroMQ context shared by every socket this script creates.
ctx = zmq.Context()

# Local PUB socket: heartbeat results and decrypted node messages are
# re-published here.
distributor = ctx.socket(zmq.PUB)
distributor.bind("tcp://127.0.0.1:8998")

# Polls the per-node SUB sockets registered further down in this script.
poller = zmq.Poller()

with open("config.yaml", "r") as cfile:
    config = yaml.safe_load(cfile)

# The private key is stored hex-encoded. Strip surrounding whitespace first:
# a trailing newline (as left by most editors and by `echo`) would otherwise
# make unhexlify() raise on the odd-length / non-hex input.
with open("privkey.dat", "r") as f:
    privkey = PrivateKey(binascii.unhexlify(f.read().strip()))

nodes = config["nodes"]
last_node_status = {}  # hostname -> last observed up/down state (bool)
socket_map = {}        # SUB socket -> hostname it is connected to
boxes = {}             # hostname -> NaCl Box used to decrypt its messages
|
|
|
|
def _probe_node(ip, port, timeout):
    """Return True if a TCP connection to (ip, port) can be opened and shut down.

    Any socket.error (which includes timeouts on Python 2) counts as a failed
    probe. The socket is always closed, also when connecting fails, so that
    repeated failures do not leak file descriptors.
    """
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error:
        return False

    try:
        probe.settimeout(timeout)
        probe.connect((ip, port))
        probe.shutdown(socket.SHUT_RDWR)
        return True
    except socket.error:
        return False
    finally:
        probe.close()


def heartbeat():
    """Probe every configured node over TCP and publish status changes.

    Each node is tried up to config["heartbeat"]["attempts"] times; the
    timeout for attempt k (0-based) is config["heartbeat"]["timeout"] / (k + 1).
    The result is compared with the previous state in last_node_status and a
    message is sent on `distributor` when:

    * the state changed, or was never recorded  -> "up" / "down"
    * the state is unchanged and up, but at least one attempt failed -> "blip"

    The "initial" field of the message is True when the node had no
    previously recorded state.
    """
    hb_config = config["heartbeat"]

    for hostname, node in nodes.iteritems():
        # Default to "down" so that a configured attempt count of 0 cannot
        # leave `up` unbound.
        up = False
        retries = 0  # number of failed attempts so far

        while retries < hb_config["attempts"]:
            timeout = float(hb_config["timeout"]) / (retries + 1)
            if _probe_node(node["ip"], node["port"], timeout):
                up = True
                break
            retries += 1

        initial = hostname not in last_node_status
        status_changed = initial or (up != last_node_status[hostname])
        last_node_status[hostname] = up

        if status_changed:
            msg_type = "up" if up else "down"
        elif up and retries > 0:
            # Still up, but only after one or more failed attempts.
            msg_type = "blip"
        else:
            msg_type = None

        if msg_type is not None:
            distributor.send(msgpack.packb({
                "host": config["hostname"],
                "message": {
                    "service": "heartbeat",
                    "msg_type": msg_type,
                    "unit": hostname,
                    "initial": initial
                }
            }))
|
|
|
|
# Register the heartbeat probe to run at the configured interval.
timers = zmqtimer.ZmqTimerManager()
heartbeat_timer = zmqtimer.ZmqTimer(config["heartbeat"]["interval"], heartbeat)
timers.add_timer(heartbeat_timer)

# For every configured node: build the Box used to decrypt its messages, then
# open a SUB socket subscribed to everything on the node's endpoint and add it
# to the poller.
for hostname, node in config["nodes"].iteritems():
    peer_key = PublicKey(binascii.unhexlify(node["pubkey"]))
    boxes[hostname] = Box(privkey, peer_key)

    grabber = ctx.socket(zmq.SUB)
    grabber.setsockopt(zmq.SUBSCRIBE, "")
    grabber.connect(node["endpoint"])

    # Remember which node this socket belongs to so the main loop can pick
    # the matching Box.
    socket_map[grabber] = hostname
    poller.register(grabber, zmq.POLLIN)
|
|
|
|
# Main loop: run due timers, wait for node traffic until the next timer is
# due, then decrypt each received message and re-publish it on `distributor`.
while True:
    timers.check()
    socks = dict(poller.poll(timers.get_next_interval()))

    for sock, events in socks.items():
        if events != zmq.POLLIN:
            continue

        host = socket_map[sock]

        try:
            ciphertext = sock.recv()
            plaintext = boxes[host].decrypt(ciphertext)
            message = msgpack.unpackb(plaintext)
        except nacl.exceptions.CryptoError:
            # Probably a spoofed message... skip to next socket
            # FIXME: Use logging module...
            sys.stderr.write("Ignoring message... spoofed? (origin: %s)\n" % host)
            continue
        except Exception as e:
            # Anything else is reported and the message is dropped.
            sys.stderr.write(repr(e) + "\n")
            continue

        envelope = {
            "host": host,
            "message": message
        }
        distributor.send(msgpack.packb(envelope))
|
|
|