Tag: python

Watching Redis Records for Strange Changes

I write a lot of things down to save myself time the next time I need to do the same sort of thing, and I publish them to the Internet in case I can save someone else time too. This one is so specific that I’m not sure it’s an “ever going to encounter this again” sort of thing. Just in case, though: I have device data being stored in Redis. Because the device doesn’t know its throughput values, you need the last time and last value paired with the current device metrics to calculate throughput. Sporadically, though, the cached data is updated only insofar as a new record is posted with a new timestamp; the actual values, other than the timestamp, remain unchanged. With millions of interfaces, it’s challenging to identify these situations by spot-checking the visualizations. Instead, I need to monitor Redis and identify when the tstamp is updated but no other values change.

import redis
import time
import re
import json
import os

# Configuration
redis_host = 'redishost.example.net'
redis_port = 6379
redis_password = 'P@5sw0rDG03sH3r3'  # Replace with your Redis password
pattern = re.compile(r'INTERFACE_RAW_STATS_hostname\d\d\d\d_\d+_\d+')
output_file = 'changed_records.json'

# Connect to Redis
client = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)

# Dictionary to track records
records = {}
matching_keys = []

def get_matching_keys():
    """
    Retrieve keys from Redis matching the specified pattern.

    Returns:
        list: A list of keys that match the pattern.
    """
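    # SCAN iterates incrementally, so it does not block the server the way KEYS does
    candidate_keys = client.scan_iter(match='INTERFACE_RAW_STATS_*', count=1000)
    return [key for key in candidate_keys if pattern.match(key)]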

def process_keys():
    """
    Process Redis keys to track changes in data.

    Retrieves keys matching the pattern, gets their data using HGETALL,
    and tracks changes. If only the 'tstamp' field has changed and all
    other fields remain the same, the record is written to a file.
    """
    global records
    i = 0

    for key in matching_keys:
        i += 1
        data = client.hgetall(key)
        if i == 1 or i % 1000 == 0:
            print(f"Processed {i} records")

        if not data:
            continue

        collector_name = data.get('collectorName')
        node_id = data.get('nodeId')
        if_index = data.get('ifIndex')
        tstamp = data.get('tstamp')

        if not collector_name or not node_id or not if_index or not tstamp:
            continue

        unique_key = f"{collector_name}_{node_id}_{if_index}"

        if unique_key in records:
            previous_data = records[unique_key]
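            if previous_data['tstamp'] != tstamp:
                # Check if all other values are the same; comparing whole dicts
                # also handles fields that were added or removed between polls
                current_values = {k: v for k, v in data.items() if k != 'tstamp'}
                previous_values = {k: v for k, v in previous_data.items() if k != 'tstamp'}
                if current_values == previous_values:
                    print(f"***** Record changed: {json.dumps(data, indent=2)} *****")
                    write_to_file(data)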
            records[unique_key] = data  # Update the record
        else:
            records[unique_key] = data

def write_to_file(data):
    """
    Write the given data to a file.

    Args:
        data (dict): The data to write to the file.
    """
    with open(output_file, 'a') as file:
        file.write(json.dumps(data) + '\n')

if __name__ == "__main__":
    # Ensure the output file is empty at the start
    if os.path.exists(output_file):
        os.remove(output_file)

    # Retrieve the list of matching keys once
    matching_keys = get_matching_keys()

    while True:
        process_keys()
        print("Sleeping ... ")
        time.sleep(300)  # Sleep for 5 minutes
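
The core check can be tried without a Redis server. The following is a minimal sketch, assuming records are plain dictionaries shaped like the HGETALL results above; the helper name only_timestamp_changed and the ifInOctets sample field are hypothetical and only there for illustration.

```python
def only_timestamp_changed(previous, current, timestamp_field="tstamp"):
    """Return True when the timestamp differs but every other field is identical."""
    if previous.get(timestamp_field) == current.get(timestamp_field):
        return False  # Nothing was re-posted, so there is nothing to flag

    def without_timestamp(record):
        return {k: v for k, v in record.items() if k != timestamp_field}

    return without_timestamp(previous) == without_timestamp(current)


# Hypothetical sample records; real records carry whatever fields the collector stores
old = {"collectorName": "c1", "nodeId": "42", "ifIndex": "7",
       "tstamp": "1700000000", "ifInOctets": "1000"}
stale = dict(old, tstamp="1700000300")                      # new timestamp, same values
fresh = dict(old, tstamp="1700000300", ifInOctets="2500")   # new timestamp, new values

print(only_timestamp_changed(old, stale))
print(only_timestamp_changed(old, fresh))
```

Only the stale record should be flagged; a record whose counters moved along with the timestamp is normal behavior.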

Migrating Redis Data

So, I know that Redis should be a data cache that can be repopulated … but we use it to calculate deltas (what was the value last time), so repopulating the information makes the first half hour or so of calculations rather slow as the application tries Redis, gets nothing, and falls back to a database query. Then we get a backlog of data to churn through, and it would just be better if the Redis cache hadn’t gone away in the first place. If you own both servers and the files are in the same format, you could just copy the cache db from the old server to the new one. But when you cannot just copy the file and you would really prefer the data not disappear and need to be repopulated … there’s a script for that! This Python script reads all of the data from the “old” server and populates it into the “new” server.

import redis

def migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
                 redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password):
    # Connect to the source Redis server
    source_client = redis.StrictRedis(host=redis_source_host, port=redis_source_port, db=redis_source_db, password=redis_source_password)

    # Connect to the destination Redis server
    dest_client = redis.StrictRedis(host=redis_dest_host, port=redis_dest_port, db=redis_dest_db, password=redis_dest_password)

    # Fetch all keys from the source Redis
    keys = source_client.keys('*')

    for key in keys:
        # Get the type of the key
        key_type = source_client.type(key).decode('utf-8')

        if key_type == 'string':
            value = source_client.get(key)
            print("Setting string value in dest")
            dest_client.set(key, value)
        elif key_type == 'list':
            values = source_client.lrange(key, 0, -1)
            print("Setting list value in dest")
            dest_client.delete(key)  # Ensure the list is empty before pushing
            for value in values:
                dest_client.rpush(key, value)
        elif key_type == 'set':
            values = source_client.smembers(key)
            print("Setting set value in dest")
            dest_client.delete(key)  # Ensure the set is empty before pushing
            for value in values:
                dest_client.sadd(key, value)
        elif key_type == 'zset':
            values = source_client.zrange(key, 0, -1, withscores=True)
            print("Setting zset value in dest")
            dest_client.delete(key)  # Ensure the zset is empty before pushing
            for value, score in values:
                dest_client.zadd(key, {value: score})
        elif key_type == 'hash':
            values = source_client.hgetall(key)
            print("Setting hash value in dest")
            dest_client.delete(key)  # Ensure the hash is empty before pushing
            dest_client.hset(key, mapping=values)  # hmset is deprecated in redis-py

    print("Data migration completed.")

if __name__ == "__main__":
    # Source Redis server details
    redis_source_host = 'oldredis.example.com'
    redis_source_port = 6379
    redis_source_db = 0
    redis_source_password = 'SourceRedisPassword'

    # Destination Redis server details
    redis_dest_host = 'newredis.example.com'
    redis_dest_port = 6379
    redis_dest_db = 0
    redis_dest_password = 'DestRedisPassword'

    # Migrate data
    migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
                 redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password)

SNMP Simulator

Background

As communication between development and production platforms is limited for security and data integrity reasons, this creates a challenge when testing changes in development: we cannot access “real world” data with which to perform tests. Having a limited set of data in development means testing may not illuminate issues that occur at high volume or on a large scale.

Solution

While limiting communication between the prod and dev systems is reasonable, it would be beneficial to be able to replay production-like data within our development systems for testing purposes. While it is not cost effective to buy large network devices with thousands of interfaces for testing, the Python module snmpsim provides “canned responses” that simulate real devices on the production network. For simplicity, I have a bash script that launches the SNMP responder.

server03:snmpsim # cat ../_playback.sh

#!/bin/bash

snmpsimd.py --data-dir=/opt/snmp/snmpsim/data --cache-dir=/opt/snmp/snmpsim/cache --agent-udpv4-endpoint=0.0.0.0:161 --process-user=ljrsnmp --process-group=ljrsnmp

This responder will replay data stored in the directory /opt/snmp/snmpsim/data. Any file ending in .snmprec will be included in the response, and the filename prior to .snmprec is the community string used to access the response data. E.G. public.snmprec is the data for the public community string.

The response files are in the format OID|TAG|VALUE where OID is the OID number of the SNMP object, TAG is an integer defined at https://pypi.org/project/snmpsim/0.2.3/

Valid tag values and their corresponding ASN.1/SNMP types are:

ASN.1/SNMP Type      Tag Value
Integer32            2
Octet String         4
Null                 5
Object Identifier    6
IP Address           64
Counter32            65
Gauge32              66
Time Ticks           67
Opaque               68
Counter64            70

And the value is the data to be returned for the OID object. As an example:

1.3.6.1.2.1.1.3.0|67|2293092270

1.3.6.1.2.1.1.3.0 is the sysUpTime, the data type is TimeTicks, and the system up time is 2293092270 hundredths of a second. Or 6369 hours, 42 minutes, and 2.7 seconds.
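
For checking TimeTicks values by hand, here is a small sketch of the conversion from hundredths of a second to hours, minutes, and seconds. The function name timeticks_to_hms is made up for this example; it uses integer arithmetic only and returns the leftover hundredths separately.

```python
def timeticks_to_hms(ticks):
    """Split an SNMP TimeTicks value (hundredths of a second) into
    (hours, minutes, seconds, hundredths)."""
    total_seconds, hundredths = divmod(ticks, 100)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return hours, minutes, seconds, hundredths


hours, minutes, seconds, hundredths = timeticks_to_hms(2293092270)
print(f"{hours} hours, {minutes} minutes, {seconds}.{hundredths:02d} seconds")
```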

Items within the response file need to be listed in ascending order.
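
A quick way to check the ordering of a hand-built file is sketched below. It assumes "ascending" means comparing OIDs number by number rather than as text, which matters because a plain string sort puts .10 ahead of .9. The helper names oid_key and is_ascending and the sample values are hypothetical.

```python
def oid_key(snmprec_line):
    """Turn the OID portion of an 'OID|TAG|VALUE' line into a tuple of integers."""
    oid = snmprec_line.split("|", 1)[0]
    return tuple(int(part) for part in oid.split("."))


def is_ascending(lines):
    """Return True when every line's OID is numerically greater than the previous one."""
    keys = [oid_key(line) for line in lines]
    return all(a < b for a, b in zip(keys, keys[1:]))


lines = [
    "1.3.6.1.2.1.2.2.1.9.1|67|100",     # ifLastChange for interface 1
    "1.3.6.1.2.1.2.2.1.10.1|65|5000",   # ifInOctets for interface 1
]

print(is_ascending(lines))       # numeric comparison treats 9 as smaller than 10
print(sorted(lines) == lines)    # a text sort would reorder these two lines
```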

Generating Response Data

There are two methods for creating the data provided to an SNMP GET request. A response file can be created manually, populated with OID objects that should be included in the response as well as sample data. Alternatively, a network trace can be gathered from the production network and parsed to create the response file.

Manually Generated Response File

While you can literally type data into a response file, it is far easier to use a script to generate sample data. /opt/snmp/snmpsim/_genData.py is an example of creating a response file for about 1,000 interfaces.

from datetime import datetime
import random

iRangeMax = 1000

dictTags = {'Integer': '2', 'OctetString': '4', 'NULL': '5', 'ObjectIdentifier': '6', 'IPAddress': '64', 'Counter32': '65', 'Gauge32': '66', 'TimeTicks': '67', 'Opaque': '68','Counter64': '70'}  # Valid tags per https://pypi.org/project/snmpsim/0.2.3/

today = datetime.now()

iftable_snmp_objects = [
    ('1.3.6.1.2.1.2.2.1.1', 'Integer', lambda i: i),  # ifIndex
    ('1.3.6.1.2.1.2.2.1.2', 'OctetString', lambda i: f"SampleInterface{i}"),  # ifDescr
    ('1.3.6.1.2.1.2.2.1.3', 'Integer', lambda i: 6),  # ifType
    ('1.3.6.1.2.1.2.2.1.4', 'Integer', lambda i: 1500),  # ifMtu
    ('1.3.6.1.2.1.2.2.1.5', 'Gauge32', lambda i: 100000000),  # ifSpeed
    ('1.3.6.1.2.1.2.2.1.6', 'OctetString', lambda i: f"00:00:00:00:{i >> 8:02x}:{i & 0xff:02x}"),  # ifPhysAddress (high byte and low byte of i, so each interface gets a unique address)
    ('1.3.6.1.2.1.2.2.1.7', 'Integer', lambda i: 1),  # ifAdminStatus
    ('1.3.6.1.2.1.2.2.1.8', 'Integer', lambda i: 1),  # ifOperStatus
    ('1.3.6.1.2.1.2.2.1.9', 'TimeTicks', lambda i: int((datetime.now() - datetime(today.year, random.randint(1, today.month), random.randint(1, min(today.day, 28)))).total_seconds()) * 100),  # ifLastChange (day capped at 28 so the random date is valid in every month)
    ('1.3.6.1.2.1.2.2.1.10', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifInOctets
    ('1.3.6.1.2.1.2.2.1.11', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifInUcastPkts
    ('1.3.6.1.2.1.2.2.1.12', 'Counter32', lambda i: random.randint(0, 80)),  # ifInNUcastPkts
    ('1.3.6.1.2.1.2.2.1.13', 'Counter32', lambda i: random.randint(0, 80)),  # ifInDiscards
    ('1.3.6.1.2.1.2.2.1.14', 'Counter32', lambda i: random.randint(0, 80)),  # ifInErrors
    ('1.3.6.1.2.1.2.2.1.15', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifInUnknownProtos
    ('1.3.6.1.2.1.2.2.1.16', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifOutOctets
    ('1.3.6.1.2.1.2.2.1.17', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifOutUcastPkts
    ('1.3.6.1.2.1.2.2.1.18', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifOutNUcastPkts
    ('1.3.6.1.2.1.2.2.1.19', 'Counter32', lambda i: random.randint(0, 80)),  # ifOutDiscards
    ('1.3.6.1.2.1.2.2.1.20', 'Counter32', lambda i: random.randint(0, 80)),  # ifOutErrors
]

# Listed in ascending OID order; the high-capacity (HC) counters are Counter64
ifxtable_snmp_objects = [
    ('1.3.6.1.2.1.31.1.1.1.1', 'OctetString', lambda i: f"SampleInterface{i}"),  # ifName
    ('1.3.6.1.2.1.31.1.1.1.6', 'Counter64', lambda i: random.randint(3, i*50000)),  # ifHCInOctets
    ('1.3.6.1.2.1.31.1.1.1.10', 'Counter64', lambda i: random.randint(3, i*60000)),  # ifHCOutOctets
    ('1.3.6.1.2.1.31.1.1.1.15', 'Gauge32', lambda i: "100"),  # ifHighSpeed
]

# Print IFTable data
for oid_base, tag_type, value_func in iftable_snmp_objects:
    for i in range(1, iRangeMax+1):
        value = value_func(i)
        print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")

# IP-MIB objects for managing IP addressing
# ipAdEntAddr: The IP address to which this entry's addressing information pertains
print(f"1.3.6.1.2.1.4.20.1.1|{dictTags.get('IPAddress')}|10.5.5.5")

# ipAdEntIfIndex: The index value which uniquely identifies the interface to which this entry is applicable (INTEGER in IP-MIB)
print(f"1.3.6.1.2.1.4.20.1.2|{dictTags.get('Integer')}|1")

# ipAdEntNetMask: The subnet mask associated with the IP address of this entry (IpAddress in IP-MIB)
print(f"1.3.6.1.2.1.4.20.1.3|{dictTags.get('IPAddress')}|255.255.255.0")

# hrSWRunIndex: An index uniquely identifying a row in the hrSWRun table
print(f"1.3.6.1.2.1.25.4.2.1.1.1|{dictTags.get('Integer')}|1")

# hrSWRunName: The name of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.2.1|{dictTags.get('OctetString')}|LJRSNMPAgent")
# hrSWRunID: The product ID of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.3.1|{dictTags.get('ObjectIdentifier')}|1.3.6.1.4.1.25709.55")

# hrSWRunPath: The path of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.4.1|{dictTags.get('OctetString')}|/opt/snmp/snmpsim/_agent.sh")

# hrSWRunParameters: Operational parameters for the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.5.1|{dictTags.get('OctetString')}|-L")

# hrSWRunType: The type of software running (e.g., operating system, application)
print(f"1.3.6.1.2.1.25.4.2.1.6.1|{dictTags.get('Integer')}|4")

# hrSWRunStatus: The status of this software (running, runnable, notRunnable, invalid)
print(f"1.3.6.1.2.1.25.4.2.1.7.1|{dictTags.get('Integer')}|1")


for oid_base, tag_type, value_func in ifxtable_snmp_objects:
    for i in range(1, iRangeMax+1):
        value = value_func(i)
        print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")

Network Capture

Even better, parse a network capture file.

Capture Data

On the server that gathers SNMP data from the host we want to simulate, use a network capture utility to gather the SNMP communication between the server and the desired device.

tcpdump -i <interface> -w <filename>.pcap

E.G. to record the communication with 10.5.171.114

tcpdump 'host 10.5.171.114 and (tcp port 161 or tcp port 162 or udp port 161 or udp port 162)' -w /tmp/ar.pcap

Note: there is no benefit to capturing more than one cycle of SNMP responses. If packets are captured immediately after the capture starts, the devices were in the middle of a polling cycle; end the capture and start a new one shortly. A clean capture has no packets for a bit, then packets during the SNMP polling cycle, and then another pause until the next cycle.

Parsing The Capture Data Into A Response File

The following script parses the capture file into an snmprec response file – note, I needed to use 2.6.0rc1 of scapy to parse SNMP data. The 2.5.0 release version failed to parse most of the packets which I believe is related to https://github.com/secdev/scapy/issues/3900

from scapy.all import rdpcap, load_contrib
from scapy.packet import Raw
from scapy.layers.snmp import SNMP, SNMPresponse

import os
from datetime import datetime
import argparse

# Ensure Scapy's SNMP contributions are loaded
load_contrib("snmp")

def sort_by_oid(listSNMPResponses):
    """
    Sorts a list of "OID|TAG|Value" strings by the OID numerically and hierarchically.

    :param listSNMPResponses: A list of "OID|TAG|Value" strings.
    :return: A list of "OID|TAG|Value" strings sorted by OID.
    """
    # Split each element into a tuple of (OID list, original string), converting OID to integers for proper comparison
    oid_tuples = [(list(map(int, element.split('|')[0].split('.'))), element) for element in listSNMPResponses]

    # Sort the list of tuples by the OID part (the list of integers)
    sorted_oid_tuples = sorted(oid_tuples, key=lambda x: x[0])

    # Extract the original strings from the sorted list of tuples
    sorted_listSNMPResponses = [element[1] for element in sorted_oid_tuples]

    return sorted_listSNMPResponses

parser = argparse.ArgumentParser(description='This script converts an SNMP packet capture into a snmpsim response file')
parser.add_argument('--filename', '-f', help='The capture file to process', required=True)

args = parser.parse_args()
strFullCaptureFilePath = args.filename
strCaptureFilePath, strCaptureFileName = os.path.split(strFullCaptureFilePath)


# Valid tags per https://pypi.org/project/snmpsim/0.2.3/
dictTags = {'ASN1_INTEGER': '2', 'ASN1_STRING': '4', 'ASN1_NULL': '5', 'ASN1_OID': '6', 'ASN1_IPADDRESS': '64', 'ASN1_COUNTER32': '65', 'ASN1_GAUGE32': '66', 'ASN1_TIME_TICKS': '67', 'Opaque': '68','ASN1_COUNTER64': '70'}

listSNMPResponses = []
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.1.1|2|1")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.2.1|4|LJRSNMPAgent")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.3.1|6|1.3.6.1.4.1.25709.55")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.4.1|4|/opt/snmp/snmpsim/_agent.sh")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.5.1|4|-L")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.6.1|2|4")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.7.1|2|1")
i = 0

if True:
    packets = rdpcap(strFullCaptureFilePath)
    # Packets are zero indexed, so packet 1 in script is packet 2 in Wireshark GUI
    #for i in range(0,4):
    for packet in packets:
        print(f"Working on packet {i}")
        i = i + 1
        if SNMP in packet:
            snmp_layer = packet[SNMP]
            if isinstance(packet[SNMP].PDU,SNMPresponse):
                snmp_response = snmp_layer.getfield_and_val('PDU')[1]
                if hasattr(snmp_response, 'varbindlist') and snmp_response.varbindlist is not None:
                    for varbind in snmp_response.varbindlist:
                        strOID = varbind.oid.val if hasattr(varbind.oid, 'val') else str(varbind.oid)
                        strValue = varbind.value.val if hasattr(varbind.value, 'val') else str(varbind.value)
                        strType = type(varbind.value).__name__
                        if dictTags.get(strType):
                            iType = dictTags.get(strType)
                        else:
                            iType = strType

                        if isinstance(strValue, bytes):
                            print(f"Decoding {strValue}")
                            strValue = strValue.decode('utf-8',errors='ignore')

                        print(f"OID: {strOID}, Type: {strType}, Tag: {iType}, Value: {strValue}")
                        listSNMPResponses.append(f"{strOID}|{iType}|{strValue}")
            else:
                print(f"Not a response -- type is {type(packet[SNMP].PDU)}")
        elif Raw in packet:
            print(f"I have a raw packet at {i}")
        else:
            print(dir(packet))
            print(f"No SNMP or Raw in {i}: {packet}")

# Sort by OID numbers
listSortedSNMPResponses = sort_by_oid(listSNMPResponses)
strOutputFile = f'/opt/snmp/snmpsim/data/{datetime.now().strftime("%Y%m%d")}-{strCaptureFileName.rsplit(".", 1)[0]}.deactivated'
# The with block closes the file even if a write fails
with open(strOutputFile, "w") as f:
    for strSNMPResponse in listSortedSNMPResponses:
        print(strSNMPResponse)
        f.write(strSNMPResponse + "\n")

This will create an snmpsim response file in /opt/snmp/snmpsim/data named for the capture file, prefixed with the current year, month, and day. I.E. my ar.pcap file results in /opt/snmp/snmpsim/data/20240705-ar.deactivated. You can then copy the file to whatever community string you want: cp 20240705-ar.deactivated CommunityString.snmprec

Python Script: Alert for pending SAML IdP Certificate Expiry

I got a rather last minute notice from our security department that the SSL certificate used in the IdP partnership between my application and their identity provider would be expiring soon and did I want to renew it Monday, Tuesday, or Wednesday. Being that this was Friday afternoon … “none of the above” would have been my preference to avoid filing the “emergency change” paperwork, but Wednesday was the least bad of the three options. Of course, an emergency requires paperwork as to why you didn’t plan two weeks in advance. And how you’ll do better next time.

Sometimes that is a bit of a stretch — next time someone is working on the electrical system and drops a half-inch metal plate into the building wiring, I’m probably still going to have a problem when the power drops. But, in this case, there are two perfectly rational solutions. One, of course, would be that the people planning the certificate renewals start contacting partner applications more promptly. But that’s not within my purview. The thing I can do is watch the metadata on the identity provider and tell myself when the certificates will be expiring soon.

So I now have a little Python script that has a list of all of our SAML-authenticated applications. It pulls the metadata from PingID, loads the X509 certificate, and checks how far in the future the expiry date is. In my production version, anything less than 30 days out sends an e-mail alert. Next time, we can contact security ahead of time, find out when they’re planning on doing the renewal, and get the change request approved well in advance.

import requests
import xml.etree.ElementTree as ET
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from datetime import datetime, date

strIDPMetadataURLBase = 'https://login.example.com/pf/federation_metadata.ping?PartnerSpId='
listSPIDs = ["https://tableau.example.com", "https://email.example.com", "https://internal.example.com", "https://salestool.example.com"]

for strSPID in listSPIDs:
    objResults = requests.get(f"{strIDPMetadataURLBase}{strSPID}")
    if objResults.status_code == 200:
        try:
            root = ET.fromstring(objResults.text)

            for objX509Cert in root.findall("./{urn:oasis:names:tc:SAML:2.0:metadata}IDPSSODescriptor/{urn:oasis:names:tc:SAML:2.0:metadata}KeyDescriptor/{http://www.w3.org/2000/09/xmldsig#}KeyInfo/{http://www.w3.org/2000/09/xmldsig#}X509Data/{http://www.w3.org/2000/09/xmldsig#}X509Certificate"):
                strX509Cert = f"-----BEGIN CERTIFICATE-----\n{objX509Cert.text}\n-----END CERTIFICATE-----"

                cert = x509.load_pem_x509_certificate(bytes(strX509Cert,'utf8'), default_backend())
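                # not_valid_after is a naive UTC datetime, so compare against UTC rather than local time
                iDaysUntilExpiry = cert.not_valid_after - datetime.utcnow()
                print(f"{strSPID}\t{iDaysUntilExpiry.days}")
        except Exception:
            print(f"{strSPID}\tFailed to decode X509 Certificate")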
    else:
        print(f"{strSPID}\tFailed to retrieve metadata XML")
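
The alerting decision itself is simple enough to sketch separately. This is a minimal illustration of the "anything under 30 days" rule, not the production version; the function name needs_alert and the sample dates are hypothetical, and sending the e-mail is left out.

```python
from datetime import datetime

ALERT_THRESHOLD_DAYS = 30  # threshold described in the text


def needs_alert(not_valid_after, now, threshold_days=ALERT_THRESHOLD_DAYS):
    """Return True when the certificate expires in fewer than threshold_days days."""
    return (not_valid_after - now).days < threshold_days


# Hypothetical dates for illustration
now = datetime(2024, 7, 5)
print(needs_alert(datetime(2024, 7, 20), now))   # expires in 15 days
print(needs_alert(datetime(2024, 12, 1), now))   # expires months from now
```

Passing the current time in as a parameter keeps the check easy to test with fixed dates.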

Python: Listing XML tags

I was having a lot of trouble using find/findall when parsing an XML document — turns out the namespace prefixed the tag name … so I needed to find {http://maven.apache.org/POM/4.0.0}groupId instead of just groupId

How do you figure that out? Quickest way, for me, was just to print out all of the tag names.

from lxml import etree

strXMLFile = 'pom.xml'  # Placeholder: path to the XML document to inspect

# Load POM XML into tree
tree = etree.parse(strXMLFile)

# List all element names in XML document
for element in tree.iter():
    print(element.tag)
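
To see the effect without a POM file on disk, here is a small sketch using the standard library's xml.etree.ElementTree instead of lxml (the find behavior shown is the same idea). The inline XML snippet and the "pom" prefix are made up for the example.

```python
import xml.etree.ElementTree as ET

# Hypothetical, minimal POM-like document with a default namespace
POM_SNIPPET = """<project xmlns="http://maven.apache.org/POM/4.0.0">
  <groupId>com.example</groupId>
  <artifactId>demo</artifactId>
</project>"""

root = ET.fromstring(POM_SNIPPET)

# Printing the tags shows the namespace in braces in front of each name
for element in root.iter():
    print(element.tag)

# The bare name finds nothing because the real tag includes the namespace
bare = root.find("groupId")

# Either spell out the namespace in braces, or map a prefix to it
qualified = root.find("{http://maven.apache.org/POM/4.0.0}groupId")
prefixed = root.find("pom:groupId", {"pom": "http://maven.apache.org/POM/4.0.0"})

print(bare, qualified.text, prefixed.text)
```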

Python: Generate Transcript of Video File

There’s a speech_recognition module in Python that transcribes an audio file. Since ffmpeg can convert a video file to an audio file (WAV in the script below), that means you can also use Python to transcribe a video file.

# requires pocketsphinx from CMU if using sphinx for speech to text recognition
import os
import subprocess
import speech_recognition as sr

strFFMPEGBinaryLocation = 'c:/tmp/ffmpeg/bin/ffmpeg.exe'
strCurrentDirectory = os.getcwd()

strInputVideo = "Z:/Path To/My Video/file.MP4"
strOutputFileName = "converted.wav"
# Convert mp4 to wav file; passing the arguments as a list avoids shell-quoting problems with spaces in the path
subprocess.run([strFFMPEGBinaryLocation, '-i', strInputVideo, f'{strCurrentDirectory}/{strOutputFileName}'], check=True)

# Run converted wav file through speech recognizer
r = sr.Recognizer()
audio = sr.AudioFile(f'{strCurrentDirectory}/{strOutputFileName}')

with audio as source:
    #audio = r.record(source, 90)  # Would need API key to process longer audio?
    #text = r.recognize_google(audio)
    audio = r.record(source)
    text = r.recognize_sphinx(audio)
print(text)

Python Code — Creating Title Images

Instead of allowing YouTube to randomly pick a frame to use as the preview image, I have always made a title image for the Township meetings I post to YouTube. At first, this was a manual process (and thus time consuming for a lot of videos). Since then, I have created a script that generates the color gradient background and overlays text including the meeting type and date.

# Valid meeting types: "TrusteeRegular",  "TrusteeSpecial", "TrusteeEmer", "TrusteeHearing", "BZAReg", "BZAHearing", "ZCReg", "ZCHearing"
strMeetingListSpreadsheet = 'MeetingList.xlsx'

from PIL import Image, ImageDraw, ImageFont
import pandas as pd

BLACK= (0,0,0)
WHITE = (255,255,255)

TRUSTEE_COLOR_PALETTE = [(156,12,12), (92,7,7), (0,0,0)]
BZA_COLOR_PALETTE = [(253,139,1), (91,51,0), (0,0,0)]
ZC_COLOR_PALETTE = [(24,113,56), (8,41,20), (0,0,0)]
MISC_COLOR_PALETTE = [(175,28,195), (55,9,61), (0,0,0)]

objFontMeetingTitle = ImageFont.truetype("/usr/share/fonts/liberation-sans/LiberationSans-Regular.ttf",115)
objFontMeetingTopic = ImageFont.truetype("/usr/share/fonts/liberation-sans/LiberationSans-Regular.ttf",115)
objFontMeetingDate = ImageFont.truetype("/usr/share/fonts/liberation-sans/LiberationSans-Italic.ttf",95)

class Point(object):
    def __init__(self, x, y):
        self.x, self.y = x, y

class Rect(object):
    def __init__(self, x1, y1, x2, y2):
        minx, maxx = (x1,x2) if x1 < x2 else (x2,x1)
        miny, maxy = (y1,y2) if y1 < y2 else (y2,y1)
        self.min = Point(minx, miny)
        self.max = Point(maxx, maxy)

    width  = property(lambda self: self.max.x - self.min.x)
    height = property(lambda self: self.max.y - self.min.y)

def gradient_color(minval, maxval, val, color_palette):
    """ Computes intermediate RGB color of a value in the range of minval
        to maxval (inclusive) based on a color_palette representing the range.
    """
    max_index = len(color_palette)-1
    delta = maxval - minval
    if delta == 0:
        delta = 1
    v = float(val-minval) / delta * max_index
    i1, i2 = int(v), min(int(v)+1, max_index)
    (r1, g1, b1), (r2, g2, b2) = color_palette[i1], color_palette[i2]
    f = v - i1
    return int(r1 + f*(r2-r1)), int(g1 + f*(g2-g1)), int(b1 + f*(b2-b1))

def horz_gradient(draw, rect, color_func, color_palette):
    minval, maxval = 1, len(color_palette)
    delta = maxval - minval
    width = float(rect.width)  # Cache.
    for x in range(rect.min.x, rect.max.x+1):
        f = (x - rect.min.x) / width
        val = minval + f * delta
        color = color_func(minval, maxval, val, color_palette)
        draw.line([(x, rect.min.y), (x, rect.max.y)], fill=color)

def vert_gradient(draw, rect, color_func, color_palette):
    minval, maxval = 1, len(color_palette)
    delta = maxval - minval
    height = float(rect.height)  # Cache.
    for y in range(rect.min.y, rect.max.y+1):
        f = (y - rect.min.y) / height
        val = minval + f * delta
        color = color_func(minval, maxval, val, color_palette)
        draw.line([(rect.min.x, y), (rect.max.x, y)], fill=color)


if __name__ == '__main__':
    df = pd.read_excel(strMeetingListSpreadsheet, sheet_name="Sheet1")

    df = df.reset_index()  # make sure indexes pair with number of rows

    for index, row in df.iterrows():
        strGraphicName = f"{row['Date'].strftime('%Y%m%d')}-{row['Type']}.png"  # year-month-day so the files sort chronologically
        strMeetingType = row['Type']

        # Draw a three color horizontal gradient.
        region = Rect(0, 0, 1920, 1080)
        width, height = region.max.x+1, region.max.y+1
        image = Image.new("RGB", (width, height), BLACK)
        draw = ImageDraw.Draw(image)

        # Add meeting title
        if strMeetingType == "TrusteeRegular":
            horz_gradient(draw, region, gradient_color, TRUSTEE_COLOR_PALETTE)
            draw.text((1670, 525),"Trustee Regular Meeting",WHITE,font=objFontMeetingTopic, anchor="rm")
        elif strMeetingType == "TrusteeSpecial":
            horz_gradient(draw, region, gradient_color, TRUSTEE_COLOR_PALETTE)
            draw.text((1670, 525),"Trustee Special Meeting",WHITE,font=objFontMeetingTopic, anchor="rm")
        elif strMeetingType == "TrusteeEmer":
            horz_gradient(draw, region, gradient_color, TRUSTEE_COLOR_PALETTE)
            draw.text((1670, 525),"Trustee Emergency Meeting",WHITE,font=objFontMeetingTopic, anchor="rm")
        elif strMeetingType == "TrusteeHearing":
            horz_gradient(draw, region, gradient_color, TRUSTEE_COLOR_PALETTE)
            draw.text((1670, 525),"Trustee Public Hearing",WHITE,font=objFontMeetingTopic, anchor="rm")
        elif strMeetingType == "BZAReg":
            horz_gradient(draw, region, gradient_color, BZA_COLOR_PALETTE)
            draw.text((1670, 525),"BZA Regular Meeting",WHITE,font=objFontMeetingTopic, anchor="rm")
        elif strMeetingType == "BZAHearing":
            horz_gradient(draw, region, gradient_color, BZA_COLOR_PALETTE)
            draw.text((1670, 525),"BZA Public Hearing",WHITE,font=objFontMeetingTopic, anchor="rm")
        elif strMeetingType == "ZCReg":
            horz_gradient(draw, region, gradient_color, ZC_COLOR_PALETTE)
            draw.text((1670, 525),"Zoning Commission Meeting",WHITE,font=objFontMeetingTopic, anchor="rm")
        elif strMeetingType == "ZCHearing":
            horz_gradient(draw, region, gradient_color, ZC_COLOR_PALETTE)
            draw.text((1670, 525),"Zoning Commission Hearing",WHITE,font=objFontMeetingTopic, anchor="rm")
        else:
            horz_gradient(draw, region, gradient_color, MISC_COLOR_PALETTE)
            draw.text((1670, 525),"Township Meeting",WHITE,font=objFontMeetingTopic, anchor="rm")

        # Add township and date
        draw.text((1070, 225),"Hinckley Township",WHITE,font=objFontMeetingTitle, anchor="rm")
        draw.text((1770, 825),row['Date'].strftime('%B %d, %Y'),WHITE,font=objFontMeetingDate, anchor="rm")

        image.save(strGraphicName, "PNG")
        print(f"image saved as {strGraphicName}")
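
To see how the palette stops map onto the gradient, here is a worked example that repeats the gradient_color function from the script and evaluates it at a few points. The range 1 to 3 matches what horz_gradient passes for a three-color palette; the sample values chosen are just for illustration.

```python
def gradient_color(minval, maxval, val, color_palette):
    """Interpolate an RGB color for val in the range minval..maxval (inclusive)."""
    max_index = len(color_palette) - 1
    delta = maxval - minval
    if delta == 0:
        delta = 1
    v = float(val - minval) / delta * max_index
    i1, i2 = int(v), min(int(v) + 1, max_index)
    (r1, g1, b1), (r2, g2, b2) = color_palette[i1], color_palette[i2]
    f = v - i1
    return int(r1 + f * (r2 - r1)), int(g1 + f * (g2 - g1)), int(b1 + f * (b2 - b1))


TRUSTEE_COLOR_PALETTE = [(156, 12, 12), (92, 7, 7), (0, 0, 0)]

left_edge = gradient_color(1, 3, 1, TRUSTEE_COLOR_PALETTE)     # first palette color
quarter = gradient_color(1, 3, 1.5, TRUSTEE_COLOR_PALETTE)     # halfway between first and second
middle = gradient_color(1, 3, 2, TRUSTEE_COLOR_PALETTE)        # second palette color
right_edge = gradient_color(1, 3, 3, TRUSTEE_COLOR_PALETTE)    # last palette color

print(left_edge, quarter, middle, right_edge)
```

The middle palette entry lands at the center of the image, so each half of the image blends between one pair of neighboring colors.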


I have an Excel file which contains the meeting type code, a long meeting title that is used as the second line of the image, a date (and a MeetingDate that I use in my concat formulae that create the title and description for YouTube). To use an Excel date in concat, you need to use a TEXT formula with the text formatting string.

This allows me to have a consistent preview image for all of our postings without actually making dozens of files by hand.

Bulk Download of YouTube Videos from Channel

Several years ago, I started recording our Township meetings and posting them to YouTube. This was very helpful; even our government officials used the recordings to refresh their memory about what happened in a meeting. But it also led people to ask “why, exactly, are we relying on some random citizen to provide this service? What if they are busy? Or move?!” … and the Township created their own channel and posted their meeting recordings. This was a great way to promote transparency; however, they’ve got retention policies. Since we have absolutely been at meetings where it would be very helpful to know what happened five, ten, forty!! years ago … my expectation is that these videos will be useful far beyond the allotted document retention period.

We decided to keep our channel around with the historic archive of government meeting recordings. There’s no longer time criticality — anyone who wants to see a current meeting can just use the township’s channel. We have a script that lists all of the videos from the township’s channel and downloads them — once I complete back-filling our archive, I will modify the script to stop once it reaches a video series we already have. But this quick script will list all videos published to a channel and download the highest quality MP4 file associated with that video.

# API key for my Google Developer project
strAPIKey = '<CHANGEIT>'

# Youtube account channel ID
strChannelID = '<CHANGEIT>'

import os
from time import sleep
import urllib.request
import json
from pytube import YouTube
import datetime

from config import dateLastDownloaded

os.chdir(os.path.dirname(os.path.abspath(__file__)))
print(os.getcwd())

strBaseVideoURL = 'https://www.youtube.com/watch?v='
strSearchAPIv3URL= 'https://www.googleapis.com/youtube/v3/search?'

iStart = 0		# Skip the first N files -- raise this to resume when a batch fails midway
iProcessed = 0		# Just a counter

strStartURL = f"{strSearchAPIv3URL}key={strAPIKey}&channelId={strChannelID}&part=snippet,id&order=date&maxResults=50"
strYoutubeURL = strStartURL

while True:
    inp = urllib.request.urlopen(strYoutubeURL)
    resp = json.load(inp)

    for i in resp['items']:
        if i['id']['kind'] == "youtube#video":
            iDaysSinceLastDownload = datetime.datetime.strptime(i['snippet']['publishTime'], "%Y-%m-%dT%H:%M:%SZ") - dateLastDownloaded
            # If video was posted since last run time, download the video
            if iDaysSinceLastDownload.days >= 0:
                strFileName = (i['snippet']['title']).replace('/','-').replace(' ','_')
                print(f"{iProcessed}\tDownloading file {strFileName} from {strBaseVideoURL}{i['id']['videoId']}")
                # Need to retrieve a youtube object and filter for the *highest* resolution otherwise we get blurry videos
                if iProcessed >= iStart:
                    yt = YouTube(f"{strBaseVideoURL}{i['id']['videoId']}")
                    yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download(filename=f"{strFileName}.mp4")
                    sleep(90)
                iProcessed = iProcessed + 1
    # The last page of results has no nextPageToken, which ends the loop
    next_page_token = resp.get('nextPageToken')
    if not next_page_token:
        break
    strYoutubeURL = f"{strStartURL}&pageToken={next_page_token}"
    print(f"Now getting next page from {strYoutubeURL}")

# Update config.py with last run date
dateToday = datetime.datetime.now()
# The with block closes the file for us (the original f.close never actually called close)
with open("config.py","w") as f:
    f.write("import datetime\n")
    f.write(f"dateLastDownloaded = datetime.datetime({dateToday.year},{dateToday.month},{dateToday.day},0,0,0)")
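
The planned change, stopping once the listing reaches videos we already have, could look something like the sketch below. It is not part of the script above: select_new_videos and the sample items are made up for illustration, and it assumes the search results really do come back newest first (which is what order=date in the URL asks for).

```python
import datetime

def select_new_videos(items, dateCutoff):
    """Return (list of new video items, True if we hit an already-archived video).

    Assumes items are ordered newest first, so the first video older than
    dateCutoff means everything after it is already in the archive.
    """
    listNew = []
    for item in items:
        if item['id']['kind'] != "youtube#video":
            continue
        datePublished = datetime.datetime.strptime(item['snippet']['publishTime'], "%Y-%m-%dT%H:%M:%SZ")
        if datePublished < dateCutoff:
            # Older than the last download -- stop here and stop paging
            return listNew, True
        listNew.append(item)
    return listNew, False

# Hypothetical page of results, shaped like the fields the script reads
listSampleItems = [
    {'id': {'kind': 'youtube#video', 'videoId': 'aaa'}, 'snippet': {'publishTime': '2024-03-05T12:00:00Z', 'title': 'Trustee Meeting'}},
    {'id': {'kind': 'youtube#playlist'}, 'snippet': {'publishTime': '2024-03-01T12:00:00Z', 'title': 'Playlist'}},
    {'id': {'kind': 'youtube#video', 'videoId': 'bbb'}, 'snippet': {'publishTime': '2024-01-10T12:00:00Z', 'title': 'Old Meeting'}},
]

listNew, bReachedKnown = select_new_videos(listSampleItems, datetime.datetime(2024, 2, 1))
print([item['id']['videoId'] for item in listNew], bReachedKnown)
```

In the main loop, the idea would be to download whatever comes back in the list and break out of the while loop as soon as the second value is True, instead of walking every page of the channel.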

Maintaining an /etc/hosts record

I encountered an oddity at work — there’s a server on an internally located public IP space. Because it’s public space, it is not allowed to communicate with the internal interface of some of our security group’s servers. It has to use their public interface (not technically, just a policy on which they will not budge). I cannot just use a DNS server that resolves the public copy of our zone because then we’d lose access to everything else, so we are stuck making an /etc/hosts entry. Except this thing changes IPs fairly regularly (hey, we’re moving from AWS to Azure; hey, let’s try CloudFlare; nope, that is expensive so change it back) and the service it provides is application authentication so not something you want randomly falling over every couple of months.

So I’ve come up with a quick script to maintain the /etc/hosts record for the endpoint.

# requires: dnspython (subprocess is part of the standard library)

import dns.resolver
import subprocess

strHostToCheck = 'hostname.example.com' # PingID endpoint for authentication
strDNSServer = "8.8.8.8"         # Google's public DNS server
listStrIPs = []

# Get current assignment from hosts file
with open('/etc/hosts') as fileHosts:
        listCurrentAssignment = [line for line in fileHosts if strHostToCheck in line]

if len(listCurrentAssignment) >= 1:
        strCurrentAssignment = listCurrentAssignment[0].split("\t")[0]

        # Get actual assignment from DNS
        objResolver = dns.resolver.Resolver()
        objResolver.nameservers = [strDNSServer]
        # dnspython 2.x deprecates query() in favor of resolve(); on 1.x, call query() instead
        objHostResolution = objResolver.resolve(strHostToCheck)

        for objARecord in objHostResolution:
                listStrIPs.append(objARecord.to_text())

        if len(listStrIPs) >= 1:
                # Fix /etc/hosts if the assignment there doesn't match DNS
                if strCurrentAssignment in listStrIPs:
                        print(f"Nothing to do -- hosts file record {strCurrentAssignment} is in {listStrIPs}")
                else:
                        print(f"I do not find {strCurrentAssignment} here, so now fix it!")
                        subprocess.call([f"sed -i -e 's/{strCurrentAssignment}\t{strHostToCheck}/{listStrIPs[0]}\t{strHostToCheck}/g' /etc/hosts"], shell=True)
        else:
                print("No resolution from DNS ... that's not great")
else:
        print("No assignment found in /etc/hosts ... that's not great either")
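
The sed call only matches when the IP and hostname are separated by exactly one tab. If that ever becomes a problem, the replacement can be done in Python instead. This is just a sketch: replace_hosts_entry is a made-up helper, the sample text and the 203.0.113.10 address are placeholders, and it only builds the new file contents (writing them back to /etc/hosts still needs root).

```python
def replace_hosts_entry(strHostsText, strHostName, strNewIP):
    """Return hosts-file text with the IP for strHostName swapped for strNewIP."""
    listOut = []
    for strLine in strHostsText.splitlines(keepends=True):
        listFields = strLine.split()   # splits on tabs or spaces
        # Leave blank lines and comments alone; match the hostname as a whole field
        if listFields and not listFields[0].startswith('#') and strHostName in listFields[1:]:
            # The first field is the IP, so replace only its first occurrence
            strLine = strLine.replace(listFields[0], strNewIP, 1)
        listOut.append(strLine)
    return ''.join(listOut)

# Hypothetical hosts file content
strSample = (
    "127.0.0.1\tlocalhost\n"
    "10.1.2.3\thostname.example.com\n"
    "# 10.1.2.3 hostname.example.com old\n"
)
strUpdated = replace_hosts_entry(strSample, 'hostname.example.com', '203.0.113.10')
print(strUpdated)
```

Matching on whole fields also avoids touching a record for something like otherhostname.example.com, which the substring check at the top of the script would pick up.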

Python Script: Checking Certificate Expiry Dates

A long time ago, before the company I work for paid for an external SSL certificate management platform that does nice things like clue you in to the fact your cert is expiring tomorrow night, we had an outage due to an expired certificate. One. I put together a Perl script that parsed output from the openssl client and proactively alerted us of any pending expiration events.

That script went away quite some time ago, although I still have a copy running at home that ensures our SMTP and web server certs are current. This morning, though, our K8s environment fell over due to expired certificates. Digging into it, the certs are in the management platform (my first guess was self-signed certificates that wouldn’t have been included in the pending expiry notices) but were not delivered to those of us who actually manage the servers. Luckily it was our dev K8s environment and we now know the prod one will be expiring in a week or so. But I figured it was a good impetus to resurrect the old script. Unfortunately, none of the modules I used for date calculation were installed on our script server. Seemed like a hint that I should rewrite the script in Python. So … here is a quick Python script that gets certificates from hosts and calculates how long until the certificate expires. Add on an “if” statement and a notification function, and we shouldn’t come in to failed environments needing certificate renewals.

from cryptography import x509
from cryptography.hazmat.backends import default_backend
import socket
import ssl
from datetime import datetime, timedelta

# Dictionary of hosts:port combinations to check for expiry
dictHostsToCheck = {
"tableau.example.com": 443       # Tableau 
,"kibana.example.com": 5601      # ELK Kibana
,"elkmaster.example.com": 9200   # ELK Master
,"kafka.example.com": 9093       # Kafka server
}
for strHostName in dictHostsToCheck:
    iPort = dictHostsToCheck[strHostName]

    datetimeNow = datetime.utcnow()

    # create default context
    context = ssl.create_default_context()

    # Do not verify cert chain or hostname so we ensure we always check the certificate
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    with socket.create_connection((strHostName, iPort)) as sock:
        with context.wrap_socket(sock, server_hostname=strHostName) as ssock:
            objDERCert = ssock.getpeercert(True)
            objPEMCert = ssl.DER_cert_to_PEM_cert(objDERCert)
            objCertificate = x509.load_pem_x509_certificate(str.encode(objPEMCert),backend=default_backend())

            print(f"{strHostName}\t{iPort}\t{objCertificate.not_valid_after}\t{(objCertificate.not_valid_after - datetimeNow).days} days")
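
The “if” statement and notification function might look roughly like this. It is a sketch rather than what we actually run: the 30 day threshold is an arbitrary assumption, and send_notification is a placeholder that just prints where a real version would send e-mail or post to a chat webhook.

```python
from datetime import datetime, timedelta

iWarningDays = 30   # Assumed threshold -- pick whatever lead time you need

def send_notification(strMessage):
    # Placeholder: swap in e-mail, a chat webhook, a ticket, etc.
    print(f"ALERT: {strMessage}")

def check_expiry(strHostName, iPort, datetimeNotAfter, datetimeNow, iThreshold=iWarningDays):
    """Send a notification and return True if the cert expires within iThreshold days."""
    iDaysLeft = (datetimeNotAfter - datetimeNow).days
    if iDaysLeft <= iThreshold:
        send_notification(f"{strHostName}:{iPort} certificate expires in {iDaysLeft} days")
        return True
    return False

# Made-up expiry dates standing in for the certificate's not-valid-after value
datetimeNow = datetime(2024, 1, 1)
bAlerted = check_expiry("kibana.example.com", 5601, datetimeNow + timedelta(days=7), datetimeNow)
bQuiet = check_expiry("tableau.example.com", 443, datetimeNow + timedelta(days=200), datetimeNow)
print(bAlerted, bQuiet)
```

In the loop above, the check_expiry call would go where the print statement is, passing the expiry date read from objCertificate along with datetimeNow.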