Category: ELK

ELK Monitoring

We have a number of logstash servers gathering data from various filebeat sources. We’ve recently experienced a problem where the pipeline stops getting data for some of those sources. Not all — and restarting the non-functional filebeat source sends data for ten minutes or so. We were able to rectify the immediate problem by restarting our logstash services (IT troubleshooting step #1 — we restarted all of the filebeats and, when that didn’t help, moved on to restarting the logstashes)

But we need to have a way to ensure this isn’t happening — losing days of log data from some sources is really bad. So I put together a Python script to verify there’s something coming in from each of the filebeat sources.

pip install elasticsearch==7.13.4

#!/usr/bin/env python3
#-*- coding: utf-8 -*-
# Disable warnings that not verifying SSL trust isn't a good idea
import requests
requests.packages.urllib3.disable_warnings()

from elasticsearch import Elasticsearch
import time

# Modules for email alerting
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


# Config variables
strSenderAddress = "devnull@example.com"
strRecipientAddress = "me@example.com"
strSMTPHostname = "mail.example.com"
iSMTPPort = 25

listSplunkRelayHosts = ['host293', 'host590', 'host591', 'host022', 'host014', 'host135']
iAgeThreashold = 3600 # Alert if last document is more than an hour old (3600 seconds)

strAlert = None

for strRelayHost in listSplunkRelayHosts:
	iCurrentUnixTimestamp = time.time()
	elastic_client = Elasticsearch("https://elasticsearchhost.example.com:9200", http_auth=('rouser','r0pAs5w0rD'), verify_certs=False)

	query_body = {
		"sort": {
			"@timestamp": {
				"order": "desc"
			}
		},
		"query": {
			"bool": {
				"must": {
					"term": {
						"host.hostname": strRelayHost
					}
				},
				"must_not": {
					"term": {
						"source": "/var/log/messages"
					}
				}
			}
		}
	}

	result = elastic_client.search(index="network_syslog*", body=query_body,size=1)
	all_hits = result['hits']['hits']

	iDocumentAge = None
	for num, doc in enumerate(all_hits):
		iDocumentAge =  (  (iCurrentUnixTimestamp*1000) - doc.get('sort')[0]) / 1000.0

	if iDocumentAge is not None:
		if iDocumentAge > iAgeThreashold:
			if strAlert is None:
				strAlert = f"<tr><td>{strRelayHost}</td><td>{iDocumentAge}</td></tr>"
			else:
				strAlert = f"{strAlert}\n<tr><td>{strRelayHost}</td><td>{iDocumentAge}</td></tr>\n"
			print(f"PROBLEM - For {strRelayHost}, document age is {iDocumentAge} second(s)")
		else:
			print(f"GOOD - For {strRelayHost}, document age is {iDocumentAge} second(s)")
	else:
		print(f"PROBLEM - For {strRelayHost}, no recent record found")


if strAlert is not None:
	msg = MIMEMultipart('alternative')
	msg['Subject'] = "ELK Filebeat Alert"
	msg['From'] = strSenderAddress
	msg['To'] = strRecipientAddress

	strHTMLMessage = f"<html><body><table><tr><th>Server</th><th>Document Age</th></tr>{strAlert}</table></body></html>"
	strTextMessage = strAlert

	part1 = MIMEText(strTextMessage, 'plain')
	part2 = MIMEText(strHTMLMessage, 'html')

	msg.attach(part1)
	msg.attach(part2)

	s = smtplib.SMTP(strSMTPHostname)
	s.sendmail(strSenderAddress, strRecipientAddress, msg.as_string())
	s.quit()

Debugging Filebeat

# Run filebeat from the command line and add debugging flags to increase verbosity of output
# -e directs output to STDERR instead of syslog
# -c indicates the config file to use
# -d indicates which debugging items you want -- * for all
/opt/filebeat/filebeat -e -c /opt/filebeat/filebeat.yml -d "*"

Python Logging to Logstash Server

Since we are having a problem with some of our filebeat servers actually delivering data over to logstash, I put together a really quick python script that connects to the logstash server and sends a log record. I can then run tcpdump on the logstash server and hopefully see what is going wrong.

import logging
import logstash
import sys

strHost = 'logstash.example.com'
iPort = 5048

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.TCPLogstashHandler(host=strHost,port=iPort))

