import socket
import json
from random import choice
from six.moves.xmlrpc_client import ServerProxy as XMLRPCServer
import redis
from tornado.iostream import StreamClosedError
from tornado.tcpclient import TCPClient
from tornado.httpclient import AsyncHTTPClient, HTTPError
from tornado import gen, escape
from . import logger
try:
import msgpack
except ImportError:
msgpack = None
[docs]class Checker(object):
"""Base class for all checkers. Subclass this class to define your
own checkers.
"""
[docs] def check(self):
"""Override this method to check the status of an arbitrary
system. This method should return a boolean, or at least
something that behaves sensibly as one.
It is highly recommended, though not required, to implement
:meth:`check` as a Tornado coroutine in order to minimize
blocking.
"""
raise NotImplementedError("The check method must be implemented.")
[docs]class DummyChecker(Checker):
"""A fake checker used for testing. This will either randomly
return True or False, or always return one or the other if the
keyword argument ``return_value`` is set.
"""
def __init__(self, return_value=None):
assert isinstance(return_value, bool) or return_value is None
self.return_value = return_value
def check(self):
if self.return_value is None:
return choice([True, False])
else:
return self.return_value
[docs]class PortChecker(Checker):
"""Check if a TCP port is open.
Note that this is somewhat of a hack and will only work properly
if firewall rules allow it.
"""
def __init__(self, host, port):
"""Configure a port checker.
:param str host:
:param int port:
"""
assert isinstance(host, str)
assert isinstance(port, int)
self.host = host
self.port = port
@gen.coroutine
def check(self):
status = False
try:
conn = yield TCPClient().connect(self.host, self.port)
conn.close()
status = True
except socket.gaierror:
pass
except StreamClosedError:
pass
except Exception as e:
logger.error('Exception: ' + str(e))
raise gen.Return(status)
[docs]class HttpChecker(Checker):
"""Check if an HTTP response (for a GET request) returns an
expected HTTP status code.
:param str url: URL to check
:param float timeout: timeout in seconds
:param int code: Expected HTTP status code (default: 200)
"""
def __init__(self, url, timeout=1.5, code=200):
assert isinstance(url, str)
assert isinstance(timeout, (int, float))
assert isinstance(code, int)
self.url = url
self.timeout = timeout
self.code = code
@gen.coroutine
def check(self):
try:
res = yield AsyncHTTPClient().fetch(self.url,
raise_error=False,
request_timeout=self.timeout)
ok = True if res.code == self.code else False
except socket.gaierror:
ok = False
except HTTPError:
ok = False
raise gen.Return(ok)
[docs]class SupervisorChecker(Checker):
"""Check that a process running via a supervisord instance is
running.
See the supervisor_ documentation on how to configure
``supervisord`` to accept XMLRPC requests over HTTP.
.. _supervisor: http://supervisord.org/
:param str host: hostname of the XMLRPC server
:param str name: the process name to check
"""
def __init__(self, host, name):
assert isinstance(host, str)
self.server = XMLRPCServer(host)
assert isinstance(name, str)
self.name = name
@gen.coroutine
def check(self):
# TODO: make async query
try:
reply = self.server.supervisor.getProcessInfo(self.name)
if 'statename' in reply:
result = True if reply['statename'] == 'RUNNING' else False
except Exception as error:
logger.error(str(error))
result = False
raise gen.Return(result)
[docs]class JsonChecker(Checker):
"""Check that one or more keys has a particular value in an HTTP
JSON response.
If multiple keys/values are specified, the result of the check
will only be True if *all* key/value pairs match the input
dictionary.
:param str url: URL to get JSON response from
:param dict dictionary: compare keys/values with the JSON response
"""
def __init__(self, url, dictionary):
assert isinstance(url, str)
assert isinstance(dictionary, dict)
self.url = url
self.dictionary = dictionary
@gen.coroutine
def check(self):
result = True
try:
res = yield AsyncHTTPClient().fetch(self.url)
except socket.gaierror:
logger.warn('Unable to connect to ' + self.url)
result = False
try:
obj = json.loads(escape.to_basestring(res.body))
except ValueError:
logger.error('No JSON found at URL ' + self.url)
result = False
for key in self.dictionary:
try:
if obj[key] != self.dictionary[key]:
result = False
except KeyError:
result = False
raise gen.Return(result)
[docs]class RedisChecker(Checker):
"""Check that one or more keys have certain values in a Redis
database.
:param str host: Redis hostname
:param dict dictionary: dictionary to match Redis keys/values
:param **redis_kwargs: kwargs to pass to the Redis constructor
"""
def __init__(self, host, dictionary, **redis_kwargs):
self.db = redis.StrictRedis(host=host, **redis_kwargs)
assert isinstance(dictionary, dict)
self.dictionary = dictionary
def check(self):
result = True
for key in self.dictionary:
try:
value = self.db.get(key).decode()
if self.dictionary[key] != value:
result = False
except KeyError:
result = False
return result
[docs]class SerializedRedisChecker(RedisChecker):
"""Check multiple key/value pairs that are serialized into a single
Redis key/value pair with either JSON or msgpack.
"""
def __init__(self, host, key, packing, dictionary, **redis_kwargs):
super(SerializedRedisChecker, self).__init__(
host, dictionary, **redis_kwargs)
self.key = key
packing = packing.lower()
assert packing in ['json', 'msgpack']
if packing == 'msgpack' and msgpack is None:
raise RuntimeError(
'Please install msgpack for unpacking msgpack-packed things.')
self.packing = packing
def check(self):
result = False
packed = self.db.get(self.key)
if self.packing == 'json':
unpacked = json.loads(packed.decode())
else:
unpacked = msgpack.loads(packed, encoding='utf-8')
for key in self.dictionary:
try:
if self.dictionary[key] == unpacked[key]:
result = True
except KeyError:
logger.error("Invalid key: " + key)
return result
# Aliases for convenience
HTTPChecker = HttpChecker
JSONChecker = JsonChecker