Browse Source

Implement crypto, add disk i/o stats to machine statistics (and change config format), proper error handling for process monitoring

develop
Sven Slootweg 8 years ago
parent
commit
56ae6b5305
  1. 4
      .gitignore
  2. 33
      ccollectd/ccollectd
  3. 3
      cstatsd/config/machine.yaml.example
  4. 15
      cstatsd/cstatsd
  5. 36
      cstatsd/stats-machine.py
  6. 35
      cstatsd/stats-processes.py

4
.gitignore

@ -1,3 +1,7 @@
cstatsd/config/*.yaml
ccollectd/config.yaml
*.pyc
ccollectd/pubkey.dat
ccollectd/privkey.dat
cstatsd/pubkey.dat
cstatsd/privkey.dat

33
ccollectd/ccollectd

@ -1,30 +1,32 @@
#!/usr/bin/env python2
import zmq, msgpack, yaml, zmqtimer
import zmq, msgpack, yaml, zmqtimer, binascii, nacl, sys
from nacl.public import PublicKey, PrivateKey, Box
ctx = zmq.Context()
poller = zmq.Poller()
def test():
print "TEST"
distributor = ctx.socket(zmq.PUB)
distributor.bind("ipc:///tmp/ccollectd-stats")
def test10():
print "TEST 10"
poller = zmq.Poller()
# Perhaps connect back to configured nodes...? Nodes could send pubkey-encrypted data...
# process manager, like port manager
def heartbeat():
pass # TO DO: Check if all endpoints are still pingable...
with open("config.yaml", "r") as cfile:
config = yaml.safe_load(cfile)
with open("privkey.dat", "r") as f:
privkey = PrivateKey(binascii.unhexlify(f.read()))
nodes = config["nodes"]
socket_map = {}
boxes = {}
timers = zmqtimer.ZmqTimerManager()
timers.add_timer(zmqtimer.ZmqTimer(1, test))
timers.add_timer(zmqtimer.ZmqTimer(10.2, test10))
timers.add_timer(zmqtimer.ZmqTimer(5, heartbeat))
for hostname, node in config["nodes"].iteritems():
boxes[hostname] = Box(privkey, PublicKey(binascii.unhexlify(node["pubkey"])))
grabber = ctx.socket(zmq.PULL)
grabber.connect(node["endpoint"])
socket_map[grabber] = hostname
@ -37,6 +39,11 @@ while True:
for sock in socks:
if socks[sock] == zmq.POLLIN:
host = socket_map[sock]
message = msgpack.unpackb(sock.recv())
try:
message = msgpack.unpackb(boxes[host].decrypt(sock.recv()))
except nacl.exceptions.CryptoError, e:
# Probably a spoofed message... skip to next socket
sys.stderr.write("Ignoring message... spoofed?\n") # FIXME: Use logging module...
continue
print "%s: %s" % (host, message)

3
cstatsd/config/machine.yaml.example

@ -1,4 +1,5 @@
interval: 1
drives:
- /
- /dev/sda1
- /dev/sdb1

15
cstatsd/cstatsd

@ -1,12 +1,20 @@
#!/usr/bin/env python2
import zmq, yaml
import zmq, yaml, binascii, nacl
from nacl.public import PublicKey, PrivateKey, Box
ctx = zmq.Context()
with open("config/cstatsd.yaml", "r") as cfile:
config = yaml.safe_load(cfile)
pubkey = PublicKey(binascii.unhexlify(config["pubkey"]))
with open("privkey.dat", "r") as f:
privkey = PrivateKey(binascii.unhexlify(f.read()))
box = Box(privkey, pubkey)
collector = ctx.socket(zmq.PULL)
collector.bind("ipc:///tmp/cstatsd")
@ -14,5 +22,6 @@ shipper = ctx.socket(zmq.PUSH)
shipper.bind(config["endpoint"])
while True:
#print msgpack.unpackb(collector.recv())
shipper.send(collector.recv())
message = collector.recv()
nonce = nacl.utils.random(Box.NONCE_SIZE)
shipper.send(box.encrypt(message, nonce))

36
cstatsd/stats-machine.py

@ -13,6 +13,12 @@ with open("config/machine.yaml", "r") as cfile:
interval = config["interval"]
old_net_data = {}
disk_map = {}
last_io_data = {}
for disk in psutil.disk_partitions():
disk_map[disk.device] = disk
while True:
load_avgs = os.getloadavg()
sock.send(msgpack.packb({
@ -40,8 +46,30 @@ while True:
}
}))
io_counters = psutil.disk_io_counters(perdisk=True)
for drive in config["drives"]:
drive_data = psutil.disk_usage(drive)
drive_data = psutil.disk_usage(disk_map[drive].mountpoint)
io_data = None
for diskname, data in io_counters.iteritems():
if drive.endswith(diskname):
io_data = data
if io_data is None or drive not in last_io_data:
read_bps = 0
write_bps = 0
read_iops = 0
write_iops = 0
else:
read_bps = (io_data.read_bytes - last_io_data[drive].read_bytes) / interval
write_bps = (io_data.write_bytes - last_io_data[drive].write_bytes) / interval
read_iops = (io_data.read_count - last_io_data[drive].read_count) / interval
write_iops = (io_data.write_count - last_io_data[drive].write_count) / interval
if io_data is not None:
last_io_data[drive] = io_data
sock.send(msgpack.packb({
"service": "machine",
"msg_type": "value",
@ -51,7 +79,11 @@ while True:
"total": drive_data.total,
"used": drive_data.used,
"free": drive_data.free,
"used_percentage": drive_data.percent
"used_percentage": drive_data.percent,
"bps_read": read_bps,
"bps_write": write_bps,
"iops_read": read_iops,
"iops_write": write_iops,
}
}))

35
cstatsd/stats-processes.py

@ -18,23 +18,26 @@ while True:
all_procs = psutil.get_process_list()
for service_name, patterns in config["processes"].iteritems():
matches = all_procs
try:
matches = filter(lambda proc: len(proc.cmdline) > 0 and fnmatch.fnmatch(proc.cmdline[0], patterns["name"]), matches)
except KeyError, e:
pass
try:
for arg, pattern in patterns["args"].iteritems():
try:
matches = filter(lambda proc: len(proc.cmdline) >= (arg + 1) and fnmatch.fnmatch(proc.cmdline[arg], pattern), matches)
except KeyError, e:
pass
except KeyError, e:
pass # No arguments specified to filter
matching = []
for proc in all_procs: # Can't use filter() because of exceptions...
try:
if len(proc.cmdline) > 0 and fnmatch.fnmatch(proc.cmdline[0], patterns["name"]):
failed = False
try:
for arg, pattern in patterns["args"].iteritems():
try:
if len(proc.cmdline) < (arg + 1) or not fnmatch.fnmatch(proc.cmdline[arg], pattern):
failed = True
except KeyError, e:
pass
except KeyError, e:
pass
if failed == False:
matching.append(proc)
except psutil._error.NoSuchProcess, e:
pass
if len(matches) > 0:
if len(matching) > 0:
up = True
else:
up = False

Loading…
Cancel
Save