Category: ELK

Kibana – Visualizations and Dashboards

Kibana – Creating Visualizations

General

Time Series Visualization Pipeline

Kibana – Creating a Dashboard

Kibana – Creating Visualizations

General

To create a new visualization, select the visualization icon from the left-hand navigation menu and click “Create visualization”. You’ll need to select the type of visualization you want to create.

TSVB (Time Series Visualization Builder)

The Time Series Visualization Pipeline is a GUI visualization builder to create graphs from time series data. This means the x-axis will be datetime values and the y-axis will the data you want to visualize over the time period. To create a new visualization of this type, select “TSVB” on the “New Visualization” menu.

Scroll down and select “Panel options” – here you specify the index you want to visualize. Select the field that will be used as the time for each document (e.g. if your document has a special field like eventOccuredAt, you’d select that here). I generally leave the time interval at ‘auto’ – although you might specifically want to present a daily or hourly report.

Once you have selected the index, return to the “Data” tab. First, select the type of aggregation you want to use. In this example, we are showing the number of documents for a variety of policies.

The “Group by” dropdown allows you to have chart lines for different categories (instead of just having the count of documents over the time series, which is what “Everything” produces) – to use document data to create the groupings, select “Terms”.

Select the field you want to group on – in this case, I want the count for each unique “policyname” value, so I selected “policyname.keyword” as the grouping term.

Voila – a time series chart showing how many documents are found for each policy name. Click “Save” at the top left of the chart to save the visualization.

Provide a name for the visualization, write a brief description, and click “Save”. The visualization will now be available for others to view or for inclusion in dashboards.

TimeLion

TimeLion looks like it is going away soon, but it’s what I’ve seen as the recommendation for drawing horizontal lines on charts.

This visualization type is a little cryptic – you need to enter Timelion expression — .es() retrieves data from ElasticSearch, .value(3500) draws a horizontal line at 3,500

If there is null data at a time value, TimeLion will draw a discontinuous line. You can modify this behavior by specifying a fit function.

Note that you’ll need to click “Update” to update the chart before you are able to save the visualization.

Vega

Vega is an experimental visualization type.

This is, by far, the most flexible but most complex approach to creating a visualization. I’ve used it to create the Sankey visualization showing the source and destination countries from our firewall logs. Both Vega and Vega-Lite grammars can be used. ElasticCo provides a getting started guide, and there are many example online that you can use as the basis for your visualization.

Kibana – Creating a Dashboard

To create a dashboard, select the “Dashboards” icon on the left-hand navigation bar. Click “Create dashboard”

Click “Add an existing” to add existing visualizations to the dashboard.

Select the dashboards you want added, then click “Save” to save your dashboard.

Provide a name and brief description, then click “Save”.

 

Kibana Sankey Visualization

Now that we’ve got a lot of data being ingested into our ELK platform, I am beginning to build out visualizations and dashboards. This Vega visualization (source below) shows the number of connections between source and destination countries.

