diff --git a/.gitignore b/.gitignore index b5760c2..e48a2c8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ cstatsd/config/*.yaml ccollectd/config.yaml *.pyc +ccollectd/pubkey.dat +ccollectd/privkey.dat +cstatsd/pubkey.dat +cstatsd/privkey.dat diff --git a/ccollectd/ccollectd b/ccollectd/ccollectd index d02c61d..9efe91b 100755 --- a/ccollectd/ccollectd +++ b/ccollectd/ccollectd @@ -1,30 +1,32 @@ #!/usr/bin/env python2 -import zmq, msgpack, yaml, zmqtimer +import zmq, msgpack, yaml, zmqtimer, binascii, nacl, sys +from nacl.public import PublicKey, PrivateKey, Box ctx = zmq.Context() -poller = zmq.Poller() - -def test(): - print "TEST" +distributor = ctx.socket(zmq.PUB) +distributor.bind("ipc:///tmp/ccollectd-stats") -def test10(): - print "TEST 10" +poller = zmq.Poller() -# Perhaps connect back to configured nodes...? Nodes could send pubkey-encrypted data... -# process manager, like port manager +def heartbeat(): + pass # TO DO: Check if all endpoints are still pingable... with open("config.yaml", "r") as cfile: config = yaml.safe_load(cfile) - + +with open("privkey.dat", "r") as f: + privkey = PrivateKey(binascii.unhexlify(f.read())) + nodes = config["nodes"] socket_map = {} +boxes = {} timers = zmqtimer.ZmqTimerManager() -timers.add_timer(zmqtimer.ZmqTimer(1, test)) -timers.add_timer(zmqtimer.ZmqTimer(10.2, test10)) +timers.add_timer(zmqtimer.ZmqTimer(5, heartbeat)) for hostname, node in config["nodes"].iteritems(): + boxes[hostname] = Box(privkey, PublicKey(binascii.unhexlify(node["pubkey"]))) grabber = ctx.socket(zmq.PULL) grabber.connect(node["endpoint"]) socket_map[grabber] = hostname @@ -37,6 +39,11 @@ while True: for sock in socks: if socks[sock] == zmq.POLLIN: host = socket_map[sock] - message = msgpack.unpackb(sock.recv()) + try: + message = msgpack.unpackb(boxes[host].decrypt(sock.recv())) + except nacl.exceptions.CryptoError, e: + # Probably a spoofed message... skip to next socket + sys.stderr.write("Ignoring message... spoofed?\n") # FIXME: Use logging module... + continue print "%s: %s" % (host, message) diff --git a/cstatsd/config/machine.yaml.example b/cstatsd/config/machine.yaml.example index 7c1e7e5..80b95f4 100644 --- a/cstatsd/config/machine.yaml.example +++ b/cstatsd/config/machine.yaml.example @@ -1,4 +1,5 @@ interval: 1 drives: - - / + - /dev/sda1 + - /dev/sdb1 diff --git a/cstatsd/cstatsd b/cstatsd/cstatsd index 91c0754..acaaa61 100755 --- a/cstatsd/cstatsd +++ b/cstatsd/cstatsd @@ -1,12 +1,20 @@ #!/usr/bin/env python2 -import zmq, yaml +import zmq, yaml, binascii, nacl +from nacl.public import PublicKey, PrivateKey, Box ctx = zmq.Context() with open("config/cstatsd.yaml", "r") as cfile: config = yaml.safe_load(cfile) +pubkey = PublicKey(binascii.unhexlify(config["pubkey"])) + +with open("privkey.dat", "r") as f: + privkey = PrivateKey(binascii.unhexlify(f.read())) + +box = Box(privkey, pubkey) + collector = ctx.socket(zmq.PULL) collector.bind("ipc:///tmp/cstatsd") @@ -14,5 +22,6 @@ shipper = ctx.socket(zmq.PUSH) shipper.bind(config["endpoint"]) while True: - #print msgpack.unpackb(collector.recv()) - shipper.send(collector.recv()) + message = collector.recv() + nonce = nacl.utils.random(Box.NONCE_SIZE) + shipper.send(box.encrypt(message, nonce)) diff --git a/cstatsd/stats-machine.py b/cstatsd/stats-machine.py index 5c3d94b..83f9d05 100755 --- a/cstatsd/stats-machine.py +++ b/cstatsd/stats-machine.py @@ -13,6 +13,12 @@ with open("config/machine.yaml", "r") as cfile: interval = config["interval"] old_net_data = {} +disk_map = {} +last_io_data = {} + +for disk in psutil.disk_partitions(): + disk_map[disk.device] = disk + while True: load_avgs = os.getloadavg() sock.send(msgpack.packb({ @@ -40,8 +46,30 @@ while True: } })) + io_counters = psutil.disk_io_counters(perdisk=True) + for drive in config["drives"]: - drive_data = psutil.disk_usage(drive) + drive_data = psutil.disk_usage(disk_map[drive].mountpoint) + io_data = None + + for diskname, data in io_counters.iteritems(): + if drive.endswith(diskname): + io_data = data + + if io_data is None or drive not in last_io_data: + read_bps = 0 + write_bps = 0 + read_iops = 0 + write_iops = 0 + else: + read_bps = (io_data.read_bytes - last_io_data[drive].read_bytes) / interval + write_bps = (io_data.write_bytes - last_io_data[drive].write_bytes) / interval + read_iops = (io_data.read_count - last_io_data[drive].read_count) / interval + write_iops = (io_data.write_count - last_io_data[drive].write_count) / interval + + if io_data is not None: + last_io_data[drive] = io_data + sock.send(msgpack.packb({ "service": "machine", "msg_type": "value", @@ -51,7 +79,11 @@ while True: "total": drive_data.total, "used": drive_data.used, "free": drive_data.free, - "used_percentage": drive_data.percent + "used_percentage": drive_data.percent, + "bps_read": read_bps, + "bps_write": write_bps, + "iops_read": read_iops, + "iops_write": write_iops, } })) diff --git a/cstatsd/stats-processes.py b/cstatsd/stats-processes.py index d01e5ca..390740b 100644 --- a/cstatsd/stats-processes.py +++ b/cstatsd/stats-processes.py @@ -18,23 +18,26 @@ while True: all_procs = psutil.get_process_list() for service_name, patterns in config["processes"].iteritems(): - matches = all_procs - - try: - matches = filter(lambda proc: len(proc.cmdline) > 0 and fnmatch.fnmatch(proc.cmdline[0], patterns["name"]), matches) - except KeyError, e: - pass - - try: - for arg, pattern in patterns["args"].iteritems(): - try: - matches = filter(lambda proc: len(proc.cmdline) >= (arg + 1) and fnmatch.fnmatch(proc.cmdline[arg], pattern), matches) - except KeyError, e: - pass - except KeyError, e: - pass # No arguments specified to filter + matching = [] + for proc in all_procs: # Can't use filter() because of exceptions... + try: + if len(proc.cmdline) > 0 and fnmatch.fnmatch(proc.cmdline[0], patterns["name"]): + failed = False + try: + for arg, pattern in patterns["args"].iteritems(): + try: + if len(proc.cmdline) < (arg + 1) or not fnmatch.fnmatch(proc.cmdline[arg], pattern): + failed = True + except KeyError, e: + pass + except KeyError, e: + pass + if failed == False: + matching.append(proc) + except psutil._error.NoSuchProcess, e: + pass - if len(matches) > 0: + if len(matching) > 0: up = True else: up = False