test_logger.info('May 22 23:34:13 ABCDOHEFG66SC03 sipd[3863cc60] CRITICAL One or more Dns Servers are currently unreachable!')
test_logger.warning('May 22 23:34:13 ABCDOHEFG66SC03 sipd[3863cc60] CRITICAL One or more Dns Servers are currently unreachable!')
test_logger.error('May 22 23:34:13 ABCDOHEFG66SC03 sipd[3863cc60] CRITICAL One or more Dns Servers are currently unreachable!')

ElasticSearch Analyzer

Analyzer Components

Character filters are the first component of an analyzer. They can remove unwanted characters – this could be html tags (“char_filter”: [“html_strip”]) or some custom replacement – or change character(s) into other character(s). Output from the character filter is passed to the tokenizer.

The tokenizer breaks the string out into individual components (tokens). A commonly used tokenizer is the whitespace tokenizer which uses whitespace characters as the token delimiter. For CSV data, you could build a custom pattern tokenizer with “,” as the delimiter.

Then token filters removes anything deemed unnecessary. The standard token filter applies a lower-case function too – so NOW, Now, and now all produce the same token.

Testing an analyzer

You can one-off analyze a string using any of the

curl -u “admin:admin” -k -X GET https://localhost:9200/_analyze –header ‘Content-Type: application/json’ –data ‘

“analyzer”:”standard”,

“text”: “THE QUICK BROWN FOX JUMPED OVER THE LAZY DOG’\”S BACK 1234567890″

}’

Specifying different analyzers produces different tokens

It’s even possible to define a custom analyzer in an index – you’ll see this in the index configuration. Adding character mappings to a custom filter – the example used in Elastic’s documentation maps Arabic numbers to their European counterparts – might be a useful tool in our implementation. One of the examples is turning ASCII emoticons into emotional descriptors (_happy_, _sad_, _crying_, _raspberry_, etc) that would be useful in analyzing customer communications. In log processing, we might want to map phrases into commonly used abbreviations (not a real-world example, but if programmatic input spelled out “self-contained breathing apparatus”, I expect most people would still search for SCBA if they wanted to see how frequently SCBA tanks were used for call-outs). It will be interesting to see how frequently programmatic input doesn’t line up with user expectations to see if character mappings will be beneficial.

In addition to testing individual analyzers, you can test the analyzer associated to an index – instead of using the /_analyze endpoint, use the /indexname/_analyze endpoint.

 

Resetting Lost/Forgotten ElasticSearch Admin Passwords

There are a few ways to reset the password on an individual account … but they require you to have a known password. But what about when you don’t have any good passwords? (You might be able to read your kibana.yml and get a known good password, so that would be a good place to check). Provided you have OS access, just create another superuser account using the elasticsearch-users binary:

/usr/share/elasticsearch/bin/elasticsearch-users useradd ljradmin -p S0m3pA5sw0Rd -r superuser

You can then use curl to the ElasticSearch API to reset the elastic account password

curl -s --user ljradmin:S0m3pA5sw0Rd -XPUT "http://127.0.0.1:9200/_xpack/security/user/elastic/_password" -H 'Content-Type: application/json' -d'
{
"password" : "N3wPa5sw0Rd4ElasticU53r"
}
'

 

ElasticSearch ILM – Data Lifecycle

The following defines a simple data lifecycle policy we use for event log data.

Immediately, the data is in the “hot” phase.

After one day, it is moved to the “warm” phase where the number of segments is compressed to 1 (lots-o-segments are good for writing, but since we’re dealing with timescale stats & log data [i.e. something that’s not being written to the next day], there is no need to optimize write performance. The index will be read only, thus can be optimized for read performance). After seven days, the index is frozen (mostly moved out of memory) as in this use case, data generally isn’t used after a week. Thus, there is no need to fill up the server’s memory to speed up access to unused data elements. Since freeze is deprecated in a future version (due to improvements in memory utilization that should obsolete freezing indices), we’ll need to watch our memory usage after upgrading to ES8.

Finally, after fourteen days, the data is deleted.