{ 
  $schema: https://vega.github.io/schema/vega/v3.0.json
  data: [
    {
      // Respect currently selected time range and filter string
      name: rawData
      url: {
        %context%: true
        %timefield%: @timestamp
        index: firewall_logs*
        body: {
          size: 0
          aggs: {
            table: {
              composite: {
                size: 10000
                sources: [
                  {
                    source_country: {
                     terms: {field: "srccountry.keyword"}
                    }
                  }
                  {
                    dest_country: {
                     terms: {field: "dstcountry.keyword"}
                    }
                  }
                ]
              }
            }
          }
        }
      }
      format: {property: "aggregations.table.buckets"}
      // Add aliases for data.* elements
      transform: [
        {type: "formula", expr: "datum.key.source_country", as: "source_country"}
        {type: "formula", expr: "datum.key.dest_country", as: "dest_country"}
        {type: "formula", expr: "datum.doc_count", as: "size"}
      ]
    }
    {
      name: nodes
      source: rawData
      transform: [
        // Filter to selected country
        {
          type: filter
          expr: !groupSelector || groupSelector.source_country == datum.source_country || groupSelector.dest_country == datum.dest_country
        }
        {type: "formula", expr: "datum.source_country+datum.dest_country", as: "key"}
        {
          type: fold
          fields: ["source_country", "dest_country"]
          as: ["stack", "grpId"]
        }
        {
          type: formula
          expr: datum.stack == 'source_country' ? datum.source_country+' '+datum.dest_country : datum.dest_country+' '+datum.source_country
          as: sortField
        }
        {
          type: stack
          groupby: ["stack"]
          sort: {field: "sortField", order: "descending"}
          field: size
        }
        {type: "formula", expr: "(datum.y0+datum.y1)/2", as: "yc"}
      ]
    }
    {
      name: groups
      source: nodes
      transform: [
        // Aggregate country groups and include number of documents for each grouping
        {
          type: aggregate
          groupby: ["stack", "grpId"]
          fields: ["size"]
          ops: ["sum"]
          as: ["total"]
        }
        {
          type: stack
          groupby: ["stack"]
          sort: {field: "grpId", order: "descending"}
          field: total
        }
        {type: "formula", expr: "scale('y', datum.y0)", as: "scaledY0"}
        {type: "formula", expr: "scale('y', datum.y1)", as: "scaledY1"}
        {type: "formula", expr: "datum.stack == 'source_country'", as: "rightLabel"}
        {
          type: formula
          expr: datum.total/domain('y')[1]
          as: percentage
        }
      ]
    }
    {
      name: destinationNodes
      source: nodes
      transform: [
        {type: "filter", expr: "datum.stack == 'dest_country'"}
      ]
    }
    {
      name: edges
      source: nodes
      transform: [
        {type: "filter", expr: "datum.stack == 'source_country'"}
        {
          type: lookup
          from: destinationNodes
          key: key
          fields: ["key"]
          as: ["target"]
        }
        {
          type: linkpath
          orient: horizontal
          shape: diagonal
          sourceY: {expr: "scale('y', datum.yc)"}
          sourceX: {expr: "scale('x', 'source_country') + bandwidth('x')"}
          targetY: {expr: "scale('y', datum.target.yc)"}
          targetX: {expr: "scale('x', 'dest_country')"}
        }
        // Calculation to determine line thickness
        {
          type: formula
          expr: range('y')[0]-scale('y', datum.size)
          as: strokeWidth
        }
        {
          type: formula
          expr: datum.size/domain('y')[1]
          as: percentage
        }
      ]
    }
  ]
  scales: [
    {
      name: x
      type: band
      range: width
      domain: ["source_country", "dest_country"]
      paddingOuter: 0.05
      paddingInner: 0.95
    }
    {
      name: y
      type: linear
      range: height
      domain: {data: "nodes", field: "y1"}
    }
    {
      name: color
      type: ordinal
      range: category
      domain: {data: "rawData", fields: ["source_country", "dest_country"]}
    }
    {
      name: stackNames
      type: ordinal
      range: ["Source Country", "Destination Country"]
      domain: ["source_country", "dest_country"]
    }
  ]
  axes: [
    {
      orient: bottom
      scale: x
      encode: {
        labels: {
          update: {
            text: {scale: "stackNames", field: "value"}
          }
        }
      }
    }
    {orient: "left", scale: "y"}
  ]
  marks: [
    {
      type: path
      name: edgeMark
      from: {data: "edges"}
      clip: true
      encode: {
        update: {
          stroke: [
            {
              test: groupSelector && groupSelector.stack=='source_country'
              scale: color
              field: dest_country
            }
            {scale: "color", field: "source_country"}
          ]
          strokeWidth: {field: "strokeWidth"}
          path: {field: "path"}
          strokeOpacity: {
            signal: !groupSelector && (groupHover.source_country == datum.source_country || groupHover.dest_country == datum.dest_country) ? 0.9 : 0.3
          }
          zindex: {
            signal: !groupSelector && (groupHover.source_country == datum.source_country || groupHover.dest_country == datum.dest_country) ? 1 : 0
          }
          tooltip: {
            signal: datum.source_country + ' → ' + datum.dest_country + '    ' + format(datum.size, ',.0f') + '   (' + format(datum.percentage, '.1%') + ')'
          }
        }
        hover: {
          strokeOpacity: {value: 1}
        }
      }
    }
    {
      type: rect
      name: groupMark
      from: {data: "groups"}
      encode: {
        enter: {
          fill: {scale: "color", field: "grpId"}
          width: {scale: "x", band: 1}
        }
        update: {
          x: {scale: "x", field: "stack"}
          y: {field: "scaledY0"}
          y2: {field: "scaledY1"}
          fillOpacity: {value: 0.6}
          tooltip: {
            signal: datum.grpId + '   ' + format(datum.total, ',.0f') + '   (' + format(datum.percentage, '.1%') + ')'
          }
        }
        hover: {
          fillOpacity: {value: 1}
        }
      }
    }
    {
      type: text
      from: {data: "groups"}
      interactive: false
      encode: {
        update: {
          x: {
            signal: scale('x', datum.stack) + (datum.rightLabel ? bandwidth('x') + 8 : -8)
          }
          yc: {signal: "(datum.scaledY0 + datum.scaledY1)/2"}
          align: {signal: "datum.rightLabel ? 'left' : 'right'"}
          baseline: {value: "middle"}
          fontWeight: {value: "bold"}
          // Do not show labels on smaller items
          text: {signal: "abs(datum.scaledY0-datum.scaledY1) > 13 ? datum.grpId : ''"}
        }
      }
    }
    {
      type: group
      data: [
        {
          name: dataForShowAll
          values: [{}]
          transform: [{type: "filter", expr: "groupSelector"}]
        }
      ]
      // Set button size and positioning
      encode: {
        enter: {
          xc: {signal: "width/2"}
          y: {value: 30}
          width: {value: 80}
          height: {value: 30}
        }
      }
      marks: [
        {
          type: group
          name: groupReset
          from: {data: "dataForShowAll"}
          encode: {
            enter: {
              cornerRadius: {value: 6}
              fill: {value: "#f5f5f5"}
              stroke: {value: "#c1c1c1"}
              strokeWidth: {value: 2}
              // use parent group's size
              height: {
                field: {group: "height"}
              }
              width: {
                field: {group: "width"}
              }
            }
            update: {
              opacity: {value: 1}
            }
            hover: {
              opacity: {value: 0.7}
            }
          }
          marks: [
            {
              type: text
              interactive: false
              encode: {
                enter: {
                  xc: {
                    field: {group: "width"}
                    mult: 0.5
                  }
                  yc: {
                    field: {group: "height"}
                    mult: 0.5
                    offset: 2
                  }
                  align: {value: "center"}
                  baseline: {value: "middle"}
                  fontWeight: {value: "bold"}
                  text: {value: "Show All"}
                }
              }
            }
          ]
        }
      ]
    }
  ]
  signals: [
    {
      name: groupHover
      value: {}
      on: [
        {
          events: @groupMark:mouseover
          update: "{source_country:datum.stack=='source_country' && datum.grpId, dest_country:datum.stack=='dest_country' && datum.grpId}"
        }
        {events: "mouseout", update: "{}"}
      ]
    }
    {
      name: groupSelector
      value: false
      on: [
        {
          events: @groupMark:click!
          update: "{stack:datum.stack, source_country:datum.stack=='source_country' && datum.grpId, dest_country:datum.stack=='dest_country' && datum.grpId}"
        }
        {
          events: [
            {type: "click", markname: "groupReset"}
            {type: "dblclick"}
          ]
          update: "false"
        }
      ]
    }
  ]
}

