Kibana Sankey Visualization

Now that we’ve got a lot of data being ingested into our ELK platform, I am beginning to build out visualizations and dashboards. This Vega visualization (source below) shows the number of connections between source and destination countries.

{ 
  $schema: https://vega.github.io/schema/vega/v3.0.json
  data: [
    {
      // Respect currently selected time range and filter string
      name: rawData
      url: {
        %context%: true
        %timefield%: @timestamp
        index: firewall_logs*
        body: {
          size: 0
          aggs: {
            table: {
              composite: {
                size: 10000
                sources: [
                  {
                    source_country: {
                     terms: {field: "srccountry.keyword"}
                    }
                  }
                  {
                    dest_country: {
                     terms: {field: "dstcountry.keyword"}
                    }
                  }
                ]
              }
            }
          }
        }
      }
      format: {property: "aggregations.table.buckets"}
      // Add aliases for data.* elements
      transform: [
        {type: "formula", expr: "datum.key.source_country", as: "source_country"}
        {type: "formula", expr: "datum.key.dest_country", as: "dest_country"}
        {type: "formula", expr: "datum.doc_count", as: "size"}
      ]
    }
    {
      name: nodes
      source: rawData
      transform: [
        // Filter to selected country
        {
          type: filter
          expr: !groupSelector || groupSelector.source_country == datum.source_country || groupSelector.dest_country == datum.dest_country
        }
        {type: "formula", expr: "datum.source_country+datum.dest_country", as: "key"}
        {
          type: fold
          fields: ["source_country", "dest_country"]
          as: ["stack", "grpId"]
        }
        {
          type: formula
          expr: datum.stack == 'source_country' ? datum.source_country+' '+datum.dest_country : datum.dest_country+' '+datum.source_country
          as: sortField
        }
        {
          type: stack
          groupby: ["stack"]
          sort: {field: "sortField", order: "descending"}
          field: size
        }
        {type: "formula", expr: "(datum.y0+datum.y1)/2", as: "yc"}
      ]
    }
    {
      name: groups
      source: nodes
      transform: [
        // Aggregate country groups and include number of documents for each grouping
        {
          type: aggregate
          groupby: ["stack", "grpId"]
          fields: ["size"]
          ops: ["sum"]
          as: ["total"]
        }
        {
          type: stack
          groupby: ["stack"]
          sort: {field: "grpId", order: "descending"}
          field: total
        }
        {type: "formula", expr: "scale('y', datum.y0)", as: "scaledY0"}
        {type: "formula", expr: "scale('y', datum.y1)", as: "scaledY1"}
        {type: "formula", expr: "datum.stack == 'source_country'", as: "rightLabel"}
        {
          type: formula
          expr: datum.total/domain('y')[1]
          as: percentage
        }
      ]
    }
    {
      name: destinationNodes
      source: nodes
      transform: [
        {type: "filter", expr: "datum.stack == 'dest_country'"}
      ]
    }
    {
      name: edges
      source: nodes
      transform: [
        {type: "filter", expr: "datum.stack == 'source_country'"}
        {
          type: lookup
          from: destinationNodes
          key: key
          fields: ["key"]
          as: ["target"]
        }
        {
          type: linkpath
          orient: horizontal
          shape: diagonal
          sourceY: {expr: "scale('y', datum.yc)"}
          sourceX: {expr: "scale('x', 'source_country') + bandwidth('x')"}
          targetY: {expr: "scale('y', datum.target.yc)"}
          targetX: {expr: "scale('x', 'dest_country')"}
        }
        // Calculation to determine line thickness
        {
          type: formula
          expr: range('y')[0]-scale('y', datum.size)
          as: strokeWidth
        }
        {
          type: formula
          expr: datum.size/domain('y')[1]
          as: percentage
        }
      ]
    }
  ]
  scales: [
    {
      name: x
      type: band
      range: width
      domain: ["source_country", "dest_country"]
      paddingOuter: 0.05
      paddingInner: 0.95
    }
    {
      name: y
      type: linear
      range: height
      domain: {data: "nodes", field: "y1"}
    }
    {
      name: color
      type: ordinal
      range: category
      domain: {data: "rawData", fields: ["source_country", "dest_country"]}
    }
    {
      name: stackNames
      type: ordinal
      range: ["Source Country", "Destination Country"]
      domain: ["source_country", "dest_country"]
    }
  ]
  axes: [
    {
      orient: bottom
      scale: x
      encode: {
        labels: {
          update: {
            text: {scale: "stackNames", field: "value"}
          }
        }
      }
    }
    {orient: "left", scale: "y"}
  ]
  marks: [
    {
      type: path
      name: edgeMark
      from: {data: "edges"}
      clip: true
      encode: {
        update: {
          stroke: [
            {
              test: groupSelector && groupSelector.stack=='source_country'
              scale: color
              field: dest_country
            }
            {scale: "color", field: "source_country"}
          ]
          strokeWidth: {field: "strokeWidth"}
          path: {field: "path"}
          strokeOpacity: {
            signal: !groupSelector && (groupHover.source_country == datum.source_country || groupHover.dest_country == datum.dest_country) ? 0.9 : 0.3
          }
          zindex: {
            signal: !groupSelector && (groupHover.source_country == datum.source_country || groupHover.dest_country == datum.dest_country) ? 1 : 0
          }
          tooltip: {
            signal: datum.source_country + ' → ' + datum.dest_country + '    ' + format(datum.size, ',.0f') + '   (' + format(datum.percentage, '.1%') + ')'
          }
        }
        hover: {
          strokeOpacity: {value: 1}
        }
      }
    }
    {
      type: rect
      name: groupMark
      from: {data: "groups"}
      encode: {
        enter: {
          fill: {scale: "color", field: "grpId"}
          width: {scale: "x", band: 1}
        }
        update: {
          x: {scale: "x", field: "stack"}
          y: {field: "scaledY0"}
          y2: {field: "scaledY1"}
          fillOpacity: {value: 0.6}
          tooltip: {
            signal: datum.grpId + '   ' + format(datum.total, ',.0f') + '   (' + format(datum.percentage, '.1%') + ')'
          }
        }
        hover: {
          fillOpacity: {value: 1}
        }
      }
    }
    {
      type: text
      from: {data: "groups"}
      interactive: false
      encode: {
        update: {
          x: {
            signal: scale('x', datum.stack) + (datum.rightLabel ? bandwidth('x') + 8 : -8)
          }
          yc: {signal: "(datum.scaledY0 + datum.scaledY1)/2"}
          align: {signal: "datum.rightLabel ? 'left' : 'right'"}
          baseline: {value: "middle"}
          fontWeight: {value: "bold"}
          // Do not show labels on smaller items
          text: {signal: "abs(datum.scaledY0-datum.scaledY1) > 13 ? datum.grpId : ''"}
        }
      }
    }
    {
      type: group
      data: [
        {
          name: dataForShowAll
          values: [{}]
          transform: [{type: "filter", expr: "groupSelector"}]
        }
      ]
      // Set button size and positioning
      encode: {
        enter: {
          xc: {signal: "width/2"}
          y: {value: 30}
          width: {value: 80}
          height: {value: 30}
        }
      }
      marks: [
        {
          type: group
          name: groupReset
          from: {data: "dataForShowAll"}
          encode: {
            enter: {
              cornerRadius: {value: 6}
              fill: {value: "#f5f5f5"}
              stroke: {value: "#c1c1c1"}
              strokeWidth: {value: 2}
              // use parent group's size
              height: {
                field: {group: "height"}
              }
              width: {
                field: {group: "width"}
              }
            }
            update: {
              opacity: {value: 1}
            }
            hover: {
              opacity: {value: 0.7}
            }
          }
          marks: [
            {
              type: text
              interactive: false
              encode: {
                enter: {
                  xc: {
                    field: {group: "width"}
                    mult: 0.5
                  }
                  yc: {
                    field: {group: "height"}
                    mult: 0.5
                    offset: 2
                  }
                  align: {value: "center"}
                  baseline: {value: "middle"}
                  fontWeight: {value: "bold"}
                  text: {value: "Show All"}
                }
              }
            }
          ]
        }
      ]
    }
  ]
  signals: [
    {
      name: groupHover
      value: {}
      on: [
        {
          events: @groupMark:mouseover
          update: "{source_country:datum.stack=='source_country' && datum.grpId, dest_country:datum.stack=='dest_country' && datum.grpId}"
        }
        {events: "mouseout", update: "{}"}
      ]
    }
    {
      name: groupSelector
      value: false
      on: [
        {
          events: @groupMark:click!
          update: "{stack:datum.stack, source_country:datum.stack=='source_country' && datum.grpId, dest_country:datum.stack=='dest_country' && datum.grpId}"
        }
        {
          events: [
            {type: "click", markname: "groupReset"}
            {type: "dblclick"}
          ]
          update: "false"
        }
      ]
    }
  ]
}

Did you know … you can chat with yourself in Teams?

I’ll admit it: I send myself emails. And text messages. And, back before smart phones, I sent myself voice messages too. That was one area where Teams was a step backwards. I had to make my own Teams space (with just me as a member!) in order to send myself notes here. But not anymore …
You can finally chat with yourself! Note that retention policies may be shorter in chats than in Teams spaces … so you might still want to create your own Teams space to ensure that the note you send yourself for an end-of-the-year task, or the list of accomplishments for your annual review, is still around when you need it. But for quick notes so you remember something tomorrow (or next week, or three weeks from now), chatting with yourself is perfect for holding short-term reminders.

 

Logstash, JRuby, and Private Temp

There’s a long-standing bug in logstash where the private temp folder created for jruby isn’t cleaned up when the logstash process exits. To avoid filling up the temp disk space, I put together a quick script to check the PID associated with each jruby temp folder, see if it’s an active process, and remove the temp folder if the associated process doesn’t exist.

If the PID has been re-used by another process, the script leaves an extra /tmp/jruby-### folder hanging about … but each folder is only 10 MB. The real problem arises when we’ve restarted logstash a thousand times and a thousand 10 MB folders are hanging about.

This script can be cron’d to run periodically or it can be run when the logstash service launches.

import re
import subprocess

from os import listdir
from shutil import rmtree

for strLine in listdir("/tmp"):
        # JRuby names its private temp folder jruby-<PID of the owning process>
        objMatch = re.match(r"jruby-([0-9]+)", strLine)
        if objMatch is None:
                continue
        try:
                # ps -p exits non-zero when no process has this PID; asking for the
                # exact PID avoids grep matching the same digits elsewhere in ps output
                subprocess.check_output(["ps", "-p", objMatch.group(1)])
        except subprocess.CalledProcessError:
                print(f"I am deleting |{strLine}|")
                rmtree(f"/tmp/{strLine}")

Duck Egg Spaetzle

Recipe by Lisa
Course: Sides
Difficulty: Easy
Servings: 4
Prep time: 15 minutes
Cooking time: 5 minutes

Ingredients

  • 1 cup all purpose flour

  • 1 Tbsp buttermilk

  • 1 tsp salt

  • 2 large duck eggs

Method

  • Combine dry ingredients in a bowl and mix.
  • Form a depression in the flour, then add the eggs and buttermilk to the hole.
  • Mix the egg and buttermilk together, then fold in flour to create a wet dough.
  • Drop small pieces into boiling water — when they float to the top, they are done.

Logstash

General Info

Logstash is a pipeline-based data processing service. Data comes into logstash, is manipulated, and is sent elsewhere. The source is maintained on GitHub by ElasticCo.

Installation

Logstash was downloaded from ElasticCo and installed from a gzipped tar archive to the /opt/elk/logstash folder.

Configuration

The Logstash server is configured using the logstash.yml file.

Logstash uses Log4J 2 for logging. Logging configuration is maintained in the log4j2.properties file.

Logstash is Java-based, and the JVM settings are maintained in the jvm.options file – this includes min heap space, garbage collection configuration, JRuby settings, etc.

Logstash loads the pipelines defined in /opt/elk/logstash/config/pipelines.yml – each pipeline needs an ID and a path to its configuration. The path can be to a config file or to a folder of config files for the pipeline. The number of workers for the pipeline defaults to the number of CPUs, so we normally define a worker count as well – this can be increased as load dictates.

- pipeline.id: LJR
  pipeline.workers: 2
  path.config: "/opt/elk/logstash/config/ljr.conf"

Each pipeline is configured in an individual config file that defines the input, any data manipulation to be performed, and the output.

Testing Configuration

As we have it configured, you must reload Logstash to implement any configuration changes. As errors in pipeline definitions will prevent the pipeline from loading, it is best to test the configuration prior to restarting Logstash.

/opt/elk/logstash/bin/logstash --config.test_and_exit -f ljr_firewall_logs_wip.conf

Some errors may occur – if the test ends with “Configuration OK”, then it’s OK!
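The check above can be wrapped in a small helper so a restart is only attempted after a clean test. This is a minimal sketch, not part of our deployment: the function names are made up for illustration, and it relies only on the “Configuration OK” marker described above rather than on the exit code.

```python
import subprocess

LOGSTASH_BIN = "/opt/elk/logstash/bin/logstash"  # install path used in this document


def output_reports_ok(output):
    """Return True when the test output contains the success marker."""
    return "Configuration OK" in output


def check_pipeline_config(config_path):
    """Run Logstash in test-and-exit mode against one pipeline config file."""
    result = subprocess.run(
        [LOGSTASH_BIN, "--config.test_and_exit", "-f", config_path],
        capture_output=True,
        text=True,
    )
    # Warnings may land on either stream, so look at both
    return output_reports_ok(result.stdout + result.stderr)


# Example (needs a Logstash install, so it is left commented out):
# if check_pipeline_config("ljr_firewall_logs_wip.conf"):
#     print("Configuration OK, safe to restart")
```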

Automatic Config Reload

Logstash can automatically reload the configuration when changes to config files are detected (the config.reload.automatic setting). This doesn’t give you the opportunity to test a configuration prior to it going live on the server (once it’s saved, it will be loaded … or not loaded if there’s an error).

Input

The input section tells logstash what format of data the pipeline will receive – is JSON data being sent to the pipeline, is syslog sending log data to the pipeline, or does data come from STDIN? The types of data that can be received are defined by the input plugins. Each input has its own configuration parameters. We use Beats, syslog, JSON (a codec rather than an input plugin), and Kafka, among others.

The input configuration also indicates which port to use for the pipeline – this needs to be unique!

Input for a pipeline on port 5055 receiving JSON formatted data

Input for a pipeline on port 5100 (both TCP and UDP) receiving syslog data

Output

Output is similarly simple – various output plugins define the systems to which data can be shipped. Each output has its own configuration parameters – ElasticSearch, Kafka, and file are the three output plug-ins we currently use.

ElasticSearch

Most of the data we ingest into logstash is processed and sent to ElasticSearch. The data is indexed and available to users through ES and Kibana.

Kafka

Some data is sent to Kafka basically as a holding queue. It is then picked up by the “aggregation” logstash server, processed some more, and relayed to the ElasticSearch system.

File

File output is generally used for debugging – seeing the output data allows you to verify your data manipulations are working properly (as well as just make sure you see data transiting the pipeline without resorting to tcpdump!).

Filter

Filtering allows data to be removed, attributes to be added to records, and data to be parsed into fields. The types of filters that can be applied are defined by the filter plugins. Each plugin has its own documentation. Most of our data streams are filtered using Grok – see below for more details on that.

Conditional rules can be used in filters. This example filters out messages that contain the string “FIREWALL”, “id=firewall”, or “FIREWALL_VRF”. The business need does not require these messages, so there’s no reason to waste disk space and I/O processing, indexing, and storing them.

This example adds a field, ‘sourcetype’, with a value that is based on the log file path.

Grok

The grok filter is a Logstash plugin that is used to extract data from log records – this allows us to pull important information into distinct fields within the ElasticSearch record. Instead of having the full message in the ‘message’ field, you can have success/failure in its own field, the logon user in its own field, or the source IP in its own field. This allows more robust reporting. If the use case just wants to store data, parsing the record may not be required. But, if they want to report on the number of users logged in per hour or how much data is sent to each IP address, we need to have the relevant fields available in the document.

Patterns used by the grok filter are maintained in a Git repository – the grok-patterns file contains the data types like ‘DATA’ in %{DATA:fieldname}

The following are the ones I’ve used most frequently:

Name | Field Type | Pattern | Notes
DATA | Text data | .*? | Non-greedy: this does not expand to the most matching characters, so looking for foo.*?bar in “foobar is not really a word, but foobar gets used a lot in IT documentation” will only match the first “foobar”
GREEDYDATA | Text data | .* | Greedy: this matches the most matching characters, so foo.*bar in “foobar is not really a word, but foobar gets used a lot in IT documentation” matches everything from the first “foo” through the second “foobar”
IPV4 | IPv4 address | |
IPV6 | IPv6 address | |
IP | IP address – either v4 or v6 | (?:%{IPV6}|%{IPV4}) | This provides some flexibility as groups move to IPv6 – but it’s a more complex filter, so I’ve been using IPV4 with the understanding that we may need to adjust some parsing rules in the future
LOGLEVEL | Text data | | Regex to match list of standard log level strings – provides data validation over using DATA (i.e. if someone sets their log level to “superawful”, it won’t match)
SYSLOGBASE | Text data | | This matches the standard start of a syslog record. Often used as “%{SYSLOGBASE} %{GREEDYDATA:msgtext}” to parse out the timestamp, facility, host, and program – the remainder of the text is mapped to “msgtext”
URI | Text data | | protocol://stuff text is parsed into the protocol, user, host, path, and query parameters
INT | Numeric data | (?:[+-]?(?:[0-9]+)) | Signed or unsigned integer
NUMBER | Numeric data | | Can include a casting like %{NUMBER:fieldname:int} or %{NUMBER:fieldname:float}
TIMESTAMP_ISO8601 | DateTime | %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? | There are various other date patterns depending on how the string will be formatted. This is the one that matches YYYY-MM-DDThh:mm:ss
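The difference between DATA and GREEDYDATA is easy to see outside of Logstash. This is a small sketch using Python’s re module as a stand-in for the regex engine grok uses; lazy and greedy quantifiers behave the same way for this example.

```python
import re

text = "foobar is not really a word, but foobar gets used a lot in IT documentation"

# DATA expands to the lazy pattern .*? and stops at the first possible "bar"
lazy = re.search(r"foo.*?bar", text).group(0)

# GREEDYDATA expands to the greedy pattern .* and runs to the last "bar"
greedy = re.search(r"foo.*bar", text).group(0)

print(lazy)    # foobar
print(greedy)  # foobar is not really a word, but foobar
```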

Parsing an entire log string

In a system with a set format for log data, parsing the entire line is reasonable – and, often, there will be a prebuilt pattern for well-known log types. For example, if you are using the default Apache HTTPD log format, you don’t need to write a pattern for each component of the log line – just match either the HTTPD_COMBINEDLOG or HTTPD_COMMONLOG pattern.

match => { "message" => "%{HTTPD_COMMONLOG}" }

But you can create your own filter as well – internally developed applications and less common vendor applications won’t have prebuilt filter rules.

match => { "message" => "%{TIMESTAMP_ISO8601:logtime} - %{IPV4:srcip} - %{IPV4:dstip} - %{DATA:result}" }
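To see what a whole-line pattern like this produces, here is a rough Python equivalent using named groups. It is only an approximation: TIMESTAMP and IPV4 below are simplified stand-ins for the real grok definitions, the sample line is made up, the separators are assumed to be plain hyphens, and the trailing $ is added so the lazy final group captures the rest of the line.

```python
import re

# Simplified stand-ins for the grok patterns; the real definitions are stricter
TIMESTAMP = r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}"
IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"

# Each %{PATTERN:name} in the grok expression becomes a named group here
LINE_PATTERN = re.compile(
    rf"(?P<logtime>{TIMESTAMP}) - (?P<srcip>{IPV4}) - (?P<dstip>{IPV4}) - (?P<result>.*?)$"
)

sample = "2022-06-22T09:09:28 - 10.4.5.10 - 10.2.3.212 - deny"  # made-up log line
fields = LINE_PATTERN.match(sample).groupdict()
print(fields)
```

Each key in the resulting dictionary corresponds to a field that would be added to the document.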

Extracting an array of data

Instead of trying to map an entire line at once, you can extract individual data elements by matching an array of patterns within the message.

match => { "message" => ["srcip=%{IPV4:src_ip}"
, "srcport=%{NUMBER:srcport:int}"
, "dstip=%{IPV4:dst_ip}"
, "dstport=%{NUMBER:dstport:int}"] }

This means the IP and port information will be extracted regardless of the order in which the fields are written in the log record. This also allows you to parse data out of log records where multiple different formats are used (as an example, the NSS Firewall logs) instead of trying to write different parsers for each of the possible string combinations.

Logstash, by default, breaks when a match is found. This means you can ‘stack’ different filters instead of using if tests. Sometimes, though, you don’t want to break when a match is found – maybe you are extracting a bit of data that gets used in another match. In these cases, you can set break_on_match to ‘false’ in the grok rule.

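I have also had to set break_on_match to ‘false’ when extracting an array of values from a message, so that every pattern in the array is applied instead of stopping after the first one that matches.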
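The effect of break_on_match on an array of patterns can be illustrated with a short Python analogy. This is not how Logstash implements it; extract_fields and its patterns are hypothetical, the message is made up, and the values are left as strings rather than cast to integers.

```python
import re

# One small pattern per field, mirroring the array passed to the match option
FIELD_PATTERNS = [
    re.compile(r"srcip=(?P<src_ip>\d{1,3}(?:\.\d{1,3}){3})"),
    re.compile(r"srcport=(?P<srcport>\d+)"),
    re.compile(r"dstip=(?P<dst_ip>\d{1,3}(?:\.\d{1,3}){3})"),
    re.compile(r"dstport=(?P<dstport>\d+)"),
]


def extract_fields(message, break_on_match=False):
    """Apply each pattern in turn; stop after the first hit if break_on_match is set."""
    fields = {}
    for pattern in FIELD_PATTERNS:
        found = pattern.search(message)
        if found:
            fields.update(found.groupdict())
            if break_on_match:
                break
    return fields


# The fields appear in a different order than the patterns, which does not matter
message = "dstport=61234 srcip=10.4.5.10 dstip=10.2.3.212 srcport=56317"
print(extract_fields(message))                       # all four fields
print(extract_fields(message, break_on_match=True))  # only src_ip
```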

Troubleshooting

Log Files

Logstash logs output to /opt/elk/logstash/logs/logstash-plain.log – the logging level is defined in the /opt/elk/logstash/config/log4j2.properties configuration file.

Viewing Data Transmitted to a Pipeline

There are several ways to confirm that data is being received by a pipeline – tcpdump can be used to verify information is being received on the port. If no data is being received, the port may be offline (if there is an error in the pipeline config, the pipeline will not load – grep /opt/elk/logstash/logs/logstash-plain.log for the pipeline name to view errors), there may be a firewall preventing communication, or the sender may not be transmitting data.

tcpdump dst port 5100 -vv

If data is confirmed to be coming into the pipeline port, add a “file” output to the pipeline so you can see what the pipeline produces.

Issues

Data from filebeat servers not received in ElasticSearch

We have encountered a scenario where data from the filebeat servers was not being transmitted to ElasticSearch. Monitoring the filebeat server did not show any data being sent. Restarting the Logstash servers allowed data to be transmitted as expected.

 

OHM v/s Survey Results – Word Clouds

I was extremely suspicious when the term ‘bucolic’ made it to the tag cloud presented by OHM. It’s a great word, sure. But with a thousand responses … I highly doubt a significant number of people used the term (five did, based on the raw survey results). So I generated a few word clouds of my own for comparison. This is the image presented in the Township meeting. The idea of a word cloud is that the size of a word increases with the frequency with which the word occurs.

Compared to the tag cloud generated by separating responses on commas — not great because some people space-delimited their three words.

And separating on word boundaries — also not great because some people’s “words” were actually phrases, but it gets those who space delimited their three words.

And separating on word boundaries and aligning similar-meaning words (e.g. farm, farms, farmland => farm), which becomes a little subjective: do “wooded” and “trees” mean the same thing? Maybe and maybe not. Same with ‘country’ and ‘countryside’ … or even the difference between ‘farm’ and ‘agriculture’.
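The three counting approaches above can be sketched in a few lines of Python. The responses and the synonym map here are made up for illustration (they are not the survey data), and the function names are hypothetical; the resulting counts are what would drive the word sizes in a cloud.

```python
import re
from collections import Counter

# Made-up responses standing in for the raw survey data
responses = ["rural, quiet, farms", "farmland quiet friendly", "small town, farm, quiet"]

# Hypothetical synonym map; deciding what belongs here is the subjective part
CANONICAL = {"farms": "farm", "farmland": "farm"}


def count_by_comma(rows):
    """Treat each comma-separated chunk as one term (phrases stay intact)."""
    parts = (part.strip().lower() for row in rows for part in row.split(","))
    return Counter(part for part in parts if part)


def count_by_word(rows, canonical=None):
    """Split on word boundaries, optionally folding similar words together."""
    canonical = canonical or {}
    words = (word.lower() for row in rows for word in re.findall(r"\b\w+\b", row))
    return Counter(canonical.get(word, word) for word in words)


print(count_by_comma(responses)["quiet"])            # 2: the space-delimited response is missed
print(count_by_word(responses)["quiet"])             # 3
print(count_by_word(responses, CANONICAL)["farm"])   # 3: farms, farmland, and farm combined
```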

Simulating Syslog Data

After creating a syslog pipeline, it is convenient to be able to test that data is being received and parsed as expected. You can use the logger utility (from the util-linux package) with -n to specify the target server, -P to specify the target port, either -d for UDP or -T for TCP, -t with the tag (typically the process name), -p with the log priority, and the message content in quotes.

As an example, this command sends a sample log record to the logstash server. If the pipeline is working properly, the document will appear in ElasticSearch. Each quoted chunk is passed as a separate argument, and logger joins the arguments with spaces into a single message.

logger -n logstash.example.com -P 5101 -d -t ljrtest -p user.notice '<date=2022-06-22 time=09:09:28 devname="fcd01"' \
      'devid="AB123DEF45601874" eventtime=1655914168555429048 tz="-0700" logid="0001000014" type="traffic" subtype="local"' \
      'level="notice" vd="EXAMPLE-CORP" srcip=10.4.5.10 srcport=56317 srcintf="VLAN1" srcintfrole="wan" dstip=10.2.3.212' \
      'dstport=61234 dstintf="EXAMPLE-CORP" dstintfrole="undefined" srccountry="United States" dstcountry="United States" sessionid=3322792' \
      'proto=6 action="deny" policyid=0 policytype="local-in-policy" service="tcp/61234" trandisp="noop" app="tcp/61234" duration=0' \
      'sentbyte=0 rcvdbyte=0 sentpkt=0 rcvdpkt=0'
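Where the logger utility is not available, a test record can also be sent from Python. This is a minimal sketch rather than a replacement for logger: build_syslog_datagram and send_udp are hypothetical helpers, the payload uses a bare-bones BSD-style layout without the timestamp and hostname that logger adds, and the shortened message is made up.

```python
import socket

FACILITY_USER = 1    # syslog facility code for "user"
SEVERITY_NOTICE = 5  # syslog severity code for "notice"


def build_syslog_datagram(tag, message, facility=FACILITY_USER, severity=SEVERITY_NOTICE):
    """Build a minimal syslog payload of the form <PRI>tag: message."""
    priority = facility * 8 + severity  # user.notice works out to 13
    return f"<{priority}>{tag}: {message}".encode("utf-8")


def send_udp(host, port, payload):
    """Send one datagram to the syslog input."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, port))


payload = build_syslog_datagram(
    "ljrtest",
    'srcip=10.4.5.10 srcport=56317 dstip=10.2.3.212 dstport=61234 action="deny"',
)
print(payload)
# send_udp("logstash.example.com", 5101, payload)  # uncomment to actually send
```

Because the header is abbreviated, a pipeline that parses the syslog header strictly may tag the record as a parse failure; the logger command remains the closer match to real traffic.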

Quick Docker Test

I’m building a quick image to test permissions to a folder structure for a friend … so I’m making a quick note about how I’m building this test image so I don’t have to keep re-typing it all.

FROM python:3.7-slim

RUN apt-get update && apt-get install -y curl --no-install-recommends

RUN mkdir -p /srv/ljr/test

# Create service account (the group must exist before useradd can reference it with -g)
RUN groupadd -g 30001 webuser && \
    useradd --comment "my service account" -u 30001 -g 30001 --shell /sbin/nologin --no-create-home webuser
# Copy a bunch of stuff various places under /srv
# Set ownership on /srv folder tree
RUN chown -R 30001:30001 /srv

USER 30001:30001
ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]

Build the image

docker build -t "ljr:latest" .

Run a transient container that is deleted on exit

docker run --rm -it --entrypoint "/bin/bash" ljr