We have a number of logstash servers gathering data from various filebeat sources. We’ve recently experienced a problem where the pipeline stops getting data from some of those sources. Not all of them — and restarting a non-functional filebeat source only gets data flowing for ten minutes or so. We were able to rectify the immediate problem by restarting our logstash services (IT troubleshooting step #1 — we restarted all of the filebeats and, when that didn’t help, moved on to restarting the logstashes).
But we need a way to detect when this is happening — losing days of log data from some sources is really bad. So I put together a Python script to verify there’s something coming in from each of the filebeat sources.
pip install elasticsearch==7.13.4
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
# Disable warnings that not verifying SSL trust isn't a good idea
import requests
requests.packages.urllib3.disable_warnings()
from elasticsearch import Elasticsearch
import time
# Modules for email alerting
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# --- Configuration ----------------------------------------------------------
# Email alert settings: sender, recipient, and the SMTP relay to hand off to.
strSenderAddress = 'devnull@example.com'
strRecipientAddress = 'me@example.com'
strSMTPHostname = 'mail.example.com'
iSMTPPort = 25

# Hosts to check; each value is matched against host.hostname in the query.
listSplunkRelayHosts = [
    'host293',
    'host590',
    'host591',
    'host022',
    'host014',
    'host135',
]

# Maximum tolerated age, in seconds, of a host's newest document before an
# alert is raised (60 * 60 = one hour).
iAgeThreashold = 60 * 60

# Collects the HTML table rows for the alert email; stays None while no host
# has exceeded the threshold.
strAlert = None
# For every relay host, look up its newest matching document in Elasticsearch
# and compare that document's age against iAgeThreashold. Each host that looks
# stalled gets an HTML table row appended to strAlert (None = nothing to report).

# One client serves all hosts; there is no need to rebuild it per iteration.
# NOTE(review): the credentials are hard-coded and verify_certs=False turns
# off TLS certificate validation -- consider loading the credentials from
# outside the source and trusting the cluster's CA instead.
elastic_client = Elasticsearch(
    "https://elasticsearchhost.example.com:9200",
    http_auth=('rouser', 'r0pAs5w0rD'),
    verify_certs=False,
)

for strRelayHost in listSplunkRelayHosts:
    iCurrentUnixTimestamp = time.time()

    # Newest document first, restricted to this host and excluding anything
    # whose source is /var/log/messages.
    query_body = {
        "sort": {
            "@timestamp": {
                "order": "desc"
            }
        },
        "query": {
            "bool": {
                "must": {
                    "term": {
                        "host.hostname": strRelayHost
                    }
                },
                "must_not": {
                    "term": {
                        "source": "/var/log/messages"
                    }
                }
            }
        }
    }
    result = elastic_client.search(index="network_syslog*", body=query_body, size=1)
    all_hits = result['hits']['hits']

    strAlertRow = None
    if all_hits:
        # size=1 means at most one hit. Its first sort value is the
        # @timestamp sort key, handled here as epoch milliseconds.
        iDocumentAge = ((iCurrentUnixTimestamp * 1000) - all_hits[0]['sort'][0]) / 1000.0
        if iDocumentAge > iAgeThreashold:
            strAlertRow = f"<tr><td>{strRelayHost}</td><td>{iDocumentAge}</td></tr>"
            print(f"PROBLEM - For {strRelayHost}, document age is {iDocumentAge} second(s)")
        else:
            print(f"GOOD - For {strRelayHost}, document age is {iDocumentAge} second(s)")
    else:
        # A host with no matching document at all is the worst case, so it
        # must land in the alert email too instead of only on stdout.
        strAlertRow = f"<tr><td>{strRelayHost}</td><td>no recent record found</td></tr>"
        print(f"PROBLEM - For {strRelayHost}, no recent record found")

    if strAlertRow is not None:
        # Rows are newline-separated; the first row starts the string.
        strAlert = strAlertRow if strAlert is None else f"{strAlert}\n{strAlertRow}"
# Send one alert email when at least one host was flagged. strAlert holds the
# flagged hosts as HTML table rows (<tr><td>host</td><td>detail</td></tr>).
if strAlert is not None:
    msg = MIMEMultipart('alternative')
    msg['Subject'] = "ELK Filebeat Alert"
    msg['From'] = strSenderAddress
    msg['To'] = strRecipientAddress

    strHTMLMessage = f"<html><body><table><tr><th>Server</th><th>Document Age</th></tr>{strAlert}</table></body></html>"

    # The plain-text alternative should not contain table markup, so turn
    # each row into a "host: detail" line.
    strTextMessage = "Server: Document Age\n" + (
        strAlert.replace("</td><td>", ": ")
        .replace("<tr><td>", "")
        .replace("</td></tr>", "")
    )

    # In a multipart/alternative message the last part is the preferred one,
    # so the HTML part is attached after the plain-text part.
    msg.attach(MIMEText(strTextMessage, 'plain'))
    msg.attach(MIMEText(strHTMLMessage, 'html'))

    # Honour the configured port (it was previously ignored), and let the
    # context manager close the connection even if sendmail() raises.
    with smtplib.SMTP(strSMTPHostname, iSMTPPort) as s:
        s.sendmail(strSenderAddress, strRecipientAddress, msg.as_string())