Logstash, JRuby, and Private Temp

There’s a long-standing bug in logstash where the private temp folder created for jruby isn’t cleaned up when the logstash process exits. To avoid filling up the temp disk space, I put together a quick script to check the PID associated with each jruby temp folder, see if it’s an active process, and remove the temp folder if the associated process doesn’t exist.

When the PID has been re-used, this means we’ve got an extra /tmp/jruby-### folder hanging about … but each folder is only 10 meg. The impacting issue is when we’ve restarted logstash a thousand times and a thousand ten meg folders are hanging about.

This script can be cron’d to run periodically or it can be run when the logstash service launches.

import subprocess
import re

from shutil import rmtree

strResult = subprocess.check_output(f"ls /tmp", shell=True)

for strLine in strResult.decode("utf-8").split('\n'):
        if len(strLine) > 0 and strLine.startswith("jruby-"):
                listSplitFileNames = re.split("jruby-([0-9]*)", strLine)
                if listSplitFileNames[1] is not None:
                        try:
                                strCheckPID = subprocess.check_output(f"ps -efww |  grep {listSplitFileNames[1]} | grep -v grep", shell=True)
                                #print(f"PID check result is {strCheckPID}")
                        except:
                                print(f"I am deleting |{strLine}|")
                                rmtree(f"/tmp/{strLine}")

