#!/usr/bin/env python2 import zmq, msgpack, yaml, zmqtimer, binascii, nacl, sys, socket from nacl.public import PublicKey, PrivateKey, Box ctx = zmq.Context() distributor = ctx.socket(zmq.PUB) distributor.bind("ipc:///tmp/ccollectd-stats") poller = zmq.Poller() with open("config.yaml", "r") as cfile: config = yaml.safe_load(cfile) with open("privkey.dat", "r") as f: privkey = PrivateKey(binascii.unhexlify(f.read())) nodes = config["nodes"] last_node_status = {} socket_map = {} boxes = {} def heartbeat(): for hostname, node in nodes.iteritems(): retries = 0 while retries < config["heartbeat"]["attempts"]: try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(float(config["heartbeat"]["timeout"]) / (retries + 1)) s.connect((node["ip"], node["port"])) s.shutdown(socket.SHUT_RDWR) s.close() up = True break except socket.error, e: up = False retries += 1 try: status_changed = (up != last_node_status[hostname]) initial = False except KeyError, e: status_changed = True initial = True last_node_status[hostname] = up if status_changed: if up: msg_type = "up" send_message = True else: msg_type = "down" send_message = True else: if retries > 0: msg_type = "blip" send_message = True if send_message: distributor.send(msgpack.packb({ "host": config["hostname"], "message": { "service": "heartbeat", "msg_type": msg_type, "unit": hostname, "initial": initial } })) timers = zmqtimer.ZmqTimerManager() timers.add_timer(zmqtimer.ZmqTimer(config["heartbeat"]["interval"], heartbeat)) for hostname, node in config["nodes"].iteritems(): boxes[hostname] = Box(privkey, PublicKey(binascii.unhexlify(node["pubkey"]))) grabber = ctx.socket(zmq.PULL) grabber.connect(node["endpoint"]) socket_map[grabber] = hostname poller.register(grabber, zmq.POLLIN) while True: timers.check() socks = dict(poller.poll(timers.get_next_interval())) for sock in socks: if socks[sock] == zmq.POLLIN: host = socket_map[sock] try: message = msgpack.unpackb(boxes[host].decrypt(sock.recv())) except nacl.exceptions.CryptoError, e: # Probably a spoofed message... skip to next socket sys.stderr.write("Ignoring message... spoofed?\n") # FIXME: Use logging module... continue distributor.send(msgpack.packb({ "host": host, "message": message }))