diff --git a/.gitignore b/.gitignore index e48a2c8..20a01bb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ cstatsd/config/*.yaml ccollectd/config.yaml +alert/config.yaml *.pyc ccollectd/pubkey.dat ccollectd/privkey.dat diff --git a/alert/alert b/alert/alert new file mode 100755 index 0000000..4ab63a3 --- /dev/null +++ b/alert/alert @@ -0,0 +1,298 @@ +#!/usr/bin/env python2 + +import socket, yaml, random, zmq, msgpack, time, uuid, fnmatch +import cPickle as pickle + +ctx = zmq.Context() + +with open("config.yaml", "r") as cfile: + config = yaml.safe_load(cfile) + +try: + with open("rules.pickle", "r") as pfile: + rules = pickle.load(pfile) +except IOError, e: + rules = {} + +fetcher = ctx.socket(zmq.SUB) +fetcher.setsockopt(zmq.SUBSCRIBE, "") +fetcher.connect("ipc:///tmp/ccollectd-stats") + +class Bot(object): + def __init__(self, hosts, port, nickname, realname, channels, admins, subsock): + self.hosts = hosts + self.port = port + self.nickname = nickname + self.realname = realname + self.channels = channels + self.admins = admins + self.subsock = subsock + self.connected = False + self.last_down = {} + self.known_alarms = {} + + self.command_map = { + "422": self.join_all_channels, + "376": self.join_all_channels, + "PRIVMSG": self.receive_message + } + + def split_irc(self, message): + if message[0] == ":": + prefix = ":" + message = message[1:] + else: + prefix = "" + + if ":" in message: + rest, last = message.split(":", 1) + parts = rest.strip().split() + [last] + else: + parts = message.split() + + parts[0] = prefix + parts[0] + return parts + + def run(self): + while True: # Connect loop + host = random.choice(self.hosts) + + self.sock = socket.socket() + try: + self.sock.connect((host, self.port)) + except socket.error, e: + continue # Reconnect + self.send_raw("NICK %s" % self.nickname) + self.sock.send("USER %s 0 0 :%s\r\n" % (self.nickname, self.realname)) + + buff = "" + while True: # Read loop + r, w, x = zmq.select([self.sock, self.subsock], [], []) + + for s in r: + if s == self.sock.fileno(): + try: + recvdata = self.sock.recv(1024) + except socket.error, e: + break # Something went wrong, reconnect... + + if len(recvdata) == 0: + break # We have disconnected... + + buff += recvdata + messages = buff.split("\n") + buff = messages.pop() + + for message in messages: + self.process_message(self.split_irc(message.strip("\r"))) + elif self.subsock.getsockopt(zmq.EVENTS) & zmq.POLLIN != 0: + # Process incoming data from the subscribe socket... + message = msgpack.unpackb(s.recv()) + self.process_stats(message) + + def send_raw(self, message): + self.sock.send("%s\r\n" % message) + + def send_message(self, recipient, message): + if self.connected == True: + self.send_raw("PRIVMSG %s :%s" % (recipient, message)) + + def send_all(self, message): + for channel in self.channels: + self.send_message(channel, message) + + def join(self, channel): + self.send_raw("JOIN %s" % channel) + + def join_all_channels(self, message): + self.connected = True + for channel in self.channels: + self.join(channel) + + def receive_message(self, message): + args = message[3].split() + sender = message[0][1:].split("!", 1)[0] + channel = message[2] + + try: + if sender in self.admins: + if args[0] == "!addrule": + target, rel, value = args[1:4] + target = self.parse_target(target) + + rule_id = uuid.uuid4() + rules[rule_id] = { + "target": target, + "operator": rel, + "value": value + } + + with open("rules.pickle", "w") as pfile: + pickle.dump(rules, pfile) + + self.send_message(channel, "Added rule for %s with ID %s." % (args[1], rule_id)) + except Exception, e: + self.send_message(channel, str(e)) + + def parse_target(self, target): + host, rest = target.split("!", 1) + service, rest = rest.split(".", 1) + resource, rest = rest.split(":", 1) + unit, attribute = rest.split(".", 1) + # TODO: unit = unit.split("(", 1)[0].strip() # Allow () for comments + if host == "*": + host = True + if service == "*": + service = True + if attribute == "*": + attribute = True + if resource == "*": + resource = True + if unit == "*": + unit = True + return { + "host": host, + "service": service, + "resource": resource, + "unit": unit, + "attribute": attribute + } + + + def format_time_duration(self, seconds): + # http://stackoverflow.com/a/20222351/1332715 + days, rem = divmod(seconds, 86400) + hours, rem = divmod(rem, 3600) + minutes, seconds = divmod(rem, 60) + if seconds < 1: + seconds = 1 + locals_ = locals() + magnitudes_str = ("{n} {magnitude}".format(n=int(locals_[magnitude]), magnitude=magnitude) for magnitude in ("days", "hours", "minutes", "seconds") if locals_[magnitude]) + return ", ".join(magnitudes_str) + + def process_stats(self, message): + data = message["message"] + data["host"] = message["host"] + + if data["msg_type"] == "up" and data["initial"] == True: + return # We don't need to say what is up, initially... + + # TODO: Duration + if data["msg_type"] == "up": + try: + data["duration"] = self.format_time_duration(time.time() - self.last_down["%(host)s!%(service)s.%(unit)s" % data]) + except KeyError, e: + data["duration"] = "0 seconds" + self.send_all("\x02\x030,3 [ UP ] \x03\x02 Service \x032%(service)s\x03 on host \x037%(host)s\x03 reports that \x036%(unit)s\x03 is now back up. It was down for %(duration)s." % data) + elif data["msg_type"] == "down": + self.last_down["%(host)s!%(service)s.%(unit)s" % data] = time.time() + self.send_all("\x02\x030,4 [ DOWN ] \x03\x02 Service \x032%(service)s\x03 on host \x037%(host)s\x03 reports that \x036%(unit)s\x03 is \x02down!\x02" % data) + elif data["msg_type"] == "value": + for rule_id, rule in rules.iteritems(): + check_vals = { + "host": [data["host"]], + "service": [data["service"]], + "resource": [data["resource_type"]], + "unit": [data["unit"]] + } + + for segment in ("host", "service", "resource", "unit"): + for val in check_vals[segment]: + if rule["target"][segment] is not True and not fnmatch.fnmatch(val, rule["target"][segment]): + continue + + # We haven't broken out in the past bit of code, so we're still matching the pattern... + eligible_keys = [key for key in data["values"].keys() if fnmatch.fnmatch(key, rule["target"]["attribute"])] + + for key in eligible_keys: + value = data["values"][key] + rule_value = float(rule["value"]) + operator = rule["operator"] + + if operator == "=": + alarm = (value == rule_value) + elif operator == ">": + alarm = (value > rule_value) + elif operator == "<": + alarm = (value < rule_value) + elif operator == ">=": + alarm = (value >= rule_value) + elif operator == "<=": + alarm = (value <= rule_value) + elif operator == "!=": + alarm = (value != rule_value) + else: + alarm = False + + print value, operator, rule_value, alarm + + self.trigger_alarm(rule_id, data, alarm, value, key) + + def trigger_alarm(self, rule_id, data, active, offending_value=None, offending_key=None): + key = "%s/%s/%s/%s" % (rule_id, data["host"], data["unit"], offending_key) + + if key not in self.known_alarms: + if active: + self.transmit_alarm(rule_id, data, active, offending_value, offending_key) + print "ALARM ACTIVE, UNKNOWN ENTRY" + self.known_alarms[key] = time.time() + else: + self.known_alarms[key] = False + print "ALARM STOP, UNKNOWN ENTRY" + else: + if self.known_alarms[key] == False and active: + # Alarm activated + self.transmit_alarm(rule_id, data, active, offending_value, offending_key) + print "ALARM ACTIVE, EXISTING ENTRY" + self.known_alarms[key] = time.time() + elif self.known_alarms[key] != False and not active: + # Alarm deactivated + self.transmit_alarm(rule_id, data, active, offending_value, offending_key) + print "ALARM STOP, EXISTING ENTRY" + self.known_alarms[key] = False + + def transmit_alarm(self, rule_id, data, active, offending_value=None, offending_key=None): + # At this point, we're sure that we want to notify... + rule_target = rules[rule_id]["target"].copy() + for k, v in rule_target.iteritems(): + if v is True: + rule_target[k] = "*" + + rule_pattern = "%(host)s!%(service)s.%(resource)s:%(unit)s.%(attribute)s" % rule_target + + info = { + "host": data["host"], + "rule_id": rule_id, + "rule_pattern": rule_pattern + } + + if not active: + key = "%s/%s/%s/%s" % (rule_id, data["host"], data["unit"], offending_key) + try: + info["duration"] = self.format_time_duration(time.time() - self.known_alarms[key]) + except KeyError, e: + info["duration"] = "0 seconds" + info["unit"] = data["unit"] + info["attribute"] = offending_key + + self.send_all("\x02\x030,3 [ SOLVED ] \x03\x02 Host \x037%(host)s\x03 reports that the alarm for rule %(rule_id)s (\x036%(rule_pattern)s\x03) was resolved for \x034%(unit)s\x03.\x034%(attribute)s\x03. It was active for %(duration)s." % info) + else: + info["value"] = offending_value + info["spec"] = "%s %s" % (rules[rule_id]["operator"], rules[rule_id]["value"]) + info["unit"] = data["unit"] + info["attribute"] = offending_key + + self.send_all("\x02\x030,7 [ ALARM ] \x03\x02 Host \x037%(host)s\x03 reports that an alarm was triggered for rule %(rule_id)s (\x036%(rule_pattern)s\x03). The reported value was\x034 %(value)s\x03 for\x034 %(unit)s\x03.\x034%(attribute)s\x03 , triggering the \x032%(spec)s\x03 condition." % info) + + def process_message(self, message): + if message[0].upper() == "PING": + self.send_raw("PONG %s" % message[1]) + else: + try: + self.command_map[message[1].upper()](message) + except KeyError, e: + pass + + +bot = Bot(config["irc"]["hosts"], config["irc"]["port"], config["irc"]["nickname"], config["irc"]["realname"], config["irc"]["channels"], config["irc"]["admins"], fetcher) +bot.run() diff --git a/alert/config.yaml.example b/alert/config.yaml.example new file mode 100644 index 0000000..4555054 --- /dev/null +++ b/alert/config.yaml.example @@ -0,0 +1,12 @@ +irc: + hosts: + - kerpia.cryto.net + - box.cryto.net + - arvel.cryto.net + port: 6667 + nickname: StatusBot + realname: Cryto System Monitoring Service + admins: + - joepie91 + channels: + - "#test" diff --git a/ccollectd/ccollectd b/ccollectd/ccollectd index 9efe91b..1c461ff 100755 --- a/ccollectd/ccollectd +++ b/ccollectd/ccollectd @@ -45,5 +45,8 @@ while True: # Probably a spoofed message... skip to next socket sys.stderr.write("Ignoring message... spoofed?\n") # FIXME: Use logging module... continue - print "%s: %s" % (host, message) + distributor.send(msgpack.packb({ + "host": host, + "message": message + })) diff --git a/ccollectd/config.yaml.example b/ccollectd/config.yaml.example new file mode 100644 index 0000000..e815d98 --- /dev/null +++ b/ccollectd/config.yaml.example @@ -0,0 +1,6 @@ +nodes: + localhost: + ip: 127.0.0.1 + port: 6543 + endpoint: tcp://127.0.0.1:6543 + pubkey: bd784ef4065c9bd31627106dc55e26764605a144c6fc45ce93f33cbd19dd7333 diff --git a/cstatsd/config/cstatsd.yaml.example b/cstatsd/config/cstatsd.yaml.example new file mode 100644 index 0000000..1f05c0f --- /dev/null +++ b/cstatsd/config/cstatsd.yaml.example @@ -0,0 +1,2 @@ +endpoint: tcp://*:6543 +pubkey: a266a0634790a79c6934385892f7c377d35b8f03b9c6ac7d5bfed4a94f93ba65 diff --git a/cstatsd/config/processes.yaml.example b/cstatsd/config/processes.yaml.example new file mode 100644 index 0000000..d2b2b44 --- /dev/null +++ b/cstatsd/config/processes.yaml.example @@ -0,0 +1,15 @@ +interval: 5 + +processes: + radiotray: + name: '*python*' + args: + 1: /usr/bin/radiotray + + guake: + name: '*python*' + args: + 1: /usr/local/bin/guake + + keepassx: + name: keepassx