Logstash

General Info

Logstash is a pipeline based data processing service. Data comes into logstash, is manipulated, and is sent elsewhere. The source is maintained on GitHub by ElasticCo.

Installation

Logstash was downloaded from ElasticCo and installed from a gzipped tar archive to the /opt/elk/logstash folder.

Configuration

The Logstash server is configured using the logstash.yml file.

Logstash uses Log4J 2 for logging. Logging configuration is maintained in the log4j2.properties file

Logstash is java-based, and the JVM settings are maintained in the jvm.options file – this includes min heap space, garbage collection configuration, JRuby settings, etc.

Logstash loads the pipelines defined in /opt/elk/logstash/config/pipelines.yml – each pipeline needs an ID and a path to its configuration. The path can be to a config file or to a folder of config files for the pipeline. The number of workers for the pipeline defaults to the number of CPUs, so we normally define a worker count as well – this can be increased as load dictates.

– pipeline.id: LJR
pipeline.workers: 2
path.config: “/opt/elk/logstash/config/ljr.conf”

Each pipeline is configured in an individual config file that defines the input, any data manipulation to be performed, and the output.

Testing Configuration

As we have it configured, you must reload Logstash to implement any configuration changes. As errors in pipeline definitions will prevent the pipeline from loading, it is best to test the configuration prior to restarting Logstash.

/opt/elk/logstash/bin/logstash –config.test_and_exit -f ljr_firewall_logs_wip.conf

Some errors may occur – if the test ends with “Configuration OK”, then it’s OK!

Automatic Config Reload

The configuration can automatically be reloaded when changes to config files are detected. This doesn’t give you the opportunity to test a configuration prior to it going live on the server (once it’s saved, it will be loaded … or not loaded if there’s an error)

Input

Input instructs logstash about what format data the pipeline will receive – is JSON data being sent to the pipeline, is syslog sending log data to the pipeline, or does data come from STDIN? The types of data that can be received are defined by the input plugins. Each input has its own configuration parameters. We use Beats, syslog, JSON (a codec, not a filter type), Kafka, stuff

The input configuration also indicates which port to use for the pipeline – this needs to be unique!

Input for a pipeline on port 5055 receiving JSON formatted data

Input for a pipeline on port 5100 (both TCP and UDP) receiving syslog data

Output

Output is similarly simple – various output plugins define the systems to which data can be shipped. Each output has its own configuration parameters – ElasticSearch, Kafka, and file are the three output plug-ins we currently use.

ElasticSearch

Most of the data we ingest into logstash is processed and sent to ElasticSearch. The data is indexed and available to users through ES and Kibana.

Kafka

Some data is sent to Kafka basically as a holding queue. It is then picked up by the “aggregation” logstash server, processed some more, and relayed to the ElasticSearch system.

File

File output is generally used for debugging – seeing the output data allows you to verify your data manipulations are working property (as well as just make sure you see data transiting the pipeline without resorting to tcpdump!).

Filter

Filtering allows data to be removed, attributes to be added to records, and parses data into fields. The types of filters that can be applied are defined by the filter plugins. Each plugin has its own documentation. Most of our data streams are filtered using Grok – see below for more details on that.

Conditional rules can be used in filters. This example filters out messages that contain the string “FIREWALL”, “id=firewall”, or “FIREWALL_VRF” as the business need does not require these messages, so there’s no reason to waste disk space and I/O processing, indexing, and storing these messages.

This example adds a field, ‘sourcetype’, with a value that is based on the log file path.

Grok

The grok filter is a Logstash plugin that is used to extract data from log records – this allows us to pull important information into distinct fields within the ElasticSearch record. Instead of having the full message in the ‘message’ field, you can have success/failure in its own field, the logon user in its own field, or the source IP in its own field. This allows more robust reporting. If the use case just wants to store data, parsing the record may not be required. But, if they want to report on the number of users logged in per hour or how much data is sent to each IP address, we need to have the relevant fields available in the document.

Patterns used by the grok filter are maintained in a Git repository – the grok-patterns contains the data types like ‘DATA’ in %{DATA:fieldname}

The following are the ones I’ve used most frequently:

Name Field Type Pattern Notes Notes
DATA Text data .*? This does not expand to the most matching characters – so looking for foo.*?bar in “foobar is not really a word, but foobar gets used a lot in IT documentation” will only match the bold portion of the text
GREEDYDATA Text data .* Whereas this matches the most matching characters – so foo.*bar in “foobar is not really a word, but foobar gets used a lot in IT documentation” matches the whole bold portion of the text
IPV4 IPv4 address
IPV6 IPv6 address
IP IP address – either v4 or v6 (?:%{IPV6}|%{IPV4}) This provides some flexibility as groups move to IPv6 – but it’s a more complex filter, so I’ve been using IPV4 with the understanding that we may need to adjust some parsing rules in the future
LOGLEVEL Text data Regex to match list of standard log level strings – provides data validation over using DATA (i.e. if someone sets their log level to “superawful”, it won’t match)
SYSLOGBASE Text data This matches the standard start of a syslog record. Often used as “%{SYSLOGBASE} %{GREEDYDATA:msgtext}” to parse out the timestamp, facility, host, and program – the remainder of the text is mapped to “msgtext”
URI Text data protocol://stuff text is parsed into the protocol, user, host, path, and query parameters
INT Numeric data (?:[+-]?(?:[0-9]+)) Signed or unsigned integer
NUMBER Numeric data Can include a casting like %{NUMBER:fieldname;int} or %{NUMBER:fieldname;float}
TIMESTAMP_ISO8601 DateTime %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? There are various other date patterns depending on how the string will be formatted. This is the one that matches YYYYMMDDThh:mm:ss

Parsing an entire log string

In a system with a set format for log data, parsing the entire line is reasonable – and, often, there will be a filter for well-known log types. I.E. if you are using the default Apache HTTPD log format, you don’t need to write a filter for each component of the log line – just match either the HTTPD_COMBINEDLOG or HTTPD_COMMONLOG pattern.

match => { “message” => “%{HTTPD_COMMONLOG}” }

But you can create your own filter as well – internally developed applications and less common vendor applications won’t have prebuilt filter rules.

match => { “message” => “%{TIMESTAMP_ISO8601:logtime} – %{IPV4:srcip} – %{IPV4:dstip} – %{DATA:result}” }

Extracting an array of data

Instead of trying to map an entire line at once, you can extract individual data elements by matching an array of patterns within the message.

match => { “message” => [“srcip=%{IPV4:src_ip}”
, “srcport=%{NUMBER:srcport:int}”
,”dstip=%{IPV4:dst_ip}”
,”dstport=%{NUMBER:dstport:int}”] }

This means the IP and port information will be extracted regardless of the order in which the fields are written in the log record. This also allows you to parse data out of log records where multiple different formats are used (as an example, the NSS Firewall logs) instead of trying to write different parsers for each of the possible string combinations.

Logstash, by default, breaks when a match is found. This means you can ‘stack’ different filters instead of using if tests. Sometimes, though, you don’t want to break when a match is found – maybe you are extracting a bit of data that gets used in another match. In these cases, you can set break_on_match to ‘false’ in the grok rule.

I have also had to set break_on_match when extracting an array of values from a message.

Troubleshooting

Log Files

Logstash logs output to /opt/elk/logstash/logs/logstash-plain.log – the logging level is defined in the /opt/elk/logstash/config/log4j2.properties configuration file.

Viewing Data Transmitted to a Pipeline

There are several ways to confirm that data is being received by a pipeline – tcpdump can be used to verify information is being received on the port. If no data is being received, the port may be offline (if there is an error in the pipeline config, the pipeline will not load – grep /opt/elk/logstash/logs/logstash-plain.log for the pipeline name to view errors), there may be a firewall preventing communication, or the sender could not be transmitting data.

tcpdump dst port 5100 -vv

If data is confirmed to be coming into the pipeline port, add a “file” output filter to the pipeline.

Issues

Data from filebeat servers not received in ElasticSearch

We have encountered a scenario were data from the filebeat servers was not being transmitted to ElasticSearch. Monitoring the filebeat server did not show any data being sent. Restarting the Logstash servers allowed data to be transmitted as expected.

 

Simulating Syslog Data

After creating a syslog pipeline, it is convenient to be able to test that data is being received and parsed as expected. You can use the logger utility (from the util-linux package) using “-n” to specify the target server, -P to specify the target port, either -d for udp or -T for tcp, -i with the process name, -p with the log priority, and the message content in quotes.