To use the policy, set it as the template on an index:

Upon creating a new index (ljrlogs-5), the ILM policy has been applied:

Upgrading ElasticSearch – From 7.6 to 7.17

Before upgrading to 8, you must be running at least version 7.17 … so I am first upgrading my ES7 to a new enough version that upgrading to ES8 is possible.

Environment

Not master eligible nodes:
a6b30865c82c.example.com
a6b30865c83c.example.com

Master eligible nodes:
a6b30865c81c.example.com

 

  1. Disable shard allocation

PUT _cluster/settings{  "persistent": {    "cluster.routing.allocation.enable": "primaries"  }}

 

  1. Stop non-essential indexing and flush

POST _flush/synced

  1. Upgrade the non-master eligible nodes first then the master-eligible nodes. One at a time, SSH to the host and upgrade ES
    a. Stop ES

systemctl stop elasticsearch
b. Install the new RPM:
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.17.3-x86_64.rpm.sha512


rpm -U elasticsearch-7.17.3-x86_64.rpm

c. Update configuration for new version
vi /usr/lib/tmpfiles.d/elasticsearch.conf


vi /etc/elasticsearch/elasticsearch.yml # Add the action.auto_create_index as required -- * for all, or you can restrict auto-creation to certain indices

d. Update unit file and start services
systemctl daemon-reload
systemctl enable elasticsearch
systemctl start elasticsearch.service

  1. On the Kibana server, upgrade Kibana to a matching version:systemctl stop kibana
    wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.3-x86_64.rpm
    rpm -U kibana-7.17.3-x86_64.rpm
    sytemctl daemon-reload
    systemctl enable kibana
    systemctl start kibana
  2. Access the Kibana console and ensure the upgraded node is back online

  1. Re-enable shard allocation

PUT _cluster/settings{"persistent": {"cluster.routing.allocation.enable": null }}

Kibana – Defining Index Patterns

This isn’t something I will notice again now that I’ve got my heap space sorted … but when you are using Kibana to add index patterns, there’s nothing that waits until you’ve stopped typing for x seconds or implements a minimum characters before searching. So, after each character you type into the dialog, a search is performed against the ES server — here I wanted to create a pattern for logstash-* — which sent nine different search requests to my ES. And, if the server is operating reasonably, all of these searches would deliver a result set that Kibana essentially ignores as I’m tying more characters. But, in my case, the ES server fell over before I could even type logstash!

ElasticSearch Java Heap Size — Order of Precedence

I was asked to look at a malfunctioning ElasticSearch server this week. It’s a lab sandbox, so not a huge deal … but still something they wanted online and functioning for some proof-of-concept testing. And, as a bonus, they’re willing to let us use their lab servers for our own sandboxing (IPv6 implementation, ELK upgrades). There were a handful of problems (it looks like the whole thing used to be a multi-node cluster but the second cluster node has vanished without any trace, the entire platform was re-IP’d, vm.max_map_count wasn’t set on the Docker server, and the logstash folder had a backup of a pipeline config in /path/to/logstash/pipeline/ … which still loads, so causes a continual stream of port-in-use exceptions). But the biggest problem was that any attempt at actually using the ElasticSearch server resulted in it falling over. Java crashed with an out of memory error because the heap space was exhausted. Now I’ve had really small sandboxes before — my first ES sandbox only had a gig of memory and was quite prone to this crash. But the lab server they’ve set up has 64GB of memory. So allocating a few gigs seemed like a quick solution.

The jvm.options file and jvm.options.d folder weren’t mounted into the container — they were the default files held within the container. Which seemed odd, and I made a mental note that it was something we’d need to either mount in or update again when the container gets updated. But no matter how much heap space I allocated, ES crashed.

I discovered that the Docker deployment set an ENV variable for ES_JAVA_OPTS — something which, per the ElasticSearch documentation, overrides all other JVM options. So no matter what I was putting into the jvm options file, the 256 meg set in the ENV was actually being used.

Luckily it’s not terribly difficult to modify the ENV’s within an existing container. You could, of course, redeploy the container with the new settings (and I’ll do that next time, since I’ve also got to get IPv6 enabled). But I wasn’t planning on making any other changes.