Category: ELK

Using FileBeat to Send Data to ElasticSearch via Logstash

Before sending data, you need a pipeline on Logstash to accept the data. If you are using an existing pipeline, you just need the proper host and port for the pipeline to use in the Filebeat configuration. If you need a new pipeline, the input needs to be of type ‘beats’.

# Sample Pipeline Config:
input {
  beats   {
    host => "logstashserver.example.com"
    port => 5057
    client_inactivity_timeout => "3000"
  }
}

filter {
  grok{
     match => {"message"=>"\[%{TIMESTAMP_ISO8601:timestamp}] %{DATA:LOGLEVEL} \[Log partition\=%{DATA:LOGPARTITION}, dir\=%{DATA:KAFKADIR}\] %{DATA:MESSAGE} \(%{DATA:LOGSOURCE}\)"}
  }
}

output {
  elasticsearch {
    action => "index"
    hosts => ["https://eshost.example.com:9200"]
    ssl => true
    cacert => ["/path/to/certs/CA_Chain.pem"]
    ssl_certificate_verification => true
    user =>"us3r1d"
    password => "p@s5w0rd"
    index => "ljrkafka-%{+YYYY.MM.dd}"
  }
}

 

Download the appropriate version from https://www.elastic.co/downloads/past-releases#filebeat – I am currently using 7.17.4 as we have a few CentOS + servers.

Install the package (rpm -ihv filebeat-7.17.4-x86_64.rpm) – the installation package places the configuration files in /etc/filebeat and the binaries and other “stuff” in /usr/share/filebeat

Edit /etc/filebeat/filebeat.yml

    • Add inputs for log paths you want to monitor (this may be done under the module config if using a module config instead)
    • Add an output for Logstash to the appropriate port for your pipeline:
      output.logstash:
        hosts: ["logstashhost.example.com:5055"]

Run filebeat in debug mode from the command line and watch for success or failure.
filebeat -e -c /etc/filebeat/filebeat.yml -d "*"

Assuming everything is running well, use systemctl start filebeat to run the service and systemctl enable filebeat to set it to launch on boot.

Filebeat ships each log line to the Logstash server as a JSON event, and the grok filter in the pipeline parses the message into fields. When you view the record in Kibana, you should see any fields parsed out with your grok rule – in this case, we have KAFKADIR, LOGLEVEL, LOGPARTITION, LOGSOURCE, and MESSAGE fields.
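Before deploying a grok rule, it can help to try an equivalent regular expression against a sample line. The sketch below is only an approximation: the Python pattern stands in for the grok expression in the sample pipeline (TIMESTAMP_ISO8601 is simplified and DATA becomes a non-greedy match), and the sample Kafka log line is made up for illustration.

```python
import re

# Rough Python stand-in for the grok pattern in the sample pipeline.
# Assumption: TIMESTAMP_ISO8601 is simplified here; grok's DATA is ".*?".
KAFKA_LOG_PATTERN = re.compile(
    r"\[(?P<timestamp>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?)\] "
    r"(?P<LOGLEVEL>.*?) "
    r"\[Log partition=(?P<LOGPARTITION>.*?), dir=(?P<KAFKADIR>.*?)\] "
    r"(?P<MESSAGE>.*?) "
    r"\((?P<LOGSOURCE>.*?)\)"
)

# Hypothetical sample line, not taken from a real server.
sample_line = (
    "[2022-06-01 12:00:00,123] INFO "
    "[Log partition=ljrtest-0, dir=/var/kafka/data] "
    "Rolled new log segment at offset 12345 in 2 ms. (kafka.log.Log)"
)


def parse_kafka_line(line):
    """Return a dict of extracted fields, or None if the line does not match."""
    match = KAFKA_LOG_PATTERN.search(line)
    return match.groupdict() if match else None


fields = parse_kafka_line(sample_line)
print(fields)
```

If a line comes back as None here, the grok rule would most likely tag the event with _grokparsefailure instead of adding the fields.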

Using Logstash to Send Data to ElasticSearch

Create a logstash pipeline

  1. The quickest thing to do is copy the config of a similar use case and adjust the pipeline port and the ES destination index. But, if this is a unique scenario, build a new pipeline configuration. I am creating a TCP listener that receives data from Python using the python-logstash module. In this configuration, Logstash will create the index as needed with YYYY-MM-dd appended to the base index name.
  2. Edit the pipelines.yml to register the config you just created
  3. Restart logstash to activate the new pipeline
  4. Use netstat -nap | grep `pidof java` to ensure the server is listening on the new port
  5. Add the port to the runtime firewalld rules and test that the port is functional (firewall-cmd --zone=public --add-port=5055/tcp)
  6. Assuming the runtime rule has not had any unexpected results, register a permanent firewalld rule (firewall-cmd --permanent --zone=public --add-port=5055/tcp)
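For the "test that the port is functional" step, a quick check from a client machine can be done in a few lines of Python. This is a minimal sketch that only proves a TCP connection can be opened; the helper name is my own, and it says nothing about whether Logstash parses what you send.

```python
import socket


def is_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS failures.
        return False
```

Calling is_port_open("logstash.example.com", 5055) from the sending host (the hostname is a placeholder, 5055 is the port from the firewalld example) should return True once the rule is in place.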

We now have a Logstash data collector ready. Next, we need to create the index templates in ES.

  1. Log into Kibana
  2. Create an ILM policy – this policy rolls indices into the warm phase after 2 days and force-merges them. It also deletes indices after 20 days.
    { "policy": { "phases": { "hot": { "min_age": "0ms", "actions": { "set_priority": { "priority": 100 } } }, "warm": { "min_age": "2d", "actions": { "forcemerge": { "max_num_segments": 1 }, "set_priority": { "priority": 50 } } }, "delete": { "min_age": "20d", "actions": { "delete": {} } } } } }
  3. Create an index template — define the number of replicas
  4. Send data through the pipeline – the index will get created per the template definitions and document(s) added to the index
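The index template from step 3 could look something like the sketch below. It builds the request body as a Python dict and prints the JSON, which can be pasted into Kibana Dev Tools. The policy name and index pattern are hypothetical, and I am assuming a composable template (PUT _index_template/<name>, available in 7.8 and later); a legacy _template body would carry the settings at the top level instead.

```python
import json

# Hypothetical names for illustration only.
POLICY_NAME = "ljrlogs-policy"
INDEX_PATTERN = "ljrlogs-*"


def build_index_template(index_pattern, policy_name, replicas=1):
    """Build a composable index template body that attaches an ILM policy."""
    return {
        "index_patterns": [index_pattern],
        "template": {
            "settings": {
                "index.number_of_replicas": replicas,
                "index.lifecycle.name": policy_name,
            }
        },
    }


template_body = build_index_template(INDEX_PATTERN, POLICY_NAME)
print(json.dumps(template_body, indent=2))
```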

 

ELK Monitoring

We have a number of logstash servers gathering data from various filebeat sources. We’ve recently experienced a problem where the pipeline stops getting data for some of those sources. Not all — and restarting the non-functional filebeat source sends data for ten minutes or so. We were able to rectify the immediate problem by restarting our logstash services (IT troubleshooting step #1 — we restarted all of the filebeats and, when that didn’t help, moved on to restarting the logstashes)

But we need to have a way to ensure this isn’t happening — losing days of log data from some sources is really bad. So I put together a Python script to verify there’s something coming in from each of the filebeat sources.

pip install elasticsearch==7.13.4

#!/usr/bin/env python3
#-*- coding: utf-8 -*-
# Suppress the warnings urllib3 emits because certificate verification is disabled below
import urllib3
urllib3.disable_warnings()

from elasticsearch import Elasticsearch
import time

# Modules for email alerting
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


# Config variables
strSenderAddress = "devnull@example.com"
strRecipientAddress = "me@example.com"
strSMTPHostname = "mail.example.com"
iSMTPPort = 25

listSplunkRelayHosts = ['host293', 'host590', 'host591', 'host022', 'host014', 'host135']
iAgeThreashold = 3600 # Alert if last document is more than an hour old (3600 seconds)

strAlert = None

for strRelayHost in listSplunkRelayHosts:
	iCurrentUnixTimestamp = time.time()
	elastic_client = Elasticsearch("https://elasticsearchhost.example.com:9200", http_auth=('rouser','r0pAs5w0rD'), verify_certs=False)

	query_body = {
		"sort": {
			"@timestamp": {
				"order": "desc"
			}
		},
		"query": {
			"bool": {
				"must": {
					"term": {
						"host.hostname": strRelayHost
					}
				},
				"must_not": {
					"term": {
						"source": "/var/log/messages"
					}
				}
			}
		}
	}

	result = elastic_client.search(index="network_syslog*", body=query_body,size=1)
	all_hits = result['hits']['hits']

	iDocumentAge = None
	for num, doc in enumerate(all_hits):
		iDocumentAge =  (  (iCurrentUnixTimestamp*1000) - doc.get('sort')[0]) / 1000.0

	if iDocumentAge is not None:
		if iDocumentAge > iAgeThreashold:
			if strAlert is None:
				strAlert = f"<tr><td>{strRelayHost}</td><td>{iDocumentAge}</td></tr>"
			else:
				strAlert = f"{strAlert}\n<tr><td>{strRelayHost}</td><td>{iDocumentAge}</td></tr>\n"
			print(f"PROBLEM - For {strRelayHost}, document age is {iDocumentAge} second(s)")
		else:
			print(f"GOOD - For {strRelayHost}, document age is {iDocumentAge} second(s)")
	else:
		print(f"PROBLEM - For {strRelayHost}, no recent record found")


if strAlert is not None:
	msg = MIMEMultipart('alternative')
	msg['Subject'] = "ELK Filebeat Alert"
	msg['From'] = strSenderAddress
	msg['To'] = strRecipientAddress

	strHTMLMessage = f"<html><body><table><tr><th>Server</th><th>Document Age</th></tr>{strAlert}</table></body></html>"
	strTextMessage = strAlert

	part1 = MIMEText(strTextMessage, 'plain')
	part2 = MIMEText(strHTMLMessage, 'html')

	msg.attach(part1)
	msg.attach(part2)

	s = smtplib.SMTP(strSMTPHostname, iSMTPPort)
	s.sendmail(strSenderAddress, strRecipientAddress, msg.as_string())
	s.quit()

Debugging Filebeat

# Run filebeat from the command line and add debugging flags to increase verbosity of output
# -e directs output to STDERR instead of syslog
# -c indicates the config file to use
# -d indicates which debugging items you want -- * for all
/opt/filebeat/filebeat -e -c /opt/filebeat/filebeat.yml -d "*"

Python Logging to Logstash Server

Since we are having a problem with some of our filebeat servers actually delivering data over to logstash, I put together a really quick python script that connects to the logstash server and sends a log record. I can then run tcpdump on the logstash server and hopefully see what is going wrong.

import logging
import logstash
import sys

strHost = 'logstash.example.com'
iPort = 5048

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.TCPLogstashHandler(host=strHost,port=iPort))

test_logger.info('May 22 23:34:13 ABCDOHEFG66SC03 sipd[3863cc60] CRITICAL One or more Dns Servers are currently unreachable!')
test_logger.warning('May 22 23:34:13 ABCDOHEFG66SC03 sipd[3863cc60] CRITICAL One or more Dns Servers are currently unreachable!')
test_logger.error('May 22 23:34:13 ABCDOHEFG66SC03 sipd[3863cc60] CRITICAL One or more Dns Servers are currently unreachable!')

ElasticSearch Analyzer

Analyzer Components

Character filters are the first component of an analyzer. They can remove unwanted characters – this could be HTML tags ("char_filter": ["html_strip"]) or some custom replacement – or change character(s) into other character(s). Output from the character filter is passed to the tokenizer.

The tokenizer breaks the string out into individual components (tokens). A commonly used tokenizer is the whitespace tokenizer which uses whitespace characters as the token delimiter. For CSV data, you could build a custom pattern tokenizer with “,” as the delimiter.

Token filters then remove anything deemed unnecessary and can normalize what is left. The standard analyzer applies a lowercase token filter too – so NOW, Now, and now all produce the same token.
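To make the three stages concrete, here is a toy pipeline in plain Python. It is not how ElasticSearch implements analysis; the function names are mine, and each stage is a crude stand-in (a regex for html_strip, str.split for the whitespace tokenizer, str.lower for the lowercase filter).

```python
import re


def char_filter_html_strip(text):
    """Character filter: replace anything that looks like an HTML tag with a space."""
    return re.sub(r"<[^>]+>", " ", text)


def whitespace_tokenizer(text):
    """Tokenizer: split the filtered string on runs of whitespace."""
    return text.split()


def lowercase_token_filter(tokens):
    """Token filter: normalize every token to lower case."""
    return [token.lower() for token in tokens]


def toy_analyze(text):
    """Run the stages in analyzer order: char filter, tokenizer, token filter."""
    return lowercase_token_filter(whitespace_tokenizer(char_filter_html_strip(text)))


tokens = toy_analyze("<b>NOW</b> Now now")
print(tokens)  # ['now', 'now', 'now']
```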

Testing an analyzer

You can one-off analyze a string using any of the built-in analyzers:

curl -u "admin:admin" -k -X GET https://localhost:9200/_analyze --header 'Content-Type: application/json' --data '
{
"analyzer":"standard",
"text": "THE QUICK BROWN FOX JUMPED OVER THE LAZY DOG'\''S BACK 1234567890"
}'

Specifying different analyzers produces different tokens

It’s even possible to define a custom analyzer in an index – you’ll see this in the index configuration. Adding character mappings to a custom filter – the example used in Elastic’s documentation maps Arabic numbers to their European counterparts – might be a useful tool in our implementation. One of the examples is turning ASCII emoticons into emotional descriptors (_happy_, _sad_, _crying_, _raspberry_, etc) that would be useful in analyzing customer communications. In log processing, we might want to map phrases into commonly used abbreviations (not a real-world example, but if programmatic input spelled out “self-contained breathing apparatus”, I expect most people would still search for SCBA if they wanted to see how frequently SCBA tanks were used for call-outs). It will be interesting to see how frequently programmatic input doesn’t line up with user expectations to see if character mappings will be beneficial.

In addition to testing individual analyzers, you can test the analyzer associated with an index – instead of using the /_analyze endpoint, use the /indexname/_analyze endpoint.
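The difference between the two endpoints is easy to see if you build the requests in Python. This sketch only constructs the request objects and does not send them (sending would also need authentication and certificate handling); the helper name is hypothetical, and "indexname" is a placeholder as above.

```python
import json
import urllib.request


def build_analyze_request(base_url, text, analyzer=None, index=None):
    """Build an _analyze request, scoped to an index when one is given."""
    path = f"/{index}/_analyze" if index else "/_analyze"
    body = {"text": text}
    if analyzer:
        # Without an explicit analyzer, the index-scoped endpoint
        # falls back to the default analyzer of that index.
        body["analyzer"] = analyzer
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="GET",
    )


cluster_req = build_analyze_request(
    "https://localhost:9200", "THE QUICK BROWN FOX", analyzer="standard"
)
index_req = build_analyze_request(
    "https://localhost:9200", "THE QUICK BROWN FOX", index="indexname"
)
print(cluster_req.full_url)
print(index_req.full_url)
```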

 

Resetting Lost/Forgotten ElasticSearch Admin Passwords

There are a few ways to reset the password on an individual account … but they require you to have a known password. But what about when you don’t have any good passwords? (You might be able to read your kibana.yml and get a known good password, so that would be a good place to check). Provided you have OS access, just create another superuser account using the elasticsearch-users binary:

/usr/share/elasticsearch/bin/elasticsearch-users useradd ljradmin -p S0m3pA5sw0Rd -r superuser

You can then use curl to the ElasticSearch API to reset the elastic account password

curl -s --user ljradmin:S0m3pA5sw0Rd -XPUT "http://127.0.0.1:9200/_xpack/security/user/elastic/_password" -H 'Content-Type: application/json' -d'
{
"password" : "N3wPa5sw0Rd4ElasticU53r"
}
'

 

ElasticSearch ILM – Data Lifecycle

The following defines a simple data lifecycle policy we use for event log data.

Immediately, the data is in the “hot” phase.

After one day, it is moved to the “warm” phase, where the index is force-merged down to 1 segment. Lots-o-segments are good for writing, but since we’re dealing with timescale stats & log data (i.e. something that’s not being written to the next day), there is no need to optimize write performance. The index will be read-only, so it can be optimized for read performance. After seven days, the index is frozen (mostly moved out of memory) because, in this use case, data generally isn’t used after a week, so there is no need to fill up the server’s memory to speed up access to unused data. Since freeze is deprecated in a future version (due to improvements in memory utilization that should make freezing indices obsolete), we’ll need to watch our memory usage after upgrading to ES8.

Finally, after fourteen days, the data is deleted.
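Written out as a policy body, the phases described above might look like the sketch below. The timings come from the description; placing the freeze action in the cold phase and the set_priority value in the hot phase are my assumptions about how the policy was built, not a copy of the real one.

```python
import json


def build_event_log_policy():
    """Build an ILM policy body: hot now, warm at 1d, frozen at 7d, deleted at 14d."""
    return {
        "policy": {
            "phases": {
                # Assumption: priority borrowed from a typical hot phase.
                "hot": {
                    "min_age": "0ms",
                    "actions": {"set_priority": {"priority": 100}},
                },
                "warm": {
                    "min_age": "1d",
                    "actions": {"forcemerge": {"max_num_segments": 1}},
                },
                # Assumption: ILM exposes freeze as a cold-phase action in 7.x.
                "cold": {"min_age": "7d", "actions": {"freeze": {}}},
                "delete": {"min_age": "14d", "actions": {"delete": {}}},
            }
        }
    }


policy_body = build_event_log_policy()
print(json.dumps(policy_body, indent=2))
```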

To use the policy, set it as the template on an index:

Upon creating a new index (ljrlogs-5), the ILM policy has been applied:

Upgrading ElasticSearch – From 7.6 to 7.17

Before upgrading to 8, you must be running at least version 7.17 … so I am first upgrading my ES7 to a new enough version that upgrading to ES8 is possible.

Environment

Not master eligible nodes:
a6b30865c82c.example.com
a6b30865c83c.example.com

Master eligible nodes:
a6b30865c81c.example.com

 

  1. Disable shard allocation

PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": "primaries"
  }
}

 

  2. Stop non-essential indexing and flush

POST _flush/synced

  3. Upgrade the non-master-eligible nodes first, then the master-eligible nodes. One at a time, SSH to the host and upgrade ES
    a. Stop ES
       systemctl stop elasticsearch
    b. Install the new RPM
       rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
       wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.3-x86_64.rpm
       wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.3-x86_64.rpm.sha512
       shasum -a 512 -c elasticsearch-7.17.3-x86_64.rpm.sha512
       rpm -U elasticsearch-7.17.3-x86_64.rpm
    c. Update configuration for new version
       vi /usr/lib/tmpfiles.d/elasticsearch.conf
       vi /etc/elasticsearch/elasticsearch.yml # Add the action.auto_create_index as required -- * for all, or you can restrict auto-creation to certain indices
    d. Update unit file and start services
       systemctl daemon-reload
       systemctl enable elasticsearch
       systemctl start elasticsearch.service

  4. On the Kibana server, upgrade Kibana to a matching version:
    systemctl stop kibana
    wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.3-x86_64.rpm
    rpm -U kibana-7.17.3-x86_64.rpm
    systemctl daemon-reload
    systemctl enable kibana
    systemctl start kibana
  5. Access the Kibana console and ensure the upgraded node is back online

  6. Re-enable shard allocation

PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": null
  }
}
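For a larger cluster, working out the node order by hand gets tedious. Here is a small sketch of the ordering rule used above (non-master-eligible nodes first, master-eligible nodes last), along with the two cluster settings bodies as Python dicts. The node list is the one from the Environment section; the function and variable names are mine.

```python
import json

# Hostname -> is the node master-eligible? (from the Environment section)
NODES = {
    "a6b30865c82c.example.com": False,
    "a6b30865c83c.example.com": False,
    "a6b30865c81c.example.com": True,
}

# Request bodies for PUT _cluster/settings before and after the upgrade.
DISABLE_ALLOCATION = {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
RESET_ALLOCATION = {"persistent": {"cluster.routing.allocation.enable": None}}


def upgrade_order(nodes):
    """Sort hosts so non-master-eligible nodes come before master-eligible ones."""
    return sorted(nodes, key=lambda name: (nodes[name], name))


for host in upgrade_order(NODES):
    print(host)
print(json.dumps(RESET_ALLOCATION))
```

Serializing RESET_ALLOCATION turns Python's None into JSON null, which is what resets the setting to its default.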