As an example, this command sends a sample log record to the logstash server. If the pipeline is working properly, the document will appear in ElasticSearch.

logger -n logstash.example.com -P 5101 -d -i ljrtest -p user.notice '<date=2022-06-22 time=09:09:28 devname="fcd01" \
      devid="AB123DEF45601874" eventtime=1655914168555429048 tz="-0700" logid="0001000014" type="traffic" subtype="local" \ 
      level="notice" vd="EXAMPLE-CORP" srcip=10.4.5.10 srcport=56317 srcintf="VLAN1" srcintfrole="wan" dstip=10.2.3.212 \ 
      dstport=61234 dstintf="EXAMPLE-CORP" dstintfrole="undefined" srccountry="United States" dstcountry="United States" sessionid=3322792 \ 
      proto=6 action="deny" policyid=0 policytype="local-in-policy" service="tcp/61234" trandisp="noop" app="tcp/61234" duration=0 \ 
      sentbyte=0 rcvdbyte=0 sentpkt=0 rcvdpkt=0'

ElasticSearch – Deleting Documents by Criterion

We have an index that was created without a lifecycle policy — and it’s taking up about 300GB of our 1.5T on the dev server. I don’t want to delete it — mostly because I don’t know why it’s there. But cleaning up old data seemed like a

POST /metricbeat_kafka-/_delete_by_query
{
  "query": {
    "range" : {
        "@timestamp" : {
            "lte" : "2021-02-04T01:47:44.880Z"
        }
    }
  }
}

ElasticSearch Search API – Script Fields

I’ve been playing around with script fields to manipulate data returned by ElasticSearch queries. As an example, data where there are a few nested objects with values that need to be multiplied together:

{
	"order": {
		"item1": {
			"cost": 31.55,
			"count": 111
		},
		"item2": {
			"cost": 62.55,
			"count": 222
		},
		"item3": {
			"cost": 93.55,
			"count": 333
		}
	}
}

And to retrieve records and multiply cost by count:

{
  "query"  : { "match_all" : {} },
	"_source": ["order.item*.item", "order.item*.count", "order.item*.cost"],
  "script_fields" : {
    "total_cost_1" : {
      "script" : 
      {
        "lang": "painless",
        "source": "return doc['order.item1.cost'].value * doc['order.item1.count'].value;"
      }
    },
    "total_cost_2" : {
      "script" : 
      {
        "lang": "painless",
        "source": "return doc['order.item2.cost'].value * doc['order.item2.count'].value;"
      }
    },
    "total_cost_3" : {
      "script" : 
      {
        "lang": "painless",
        "source": "return doc['order.item3.cost'].value * doc['order.item3.count'].value;"
      }
    }    
  }
}

Unfortunately, I cannot find any way to iterate across an arbitrary number of item# objects nested in the order object. So, for now, I think the data manipulation will be done in the program using the API to retrieve data. Still, it was good to learn how to address values in the doc record.

Upgrading Logstash

The process to upgrade minor releases of LogStash is quite simple — stop service, drop the binaries in place, and start service. In this case, my upgrade process is slightly complicated by the fact our binaries aren’t installed to the “normal” location from the RPM. I am upgrading from 7.7.0 => 7.17.4

The first step is, obviously, to download the LogStash release you want – in this case, it is 7.17.4 as upgrading across major releases is not supported.

 

cd /tmp
mkdir logstash
mv logstash-7.17.4-x86_64.rpm ./logstash

cd /tmp/logstash
rpm2cpio logstash-7.17.4-x86_64.rpm | cpio -idmv

systemctl stop logstash
mv /opt/elk/logstash /opt/elk/logstash-7.7.0
mv /tmp/logstash/usr/share/logstash /opt/elk/
mkdir /var/log/logstash
mkdir /var/lib/logstash

