From 4550e0a4253655f1ad90a0e9f9ee92d4ed248ee0 Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Sat, 7 Dec 2013 16:17:18 +0100 Subject: [PATCH] Add process watch, write code for ccollectd, implement ZeroMQ timer class, add dep installation script --- .gitignore | 2 ++ ccollectd/ccollectd | 42 +++++++++++++++++++++++ ccollectd/zmqtimer.py | 41 ++++++++++++++++++++++ cstatsd/cstatsd | 12 +++++-- cstatsd/stats-processes.py | 69 ++++++++++++++++++++++++++++++++++++++ deps.sh | 2 ++ 6 files changed, 165 insertions(+), 3 deletions(-) create mode 100755 ccollectd/ccollectd create mode 100644 ccollectd/zmqtimer.py create mode 100644 cstatsd/stats-processes.py create mode 100644 deps.sh diff --git a/.gitignore b/.gitignore index 5d3fc77..b5760c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ cstatsd/config/*.yaml +ccollectd/config.yaml +*.pyc diff --git a/ccollectd/ccollectd b/ccollectd/ccollectd new file mode 100755 index 0000000..d02c61d --- /dev/null +++ b/ccollectd/ccollectd @@ -0,0 +1,42 @@ +#!/usr/bin/env python2 + +import zmq, msgpack, yaml, zmqtimer + +ctx = zmq.Context() + +poller = zmq.Poller() + +def test(): + print "TEST" + +def test10(): + print "TEST 10" + +# Perhaps connect back to configured nodes...? Nodes could send pubkey-encrypted data... +# process manager, like port manager + +with open("config.yaml", "r") as cfile: + config = yaml.safe_load(cfile) + +nodes = config["nodes"] +socket_map = {} +timers = zmqtimer.ZmqTimerManager() +timers.add_timer(zmqtimer.ZmqTimer(1, test)) +timers.add_timer(zmqtimer.ZmqTimer(10.2, test10)) + +for hostname, node in config["nodes"].iteritems(): + grabber = ctx.socket(zmq.PULL) + grabber.connect(node["endpoint"]) + socket_map[grabber] = hostname + poller.register(grabber, zmq.POLLIN) + +while True: + timers.check() + socks = dict(poller.poll(timers.get_next_interval())) + + for sock in socks: + if socks[sock] == zmq.POLLIN: + host = socket_map[sock] + message = msgpack.unpackb(sock.recv()) + print "%s: %s" % (host, message) + diff --git a/ccollectd/zmqtimer.py b/ccollectd/zmqtimer.py new file mode 100644 index 0000000..4e7ebc2 --- /dev/null +++ b/ccollectd/zmqtimer.py @@ -0,0 +1,41 @@ +import time + +class ZmqTimerManager(object): + def __init__(self): + self.timers = [] + self.next_call = 0 + + def add_timer(self, timer): + self.timers.append(timer) + + def check(self): + if time.time() > self.next_call: + for timer in self.timers: + timer.check() + + def get_next_interval(self): + if time.time() >= self.next_call: + call_times = [] + for timer in self.timers: + call_times.append(timer.get_next_call()) + self.next_call = min(call_times) + if self.next_call < time.time(): + return 0 + else: + return (self.next_call - time.time()) * 1000 + else: + return (self.next_call - time.time()) * 1000 + +class ZmqTimer(object): + def __init__(self, interval, callback): + self.interval = interval + self.callback = callback + self.last_call = 0 + + def check(self): + if time.time() > (self.interval + self.last_call): + self.callback() + self.last_call = time.time() + + def get_next_call(self): + return self.last_call + self.interval diff --git a/cstatsd/cstatsd b/cstatsd/cstatsd index dcc9b63..91c0754 100755 --- a/cstatsd/cstatsd +++ b/cstatsd/cstatsd @@ -1,12 +1,18 @@ #!/usr/bin/env python2 -import zmq, msgpack +import zmq, yaml ctx = zmq.Context() +with open("config/cstatsd.yaml", "r") as cfile: + config = yaml.safe_load(cfile) + collector = ctx.socket(zmq.PULL) collector.bind("ipc:///tmp/cstatsd") +shipper = ctx.socket(zmq.PUSH) +shipper.bind(config["endpoint"]) + while True: - message = msgpack.unpackb(collector.recv()) - print message + #print msgpack.unpackb(collector.recv()) + shipper.send(collector.recv()) diff --git a/cstatsd/stats-processes.py b/cstatsd/stats-processes.py new file mode 100644 index 0000000..d01e5ca --- /dev/null +++ b/cstatsd/stats-processes.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python2 + +import zmq, msgpack, time, yaml, psutil, fnmatch + +ctx = zmq.Context() + +sock = ctx.socket(zmq.PUSH) +sock.connect("ipc:///tmp/cstatsd") + +with open("config/processes.yaml", "r") as cfile: + config = yaml.safe_load(cfile) + +interval = config["interval"] + +old_status = {} + +while True: + all_procs = psutil.get_process_list() + + for service_name, patterns in config["processes"].iteritems(): + matches = all_procs + + try: + matches = filter(lambda proc: len(proc.cmdline) > 0 and fnmatch.fnmatch(proc.cmdline[0], patterns["name"]), matches) + except KeyError, e: + pass + + try: + for arg, pattern in patterns["args"].iteritems(): + try: + matches = filter(lambda proc: len(proc.cmdline) >= (arg + 1) and fnmatch.fnmatch(proc.cmdline[arg], pattern), matches) + except KeyError, e: + pass + except KeyError, e: + pass # No arguments specified to filter + + if len(matches) > 0: + up = True + else: + up = False + + try: + if up == old_status[service_name]: + send_notice = False + else: + send_notice = True + initial = False + except KeyError, e: + send_notice = True + initial = True + + old_status[service_name] = up + + if send_notice: + if up: + msg_type = "up" + else: + msg_type = "down" + + sock.send(msgpack.packb({ + "service": "process", + "msg_type": msg_type, + "unit": service_name, + "initial": initial + })) + + + time.sleep(interval) + diff --git a/deps.sh b/deps.sh new file mode 100644 index 0000000..c0654ce --- /dev/null +++ b/deps.sh @@ -0,0 +1,2 @@ +apt-get install -y libzmq-dev +pip install pyzmq msgpack pynacl pyyaml