We have a number of logstash servers gathering data from various filebeat sources. We’ve recently experienced a problem where the pipeline stops getting data for some of those sources. Not all of them are affected — and restarting a non-functional filebeat source gets data flowing again for ten minutes or so before it stops. We were able to rectify the immediate problem by restarting our logstash services (IT troubleshooting step #1 — we restarted all of the filebeats and, when that didn’t help, moved on to restarting the logstashes).
But we need to have a way to ensure this isn’t happening — losing days of log data from some sources is really bad. So I put together a Python script to verify there’s something coming in from each of the filebeat sources.
pip install elasticsearch==7.13.4
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Alert when a filebeat source stops delivering documents to Elasticsearch.

For every host in ``listSplunkRelayHosts`` the newest matching document in
the ``network_syslog*`` indices is looked up.  A host is considered a problem
when that document is older than ``iAgeThreashold`` seconds or when no
document is found at all.  One email listing every problem host is sent at
the end of the run; nothing is sent when all hosts are healthy.
"""
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
from elasticsearch import Elasticsearch

# Disable warnings that not verifying SSL trust isn't a good idea
requests.packages.urllib3.disable_warnings()

# Config variables
strSenderAddress = "devnull@example.com"
strRecipientAddress = "me@example.com"
strSMTPHostname = "mail.example.com"
iSMTPPort = 25
listSplunkRelayHosts = ['host293', 'host590', 'host591', 'host022', 'host014', 'host135']
iAgeThreashold = 3600  # Alert if last document is more than an hour old (3600 seconds)


def get_newest_document_age(elastic_client, strRelayHost):
    """Return the age in seconds of the newest document for *strRelayHost*.

    Documents whose ``source`` is ``/var/log/messages`` are excluded from the
    search.  Returns ``None`` when the search yields no hits.
    """
    # Take the reference time before querying, as the original script did.
    iCurrentUnixTimestamp = time.time()
    query_body = {
        "sort": {"@timestamp": {"order": "desc"}},
        "query": {
            "bool": {
                "must": {"term": {"host.hostname": strRelayHost}},
                "must_not": {"term": {"source": "/var/log/messages"}},
            }
        },
    }
    result = elastic_client.search(index="network_syslog*", body=query_body, size=1)
    all_hits = result['hits']['hits']
    if not all_hits:
        return None
    # The sort value of the hit is treated as epoch milliseconds, hence the
    # conversion of the current time to milliseconds and back to seconds.
    return ((iCurrentUnixTimestamp * 1000) - all_hits[0]['sort'][0]) / 1000.0


def send_alert(listProblems):
    """Email one multipart (plain text + HTML) alert listing the problem hosts.

    *listProblems* is a list of ``(hostname, age)`` tuples where ``age`` is the
    document age in seconds, or ``None`` when no document was found.
    """
    listHTMLRows = []
    listTextRows = []
    for strRelayHost, iDocumentAge in listProblems:
        strAge = "no documents found" if iDocumentAge is None else f"{iDocumentAge}"
        listHTMLRows.append(f"<tr><td>{strRelayHost}</td><td>{strAge}</td></tr>")
        listTextRows.append(f"{strRelayHost}: {strAge}")

    strHTMLRows = "\n".join(listHTMLRows)
    strHTMLMessage = (
        "<html><body><table><tr><th>Server</th><th>Document Age</th></tr>"
        f"{strHTMLRows}</table></body></html>"
    )
    # The plain-text alternative must not contain HTML markup.
    strTextMessage = "Server: Document Age (seconds)\n" + "\n".join(listTextRows)

    msg = MIMEMultipart('alternative')
    msg['Subject'] = "ELK Filebeat Alert"
    msg['From'] = strSenderAddress
    msg['To'] = strRecipientAddress
    msg.attach(MIMEText(strTextMessage, 'plain'))
    msg.attach(MIMEText(strHTMLMessage, 'html'))

    # The context manager closes the connection even if sendmail raises;
    # the configured port is now actually honoured.
    with smtplib.SMTP(strSMTPHostname, iSMTPPort) as s:
        s.sendmail(strSenderAddress, strRecipientAddress, msg.as_string())


def main():
    """Check every relay host, print its status and email any problems."""
    # NOTE(review): credentials are hard-coded and certificate verification is
    # disabled; consider loading the password from the environment or a
    # secret store and verifying the server certificate.
    # A single client is reused for all hosts instead of one per iteration.
    elastic_client = Elasticsearch(
        "https://elasticsearchhost.example.com:9200",
        http_auth=('rouser', 'r0pAs5w0rD'),
        verify_certs=False,
    )

    listProblems = []
    for strRelayHost in listSplunkRelayHosts:
        iDocumentAge = get_newest_document_age(elastic_client, strRelayHost)
        if iDocumentAge is None:
            # A host with no documents at all is the worst case, so it must
            # be part of the alert and not only printed.
            print(f"PROBLEM - For {strRelayHost}, no recent record found")
            listProblems.append((strRelayHost, None))
        elif iDocumentAge > iAgeThreashold:
            print(f"PROBLEM - For {strRelayHost}, document age is {iDocumentAge} second(s)")
            listProblems.append((strRelayHost, iDocumentAge))
        else:
            print(f"GOOD - For {strRelayHost}, document age is {iDocumentAge} second(s)")

    if listProblems:
        send_alert(listProblems)


if __name__ == "__main__":
    main()