|
|
@ -10,9 +10,6 @@ distributor.bind("ipc:///tmp/ccollectd-stats") |
|
|
|
|
|
|
|
poller = zmq.Poller() |
|
|
|
|
|
|
|
def heartbeat(): |
|
|
|
pass # TO DO: Check if all endpoints are still pingable... |
|
|
|
|
|
|
|
with open("config.yaml", "r") as cfile: |
|
|
|
config = yaml.safe_load(cfile) |
|
|
|
|
|
|
@ -20,10 +17,49 @@ with open("privkey.dat", "r") as f: |
|
|
|
privkey = PrivateKey(binascii.unhexlify(f.read())) |
|
|
|
|
|
|
|
nodes = config["nodes"] |
|
|
|
last_node_status = {} |
|
|
|
socket_map = {} |
|
|
|
boxes = {} |
|
|
|
|
|
|
|
def heartbeat(): |
|
|
|
for hostname, node in nodes.iteritems(): |
|
|
|
try: |
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
|
|
s.settimeout(config["heartbeat"]["timeout"]) |
|
|
|
s.connect((node["ip"], node["port"])) |
|
|
|
s.shutdown(socket.SHUT_RDWR) |
|
|
|
s.close() |
|
|
|
up = True |
|
|
|
except socket.error, e: |
|
|
|
up = False |
|
|
|
|
|
|
|
try: |
|
|
|
status_changed = (up != last_node_status[hostname]) |
|
|
|
initial = False |
|
|
|
except KeyError, e: |
|
|
|
status_changed = True |
|
|
|
initial = True |
|
|
|
|
|
|
|
last_node_status[hostname] = up |
|
|
|
|
|
|
|
if status_changed: |
|
|
|
if up: |
|
|
|
msg_type = "up" |
|
|
|
else: |
|
|
|
msg_type = "down" |
|
|
|
|
|
|
|
distributor.send(msgpack.packb({ |
|
|
|
"host": config["hostname"], |
|
|
|
"message": { |
|
|
|
"service": "heartbeat", |
|
|
|
"msg_type": msg_type, |
|
|
|
"unit": hostname, |
|
|
|
"initial": initial |
|
|
|
} |
|
|
|
})) |
|
|
|
|
|
|
|
timers = zmqtimer.ZmqTimerManager() |
|
|
|
timers.add_timer(zmqtimer.ZmqTimer(5, heartbeat)) |
|
|
|
timers.add_timer(zmqtimer.ZmqTimer(config["heartbeat"]["interval"], heartbeat)) |
|
|
|
|
|
|
|
for hostname, node in config["nodes"].iteritems(): |
|
|
|
boxes[hostname] = Box(privkey, PublicKey(binascii.unhexlify(node["pubkey"]))) |
|
|
|