ELK & Nagios Part 2: How to get your logs from Redis to Elasticsearch

- log-management log logstash websphere

In the last part we collected the application logs and transferred them to Redis, our high-performance message queue.

In this part we want to get the logs/messages out of Redis, filter them, cut them into pieces/fields and finally transfer them to our index database, Elasticsearch.

We will use Logstash to normalize and enrich our data and to pass it on to Elasticsearch.

To get the data out of Redis, we have to define an input plugin. Fortunately, Logstash comes with an input plugin for Redis; we just have to point it to the Redis server/container and the Redis port in use:

input {
  redis {
    host => "dockerhost.webgate.intern"
    port => "32768"
    data_type => "list"
    key => "filebeat"
    tags => [ "filebeat", "redis" ]
  }
}
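
Before wiring up the filters, it can be helpful to verify that events actually arrive from Redis. A simple way is to run Logstash with the input above and a plain stdout output and watch the console; this is just a temporary debugging output, not part of the final configuration:

output {
  # print every event to the console in a readable format
  stdout { codec => rubydebug }
}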

Now we need to use filter plugins to filter the data, cut it into pieces/fields and enrich it. The following is a Logstash filter example for the WebSphere logs; I used Stoeps' filter and adapted it to my needs:

filter {
  if [type] =~ "websphere" {
    # take the server name from the path of the shipped log file
    grok {
      match => ["source", "%{GREEDYDATA}/%{GREEDYDATA:server_name}/SystemOut.log"]
    }
    # split the SystemOut.log line into timestamp, thread ID, shortname, log level and message
    grok {
      match => ["message", "\[%{DATA:wastimestamp} %{WORD:tz}\] %{BASE16NUM:was_threadID} (?<was_shortname>\b[A-Za-z0-9\$]{2,}\b) %{SPACE}%{WORD:was_loglevel}%{SPACE} %{GREEDYDATA:was_msg}"]
    }
    # pull the message code out of the message text
    grok {
      match => ["was_msg", "(?<was_errcode>[A-Z0-9]{9,10})[:,\s\s]%{GREEDYDATA:was_msg}"]
      overwrite => [ "was_msg" ]
      tag_on_failure => [ ]
    }
    # translate the timezone abbreviation into a numeric offset
    translate {
      field => "tz"
      destination => "tz_num"
      dictionary => [
        "CET", "+0100",
        "CEST", "+0200",
        "EDT", "-0400"
      ]
    }
    # map the message code prefix to the WAS application/component
    translate {
      field => "was_errcode"
      destination => "was_application"
      regex => "true"
      exact => "true"
      dictionary => [
        "CLFRW", "Search",
        "CLFRA", "Activities",
        "CLFRS", "Blogs",
        "CLFRL", "Bookmarks",
        "CLFRK", "Common",
        "CLFRM", "Communities",
        "EJPVJ", "Files",
        "CLFRV", "Forums",
        "CLFRQ", "Homepage",
        "CLFRP", "Installer",
        "CLFRO", "Configuration",
        "CLFRR", "Notifications",
        "CLFNF", "Portlet",
        "CLFRT", "FedSearch",
        "CLFWX", "News",
        "CLFWY", "Event",
        "CLFWZ", "Widget",
        "CLFRN", "Profiles",
        "CLFWY", "User",
        "EJPIC", "Portal",
        "EJPVJ", "Wikis",
        "ADMS", "Websphere",
        "SECJ", "Security"
      ]
    }
    # build a full timestamp with numeric offset and use it as the event date
    mutate {
      replace => ['timestamp', '%{wastimestamp} %{tz_num}']
    }
    date {
      match => ["timestamp", "MM/dd/YY HH:mm:ss:SSS Z", "M/d/YY HH:mm:ss:SSS Z"]
      tag_on_failure => [ ]
    }
    # drop the helper fields
    mutate {
      remove_field => [ 'tz', 'tz_num', 'wastimestamp' ]
    }
  }
}

The main difference is that I added a dictionary to translate error message prefixes to WAS applications/components. As you can see, we will get the following fields out of the log messages: server_name, was_threadID, was_shortname, was_loglevel, was_msg, was_errcode and was_application, plus the event timestamp parsed from the WebSphere timestamp.
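
To illustrate, a made-up SystemOut.log line like

[3/21/16 10:15:35:316 CET] 0000007a MyService I CLFRW0042I: Example message text

would be cut into wastimestamp = "3/21/16 10:15:35:316", tz = "CET", was_threadID = "0000007a", was_shortname = "MyService", was_loglevel = "I" and was_errcode = "CLFRW0042I", with the remaining message text left in was_msg; the translate dictionary then maps the CLFRW prefix to was_application = "Search".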

These fields can later be used to easily visualize the log data with Kibana. Furthermore, I created Logstash filters to catch my IBM DB2 and TDI data.
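
Those filters are specific to the respective log formats, but they hook into the pipeline the same way as the WebSphere branch above: one additional conditional per type. A minimal sketch, assuming Filebeat ships the DB2 logs with type db2 (the grok pattern is only a placeholder, not my actual filter):

filter {
  if [type] =~ "db2" {
    grok {
      # placeholder: replace with a pattern that matches the db2diag.log format
      match => ["message", "%{GREEDYDATA:db2_msg}"]
    }
  }
}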

The last part is to use the Elasticsearch output plugin from Logstash to transfer the data to the index database. This is again pretty straightforward; I just have to point it to the server/container IP and port:

output {
  elasticsearch {
    hosts => ["dockerhost.webgate.intern:9200"]
  }
}
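
With this minimal configuration the plugin writes the events into the default daily logstash-YYYY.MM.dd indices. If you prefer one index per log type, you can set the index option explicitly; a small sketch (the naming scheme is just an example):

output {
  elasticsearch {
    hosts => ["dockerhost.webgate.intern:9200"]
    # one daily index per log type, e.g. websphere-2016.03.21
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}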

In the next part I will show you how to visualize this data and how to use it in combination with Elastalert. Stay tuned.