Set up publish/subscribe mechanism, add example configurations, write IRC bot and alarm management mechanism

develop
Sven Slootweg 10 years ago
parent 56ae6b5305
commit 97290dbb1c

1
.gitignore vendored

@ -1,5 +1,6 @@
cstatsd/config/*.yaml cstatsd/config/*.yaml
ccollectd/config.yaml ccollectd/config.yaml
alert/config.yaml
*.pyc *.pyc
ccollectd/pubkey.dat ccollectd/pubkey.dat
ccollectd/privkey.dat ccollectd/privkey.dat

@ -0,0 +1,298 @@
#!/usr/bin/env python2
import socket, yaml, random, zmq, msgpack, time, uuid, fnmatch
import cPickle as pickle
ctx = zmq.Context()
with open("config.yaml", "r") as cfile:
config = yaml.safe_load(cfile)
try:
with open("rules.pickle", "r") as pfile:
rules = pickle.load(pfile)
except IOError, e:
rules = {}
fetcher = ctx.socket(zmq.SUB)
fetcher.setsockopt(zmq.SUBSCRIBE, "")
fetcher.connect("ipc:///tmp/ccollectd-stats")
class Bot(object):
def __init__(self, hosts, port, nickname, realname, channels, admins, subsock):
self.hosts = hosts
self.port = port
self.nickname = nickname
self.realname = realname
self.channels = channels
self.admins = admins
self.subsock = subsock
self.connected = False
self.last_down = {}
self.known_alarms = {}
self.command_map = {
"422": self.join_all_channels,
"376": self.join_all_channels,
"PRIVMSG": self.receive_message
}
def split_irc(self, message):
if message[0] == ":":
prefix = ":"
message = message[1:]
else:
prefix = ""
if ":" in message:
rest, last = message.split(":", 1)
parts = rest.strip().split() + [last]
else:
parts = message.split()
parts[0] = prefix + parts[0]
return parts
def run(self):
while True: # Connect loop
host = random.choice(self.hosts)
self.sock = socket.socket()
try:
self.sock.connect((host, self.port))
except socket.error, e:
continue # Reconnect
self.send_raw("NICK %s" % self.nickname)
self.sock.send("USER %s 0 0 :%s\r\n" % (self.nickname, self.realname))
buff = ""
while True: # Read loop
r, w, x = zmq.select([self.sock, self.subsock], [], [])
for s in r:
if s == self.sock.fileno():
try:
recvdata = self.sock.recv(1024)
except socket.error, e:
break # Something went wrong, reconnect...
if len(recvdata) == 0:
break # We have disconnected...
buff += recvdata
messages = buff.split("\n")
buff = messages.pop()
for message in messages:
self.process_message(self.split_irc(message.strip("\r")))
elif self.subsock.getsockopt(zmq.EVENTS) & zmq.POLLIN != 0:
# Process incoming data from the subscribe socket...
message = msgpack.unpackb(s.recv())
self.process_stats(message)
def send_raw(self, message):
self.sock.send("%s\r\n" % message)
def send_message(self, recipient, message):
if self.connected == True:
self.send_raw("PRIVMSG %s :%s" % (recipient, message))
def send_all(self, message):
for channel in self.channels:
self.send_message(channel, message)
def join(self, channel):
self.send_raw("JOIN %s" % channel)
def join_all_channels(self, message):
self.connected = True
for channel in self.channels:
self.join(channel)
def receive_message(self, message):
args = message[3].split()
sender = message[0][1:].split("!", 1)[0]
channel = message[2]
try:
if sender in self.admins:
if args[0] == "!addrule":
target, rel, value = args[1:4]
target = self.parse_target(target)
rule_id = uuid.uuid4()
rules[rule_id] = {
"target": target,
"operator": rel,
"value": value
}
with open("rules.pickle", "w") as pfile:
pickle.dump(rules, pfile)
self.send_message(channel, "Added rule for %s with ID %s." % (args[1], rule_id))
except Exception, e:
self.send_message(channel, str(e))
def parse_target(self, target):
host, rest = target.split("!", 1)
service, rest = rest.split(".", 1)
resource, rest = rest.split(":", 1)
unit, attribute = rest.split(".", 1)
# TODO: unit = unit.split("(", 1)[0].strip() # Allow () for comments
if host == "*":
host = True
if service == "*":
service = True
if attribute == "*":
attribute = True
if resource == "*":
resource = True
if unit == "*":
unit = True
return {
"host": host,
"service": service,
"resource": resource,
"unit": unit,
"attribute": attribute
}
def format_time_duration(self, seconds):
# http://stackoverflow.com/a/20222351/1332715
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, seconds = divmod(rem, 60)
if seconds < 1:
seconds = 1
locals_ = locals()
magnitudes_str = ("{n} {magnitude}".format(n=int(locals_[magnitude]), magnitude=magnitude) for magnitude in ("days", "hours", "minutes", "seconds") if locals_[magnitude])
return ", ".join(magnitudes_str)
def process_stats(self, message):
data = message["message"]
data["host"] = message["host"]
if data["msg_type"] == "up" and data["initial"] == True:
return # We don't need to say what is up, initially...
# TODO: Duration
if data["msg_type"] == "up":
try:
data["duration"] = self.format_time_duration(time.time() - self.last_down["%(host)s!%(service)s.%(unit)s" % data])
except KeyError, e:
data["duration"] = "0 seconds"
self.send_all("\x02\x030,3 [ UP ] \x03\x02 Service \x032%(service)s\x03 on host \x037%(host)s\x03 reports that \x036%(unit)s\x03 is now back up. It was down for %(duration)s." % data)
elif data["msg_type"] == "down":
self.last_down["%(host)s!%(service)s.%(unit)s" % data] = time.time()
self.send_all("\x02\x030,4 [ DOWN ] \x03\x02 Service \x032%(service)s\x03 on host \x037%(host)s\x03 reports that \x036%(unit)s\x03 is \x02down!\x02" % data)
elif data["msg_type"] == "value":
for rule_id, rule in rules.iteritems():
check_vals = {
"host": [data["host"]],
"service": [data["service"]],
"resource": [data["resource_type"]],
"unit": [data["unit"]]
}
for segment in ("host", "service", "resource", "unit"):
for val in check_vals[segment]:
if rule["target"][segment] is not True and not fnmatch.fnmatch(val, rule["target"][segment]):
continue
# We haven't broken out in the past bit of code, so we're still matching the pattern...
eligible_keys = [key for key in data["values"].keys() if fnmatch.fnmatch(key, rule["target"]["attribute"])]
for key in eligible_keys:
value = data["values"][key]
rule_value = float(rule["value"])
operator = rule["operator"]
if operator == "=":
alarm = (value == rule_value)
elif operator == ">":
alarm = (value > rule_value)
elif operator == "<":
alarm = (value < rule_value)
elif operator == ">=":
alarm = (value >= rule_value)
elif operator == "<=":
alarm = (value <= rule_value)
elif operator == "!=":
alarm = (value != rule_value)
else:
alarm = False
print value, operator, rule_value, alarm
self.trigger_alarm(rule_id, data, alarm, value, key)
def trigger_alarm(self, rule_id, data, active, offending_value=None, offending_key=None):
key = "%s/%s/%s/%s" % (rule_id, data["host"], data["unit"], offending_key)
if key not in self.known_alarms:
if active:
self.transmit_alarm(rule_id, data, active, offending_value, offending_key)
print "ALARM ACTIVE, UNKNOWN ENTRY"
self.known_alarms[key] = time.time()
else:
self.known_alarms[key] = False
print "ALARM STOP, UNKNOWN ENTRY"
else:
if self.known_alarms[key] == False and active:
# Alarm activated
self.transmit_alarm(rule_id, data, active, offending_value, offending_key)
print "ALARM ACTIVE, EXISTING ENTRY"
self.known_alarms[key] = time.time()
elif self.known_alarms[key] != False and not active:
# Alarm deactivated
self.transmit_alarm(rule_id, data, active, offending_value, offending_key)
print "ALARM STOP, EXISTING ENTRY"
self.known_alarms[key] = False
def transmit_alarm(self, rule_id, data, active, offending_value=None, offending_key=None):
# At this point, we're sure that we want to notify...
rule_target = rules[rule_id]["target"].copy()
for k, v in rule_target.iteritems():
if v is True:
rule_target[k] = "*"
rule_pattern = "%(host)s!%(service)s.%(resource)s:%(unit)s.%(attribute)s" % rule_target
info = {
"host": data["host"],
"rule_id": rule_id,
"rule_pattern": rule_pattern
}
if not active:
key = "%s/%s/%s/%s" % (rule_id, data["host"], data["unit"], offending_key)
try:
info["duration"] = self.format_time_duration(time.time() - self.known_alarms[key])
except KeyError, e:
info["duration"] = "0 seconds"
info["unit"] = data["unit"]
info["attribute"] = offending_key
self.send_all("\x02\x030,3 [ SOLVED ] \x03\x02 Host \x037%(host)s\x03 reports that the alarm for rule %(rule_id)s (\x036%(rule_pattern)s\x03) was resolved for \x034%(unit)s\x03.\x034%(attribute)s\x03. It was active for %(duration)s." % info)
else:
info["value"] = offending_value
info["spec"] = "%s %s" % (rules[rule_id]["operator"], rules[rule_id]["value"])
info["unit"] = data["unit"]
info["attribute"] = offending_key
self.send_all("\x02\x030,7 [ ALARM ] \x03\x02 Host \x037%(host)s\x03 reports that an alarm was triggered for rule %(rule_id)s (\x036%(rule_pattern)s\x03). The reported value was\x034 %(value)s\x03 for\x034 %(unit)s\x03.\x034%(attribute)s\x03 , triggering the \x032%(spec)s\x03 condition." % info)
def process_message(self, message):
if message[0].upper() == "PING":
self.send_raw("PONG %s" % message[1])
else:
try:
self.command_map[message[1].upper()](message)
except KeyError, e:
pass
bot = Bot(config["irc"]["hosts"], config["irc"]["port"], config["irc"]["nickname"], config["irc"]["realname"], config["irc"]["channels"], config["irc"]["admins"], fetcher)
bot.run()

@ -0,0 +1,12 @@
irc:
hosts:
- kerpia.cryto.net
- box.cryto.net
- arvel.cryto.net
port: 6667
nickname: StatusBot
realname: Cryto System Monitoring Service
admins:
- joepie91
channels:
- "#test"

@ -45,5 +45,8 @@ while True:
# Probably a spoofed message... skip to next socket # Probably a spoofed message... skip to next socket
sys.stderr.write("Ignoring message... spoofed?\n") # FIXME: Use logging module... sys.stderr.write("Ignoring message... spoofed?\n") # FIXME: Use logging module...
continue continue
print "%s: %s" % (host, message) distributor.send(msgpack.packb({
"host": host,
"message": message
}))

@ -0,0 +1,6 @@
nodes:
localhost:
ip: 127.0.0.1
port: 6543
endpoint: tcp://127.0.0.1:6543
pubkey: bd784ef4065c9bd31627106dc55e26764605a144c6fc45ce93f33cbd19dd7333

@ -0,0 +1,2 @@
endpoint: tcp://*:6543
pubkey: a266a0634790a79c6934385892f7c377d35b8f03b9c6ac7d5bfed4a94f93ba65

@ -0,0 +1,15 @@
interval: 5
processes:
radiotray:
name: '*python*'
args:
1: /usr/bin/radiotray
guake:
name: '*python*'
args:
1: /usr/local/bin/guake
keepassx:
name: keepassx
Loading…
Cancel
Save