mv /tmp/logstash/etc/logstash /etc/logstash
cd /etc/logstash
mkdir rpmnew
mv jvm.options ./rpmnew/
mv log* ./rpmnew/
mv pipelines.yml ./rpmnew/
mv startup.options ./rpmnew/
cp -r /opt/elk/logstash-7.7.0/config/* ./

ln -s /opt/elk/logstash /usr/share/logstash
ln -s /etc/logstash /opt/elk/logstash/config

chown -R elasticsearch:elasticsearch /opt/elk/logstash
chown -R elasticsearch:elasticsearch /var/log/logstash
chown -R elasticsearch:elasticsearch /var/lib/logstash
chown -R elasticsearch:elasticsearch /etc/logstash

systemctl start logstash
systemctl status logstash
/opt/elk/logstash/bin/logstash --version

Using FileBeat to Send Data to ElasticSearch via Logstash

Before sending data, you need a pipleline on logstash to accept the data. If you are using an existing pipeline, you just need the proper host and port for the pipeline to use in the Filebeat configuration. If you need a new pipeline, the input needs to be of type ‘beats’

# Sample Pipeline Config:
input {
  beats   {
    host => "logstashserver.example.com"
    port => 5057
    client_inactivity_timeout => "3000"
  }
}

filter {
  grok{
     match => {"message"=>"\[%{TIMESTAMP_ISO8601:timestamp}] %{DATA:LOGLEVEL} \[Log partition\=%{DATA:LOGPARTITION}, dir\=%{DATA:KAFKADIR}\] %{DATA:MESSAGE} \(%{DATA:LOGSOURCE}\)"}
  }
}

output {
  elasticsearch {
    action => "index"
    hosts => ["https://eshost.example.com:9200"]
    ssl => true
    cacert => ["/path/to/certs/CA_Chain.pem"]
    ssl_certificate_verification => true
    user =>"us3r1d"
    password => "p@s5w0rd"
    index => "ljrkafka-%{+YYYY.MM.dd}"
  }
}

 

Download the appropriate version from https://www.elastic.co/downloads/past-releases#filebeat – I am currently using 7.17.4 as we have a few CentOS + servers.

Install the package (rpm -ihv filebeat-7.17.4-x86_64.rpm) – the installation package places the configuration files in /etc/filebeat and the binaries and other “stuff” in /usr/share/filebeat

Edit /etc/filebeat/filebeat.yml

    • Add inputs for log paths you want to monitor (this may be done under the module config if using a module config instead)
    • Add an output for Logstash to the appropriate port for your pipeline:
      output.logstash:
      hosts: [“logstashhost.example.com:5055”]

Run filebeat in debug mode from the command line and watch for success or failure.
filebeat -e -c /etc/filebeat/filebeat.yml -d "*"

Assuming everything is running well, use systemctl start filebeat to run the service and systemctl enable filebeat to set it to launch on boot.

Filebeats will attempt to parse the log data and send a JSON object to the LogStash server. When you view the record in Kibana, you should see any fields parsed out with your grok rule – in this case, we have KAFKADIR, LOGLEVEL, LOGPARTITION, LOGSOURCE, and MESSAGE fields.

Using Logstash to Send Data to ElasticSearch

Create a logstash pipeline

  1. The quickest thing to do is copy the config of a similar use case and adjusted the pipeline port (and adjusted the ES destination index). But, if this is a unique scenario, build a new pipeline configuration. I am creating a TCP listener that receives data from Python using the python-logstash module. In this configuration, logstash will create the index as needed with YYYY-MM-dd appended to the base index name.
    Text

Description automatically generated
  2. Edit the pipelines.yml to register the config you just created
  3. Restart logstash to activate the new pipeline
  4. Use netstat -nap | grep `pidof java` to ensure the server is listening on the new port
  5. Add the port to the runtime firewalld rules and test that the port is functional (firewall-cmd –zone=public –add-port=5055/tcp)
  6. Assuming the runtime rule has not had any unexpected results, register a permanent firewalld rule (firewall-cmd –permanent –zone=public –add-port=5055/tcp)

We now have a logstash data collector ready. We next need to create the index templates in ES

  1. Log into Kibana
  2. Create an ILM policy – this policy rolls indices into the warm phase after 2 days and forces merge. It also deletes records after 20 days.
    { “policy”: { “phases”: { “hot”: { “min_age”: “0ms”, “actions”: { “set_priority”: { “priority”: 100 } } }, “warm”: { “min_age”: “2d”, “actions”: { “forcemerge”: { “max_num_segments”: 1 }, “set_priority”: { “priority”: 50 } } }, “delete”: { “min_age”: “20d”, “actions”: { “delete”: {} } } } } }
  3. Create an index template — define the number of replicas
  4. Send data through the pipeline – the index will get created per the template definitions and document(s) added to the index