Add process watch, write code for ccollectd, implement ZeroMQ timer class, add dep installation script
parent
d310a95e7a
commit
4550e0a425
@ -1 +1,3 @@
|
|||||||
cstatsd/config/*.yaml
|
cstatsd/config/*.yaml
|
||||||
|
ccollectd/config.yaml
|
||||||
|
*.pyc
|
||||||
|
@ -0,0 +1,42 @@
|
|||||||
|
#!/usr/bin/env python2
|
||||||
|
|
||||||
|
import zmq, msgpack, yaml, zmqtimer
|
||||||
|
|
||||||
|
ctx = zmq.Context()
|
||||||
|
|
||||||
|
poller = zmq.Poller()
|
||||||
|
|
||||||
|
def test():
|
||||||
|
print "TEST"
|
||||||
|
|
||||||
|
def test10():
|
||||||
|
print "TEST 10"
|
||||||
|
|
||||||
|
# Perhaps connect back to configured nodes...? Nodes could send pubkey-encrypted data...
|
||||||
|
# process manager, like port manager
|
||||||
|
|
||||||
|
with open("config.yaml", "r") as cfile:
|
||||||
|
config = yaml.safe_load(cfile)
|
||||||
|
|
||||||
|
nodes = config["nodes"]
|
||||||
|
socket_map = {}
|
||||||
|
timers = zmqtimer.ZmqTimerManager()
|
||||||
|
timers.add_timer(zmqtimer.ZmqTimer(1, test))
|
||||||
|
timers.add_timer(zmqtimer.ZmqTimer(10.2, test10))
|
||||||
|
|
||||||
|
for hostname, node in config["nodes"].iteritems():
|
||||||
|
grabber = ctx.socket(zmq.PULL)
|
||||||
|
grabber.connect(node["endpoint"])
|
||||||
|
socket_map[grabber] = hostname
|
||||||
|
poller.register(grabber, zmq.POLLIN)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
timers.check()
|
||||||
|
socks = dict(poller.poll(timers.get_next_interval()))
|
||||||
|
|
||||||
|
for sock in socks:
|
||||||
|
if socks[sock] == zmq.POLLIN:
|
||||||
|
host = socket_map[sock]
|
||||||
|
message = msgpack.unpackb(sock.recv())
|
||||||
|
print "%s: %s" % (host, message)
|
||||||
|
|
@ -0,0 +1,41 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
class ZmqTimerManager(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.timers = []
|
||||||
|
self.next_call = 0
|
||||||
|
|
||||||
|
def add_timer(self, timer):
|
||||||
|
self.timers.append(timer)
|
||||||
|
|
||||||
|
def check(self):
|
||||||
|
if time.time() > self.next_call:
|
||||||
|
for timer in self.timers:
|
||||||
|
timer.check()
|
||||||
|
|
||||||
|
def get_next_interval(self):
|
||||||
|
if time.time() >= self.next_call:
|
||||||
|
call_times = []
|
||||||
|
for timer in self.timers:
|
||||||
|
call_times.append(timer.get_next_call())
|
||||||
|
self.next_call = min(call_times)
|
||||||
|
if self.next_call < time.time():
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
return (self.next_call - time.time()) * 1000
|
||||||
|
else:
|
||||||
|
return (self.next_call - time.time()) * 1000
|
||||||
|
|
||||||
|
class ZmqTimer(object):
|
||||||
|
def __init__(self, interval, callback):
|
||||||
|
self.interval = interval
|
||||||
|
self.callback = callback
|
||||||
|
self.last_call = 0
|
||||||
|
|
||||||
|
def check(self):
|
||||||
|
if time.time() > (self.interval + self.last_call):
|
||||||
|
self.callback()
|
||||||
|
self.last_call = time.time()
|
||||||
|
|
||||||
|
def get_next_call(self):
|
||||||
|
return self.last_call + self.interval
|
@ -1,12 +1,18 @@
|
|||||||
#!/usr/bin/env python2
|
#!/usr/bin/env python2
|
||||||
|
|
||||||
import zmq, msgpack
|
import zmq, yaml
|
||||||
|
|
||||||
ctx = zmq.Context()
|
ctx = zmq.Context()
|
||||||
|
|
||||||
|
with open("config/cstatsd.yaml", "r") as cfile:
|
||||||
|
config = yaml.safe_load(cfile)
|
||||||
|
|
||||||
collector = ctx.socket(zmq.PULL)
|
collector = ctx.socket(zmq.PULL)
|
||||||
collector.bind("ipc:///tmp/cstatsd")
|
collector.bind("ipc:///tmp/cstatsd")
|
||||||
|
|
||||||
|
shipper = ctx.socket(zmq.PUSH)
|
||||||
|
shipper.bind(config["endpoint"])
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
message = msgpack.unpackb(collector.recv())
|
#print msgpack.unpackb(collector.recv())
|
||||||
print message
|
shipper.send(collector.recv())
|
||||||
|
@ -0,0 +1,69 @@
|
|||||||
|
#!/usr/bin/env python2
|
||||||
|
|
||||||
|
import zmq, msgpack, time, yaml, psutil, fnmatch
|
||||||
|
|
||||||
|
ctx = zmq.Context()
|
||||||
|
|
||||||
|
sock = ctx.socket(zmq.PUSH)
|
||||||
|
sock.connect("ipc:///tmp/cstatsd")
|
||||||
|
|
||||||
|
with open("config/processes.yaml", "r") as cfile:
|
||||||
|
config = yaml.safe_load(cfile)
|
||||||
|
|
||||||
|
interval = config["interval"]
|
||||||
|
|
||||||
|
old_status = {}
|
||||||
|
|
||||||
|
while True:
|
||||||
|
all_procs = psutil.get_process_list()
|
||||||
|
|
||||||
|
for service_name, patterns in config["processes"].iteritems():
|
||||||
|
matches = all_procs
|
||||||
|
|
||||||
|
try:
|
||||||
|
matches = filter(lambda proc: len(proc.cmdline) > 0 and fnmatch.fnmatch(proc.cmdline[0], patterns["name"]), matches)
|
||||||
|
except KeyError, e:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
for arg, pattern in patterns["args"].iteritems():
|
||||||
|
try:
|
||||||
|
matches = filter(lambda proc: len(proc.cmdline) >= (arg + 1) and fnmatch.fnmatch(proc.cmdline[arg], pattern), matches)
|
||||||
|
except KeyError, e:
|
||||||
|
pass
|
||||||
|
except KeyError, e:
|
||||||
|
pass # No arguments specified to filter
|
||||||
|
|
||||||
|
if len(matches) > 0:
|
||||||
|
up = True
|
||||||
|
else:
|
||||||
|
up = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
if up == old_status[service_name]:
|
||||||
|
send_notice = False
|
||||||
|
else:
|
||||||
|
send_notice = True
|
||||||
|
initial = False
|
||||||
|
except KeyError, e:
|
||||||
|
send_notice = True
|
||||||
|
initial = True
|
||||||
|
|
||||||
|
old_status[service_name] = up
|
||||||
|
|
||||||
|
if send_notice:
|
||||||
|
if up:
|
||||||
|
msg_type = "up"
|
||||||
|
else:
|
||||||
|
msg_type = "down"
|
||||||
|
|
||||||
|
sock.send(msgpack.packb({
|
||||||
|
"service": "process",
|
||||||
|
"msg_type": msg_type,
|
||||||
|
"unit": service_name,
|
||||||
|
"initial": initial
|
||||||
|
}))
|
||||||
|
|
||||||
|
|
||||||
|
time.sleep(interval)
|
||||||
|
|
Loading…
Reference in New Issue