diff --git a/ccollectd/ccollectd b/ccollectd/ccollectd
index 1c461ff..9a0c58d 100755
--- a/ccollectd/ccollectd
+++ b/ccollectd/ccollectd
@@ -10,9 +10,6 @@ distributor.bind("ipc:///tmp/ccollectd-stats")
 
 poller = zmq.Poller()
 
-def heartbeat():
-    pass # TO DO: Check if all endpoints are still pingable...
-
 with open("config.yaml", "r") as cfile:
     config = yaml.safe_load(cfile)
 
@@ -20,10 +17,57 @@ with open("privkey.dat", "r") as f:
     privkey = PrivateKey(binascii.unhexlify(f.read()))
 
 nodes = config["nodes"]
+last_node_status = {}
 socket_map = {}
 boxes = {}
+
+def heartbeat():
+    """Probe every configured node over TCP and announce status changes.
+
+    Each entry in ``nodes`` is checked by opening a TCP connection to its
+    ``ip``/``port`` with the configured heartbeat timeout. The result is
+    remembered in ``last_node_status``; a msgpack-encoded "up" or "down"
+    message is sent through ``distributor`` the first time a node is seen
+    (``initial`` is True) and whenever its status flips afterwards.
+    """
+    # NOTE(review): assumes ``socket`` is imported above this hunk - confirm.
+    for hostname, node in nodes.iteritems():
+        up = False
+        s = None
+        try:
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.settimeout(config["heartbeat"]["timeout"])
+            s.connect((node["ip"], node["port"]))
+            # A completed connect is what proves the node is reachable, so
+            # record it before shutdown(), which can itself raise if the peer
+            # has already dropped the connection.
+            up = True
+            s.shutdown(socket.SHUT_RDWR)
+        except socket.error:
+            pass  # "up" already reflects whether connect() succeeded.
+        finally:
+            # Always release the descriptor, including when connect() fails.
+            if s is not None:
+                s.close()
+
+        # A missing entry means this is the first probe of the node.
+        previous = last_node_status.get(hostname)
+        initial = previous is None
+        last_node_status[hostname] = up
+
+        if initial or previous != up:
+            distributor.send(msgpack.packb({
+                "host": config["hostname"],
+                "message": {
+                    "service": "heartbeat",
+                    "msg_type": "up" if up else "down",
+                    "unit": hostname,
+                    "initial": initial
+                }
+            }))
+
 timers = zmqtimer.ZmqTimerManager()
-timers.add_timer(zmqtimer.ZmqTimer(5, heartbeat))
+timers.add_timer(zmqtimer.ZmqTimer(config["heartbeat"]["interval"], heartbeat))
 
 for hostname, node in config["nodes"].iteritems():
     boxes[hostname] = Box(privkey, PublicKey(binascii.unhexlify(node["pubkey"])))
diff --git a/ccollectd/config.yaml.example b/ccollectd/config.yaml.example
index e815d98..da3d140 100644
--- a/ccollectd/config.yaml.example
+++ b/ccollectd/config.yaml.example
@@ -1,3 +1,9 @@
+hostname: monitoring.cryto.net
+
+heartbeat:
+    interval: 5
+    timeout: 1
+
 nodes:
     localhost:
         ip: 127.0.0.1