ELK is fun

As I do a lot of testing and CI in my day job, I took ELK for a test drive. (ELK stands for Elasticsearch, Logstash and Kibana.)

I’ve played around with Logstash before, but it’s the first time I’m trying Kibana.

I looked for a quick way to set it up, and there were a gazillion hits on Docker Hub (I forgot which one I actually installed :)). EDIT: it was cyberabis/docker-elkauto.

That went quickly, with one small change: I used port 81.
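For reference, spinning it up was roughly a one-liner like this (a sketch; the exact ports the cyberabis/docker-elkauto image exposes are from memory, so check the image docs):

    docker run -d -p 81:80 -p 3333:3333 -p 3334:3334 cyberabis/docker-elkauto

Here port 81 on the host maps to Kibana’s web UI, and 3333/3334 are the Logstash tcp inputs used later in this post.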

Next I wanted to feed it some data. Luckily, someone had added some key=value log files to our CI just a few days ago (thanks Mike), and I had real data to play with in a matter of minutes.

That was not enough for me; I wanted all the information from my tests. It seemed like the junit-xml output had everything I wanted. I hated the idea of running a separate step after the tests finish; I wanted it to happen automatically on any test I run with py.test.
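For context, the junit-xml report py.test writes looks roughly like this (a trimmed, hand-made example; all names and values are made up):

    <testsuite errors="0" failures="1" skips="1" tests="3" time="1.23">
        <testcase classname="tests.test_login" name="test_ok" time="0.12"/>
        <testcase classname="tests.test_login" name="test_bad_password" time="0.34">
            <failure message="assert False">assert False</failure>
        </testcase>
        <testcase classname="tests.test_login" name="test_wip" time="0.01">
            <skipped message="not implemented yet"/>
        </testcase>
    </testsuite>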

It took me some time to figure out which py.test hook fits best, and here’s how it can be done:

    # content of conftest.py
    import json
    import logging
    import os
    import socket
    from itertools import groupby
    from xml.etree import ElementTree

    logger = logging.getLogger(__name__)

    # Logstash endpoint; 3334 matches the json_lines tcp input in the config below
    TCP_IP = 'localhost'
    TCP_PORT = 3334

    def collect_ec_data():
        """
        if we run in EC, collect all data we need
        if not in EC, collect only commit and commit_sha
        :return: dict
        """

        # lets assume you don't really want to know what we save in our setup :)
        return data_collected

    def send_to_elk(item):
        """
        send `item` to Logstash port
        :param item: dict
        :return: None
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((TCP_IP, TCP_PORT))
        logger.info("Sending to %s:%d", TCP_IP, TCP_PORT)
        s.sendall(json.dumps(item) + '\n')
        s.close()

    def find_test_data(junit_xml_filename):
        """
        flatten all the data from junit based xml file into a list of dicts

        :param junit_xml_filename: the output from py.test --junit-xml
        :return:
        """

        tests_list = []

        with open(junit_xml_filename, 'r') as f:
            xml_test = f.read()

        et = ElementTree.fromstring(xml_test)
        for test in et.iter('testcase'):

            item = dict(**test.attrib)
            outcome = None
            failure = test.find("failure")
            error = test.find("error")
            skipped = test.find("skipped")

            if failure is not None:
                item['failure_message'] = failure.text
                outcome = 'failure'
            elif skipped is not None:
                item['skip_message'] = skipped.attrib['message']
                outcome = 'skipped'
            elif error is not None:
                item['error_message'] = error.text
                outcome = 'error'
            else:
                outcome = 'success'

            item['outcome'] = outcome
            # store the run time as a number so repeated runs can be summed below;
            # `time` collides with a built-in field in Logstash, so rename it
            item['run_time'] = float(item['time'])
            del item['time']

            tests_list.append(item)

        # group repeated runs of the same test by (name, classname) and merge
        # their results; groupby only groups adjacent items, so sort first
        group_key = lambda x: (x['name'], x['classname'])
        tests_list.sort(key=group_key)

        cumulative_keys = ['outcome', 'run_time']
        tests_list_grouped = []
        for key, group in groupby(tests_list, group_key):
            item = dict()
            for thing in group:
                # merge the cumulative values
                if 'outcome' in item:
                    item['outcome'] += " & " + thing['outcome']

                if 'run_time' in item:
                    item['run_time'] += thing['run_time']

                # overwrite the rest
                for attr in thing:
                    if attr not in cumulative_keys or attr not in item:
                        item[attr] = thing[attr]
            tests_list_grouped.append(item)

        return tests_list_grouped

    def pytest_unconfigure(config):

        junit_file = config.option.xmlpath
        ec_data = collect_ec_data()
        log_dir = os.environ.get('LOG_DEST_DIR', './')

        if junit_file:  # if found xml file, send all of it
            tests_list = find_test_data(junit_file)
            for i, test in enumerate(tests_list):
                item = dict(**ec_data)
                item.update(test)

                # save data to a file
                with open(os.path.join(log_dir, 'job_info%d.json' % i), 'w+') as f:
                    f.write(json.dumps(item, sort_keys=True,
                                       indent=4, separators=(',', ': ')))

                send_to_elk(item)

Just for reference, to show how simple the Logstash configuration is, almost untouched:

    input {
            tcp { port => 3333 type => "text event" }
            tcp { port => 3334 type => "json event" codec => json_lines {} }
    }
    filter {
            if [createTime] {
                    date {
                            match => [ "createTime", "ISO8601" ]
                            target => "@timestamp"
                    }
            }
    }
    output { elasticsearch { host => localhost } }
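If you want to sanity-check the pipeline without running a whole test suite, you can push a hand-made event straight at the json_lines port (a minimal sketch; the host, port and event fields are assumptions matching the config above):

    import json
    import socket

    event = {'name': 'smoke_test', 'outcome': 'success', 'run_time': 0.01}

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(('localhost', 3334))
    # the json_lines codec expects newline-delimited JSON documents
    s.sendall(json.dumps(event) + '\n')
    s.close()

If it worked, the event shows up in Kibana a few seconds later.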

ELK is a great combo, me like a lot…

Published: October 30 2014
