Category: Coding

QR Code Generation

I put together a quick program that creates a “fancy” QR code for a specified URL in the specified color and drops the desired “logo” file into the center of the code.

import qrcode
from PIL import Image

def generate_qr_code_with_custom_color_and_logo():
    url = input("Please enter the URL for which you want to generate a QR code: ")

    rgb_input = input("Please enter the RGB values for the QR code color (e.g. 0,0,0 for black): ")
    
    try:
        rgb_color = tuple(map(int, rgb_input.split(',')))
        if len(rgb_color) != 3 or not all(0 <= n <= 255 for n in rgb_color):
            raise ValueError("Invalid RGB color value.")
    except Exception:
        print("Error parsing RGB values. Please make sure to enter three integers separated by commas.")
        return

    qr = qrcode.QRCode(
        version=1,  # controls the size of the QR Code
        error_correction=qrcode.constants.ERROR_CORRECT_H,  # high error correction for image insertion
        box_size=10,
        border=4,
    )
    qr.add_data(url)
    qr.make(fit=True)

    # Generate the QR code with the specified RGB color
    img = qr.make_image(fill_color=rgb_color, back_color="white")

    # Load the logo image
    logo_image_path = input("Please enter the logo for the center of this QR code: ")

    try:
        logo = Image.open(logo_image_path)
    except FileNotFoundError:
        print(f"Logo image file '{logo_image_path}' not found. Proceeding without a logo.")
        img.save("qr_code_with_custom_color.png")
        print("QR code has been generated and saved as 'qr_code_with_custom_color.png'.")
        return

    # Resize the logo image to fit in the QR code
    img_width, img_height = img.size
    logo_size = int(img_width * 0.2)  # The logo will take up 20% of the QR code width
    logo = logo.resize((logo_size, logo_size), Image.LANCZOS)  # Image.ANTIALIAS was removed in Pillow 10

    position = ((img_width - logo_size) // 2, (img_height - logo_size) // 2)

    img.paste(logo, position, mask=logo.convert("RGBA"))

    img.save("qr_code_with_custom_color_and_logo.png")

    print("QR code with a custom color and a logo image has been generated and saved as 'qr_code_with_custom_color_and_logo.png'.")

if __name__ == "__main__":
    generate_qr_code_with_custom_color_and_logo()

Voila!

JavaScript: Extracting Web Content You Cannot Copy

There are many times I need to copy “stuff” from a website that is structured in such a way that simply copy/pasting the table data is impossible. Screen prints work, but I usually want the table of data in Excel so I can add notations and such. In these cases, running JavaScript from the browser’s developers console lets you access the underlying text elements.

Right click on one of the text elements and select “Inspect”

Now copy the element’s XPath

Read the value — we don’t generally want just this one element … but the path down to the “tbody” tag looks like a reasonable place to find the values within the table.

/html/body/div[1]/div/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[3]/div/div/div/table/tbody/a[4]/td[2]/div/span[2]

Use JavaScript to grab all of the TD elements under the tbody:

// Define the XPath expression to select all <td> elements within the specific <tbody>
const xpathExpression = "/html/body/div[1]/div/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[3]/div/div/div/table/tbody//td";

// Use document.evaluate to get all matching <td> nodes
const nodesSnapshot = document.evaluate(xpathExpression, document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null);

// Log the number of nodes found (for debugging purposes)
console.log("Total <td> elements found:", nodesSnapshot.snapshotLength);

// Iterate over the nodes and log their text content
for (let i = 0; i < nodesSnapshot.snapshotLength; i++) {
    let node = nodesSnapshot.snapshotItem(i);
    if (node) {
        const textContent = node.textContent.trim();
        if (textContent) { // Only log non-empty content
            console.log(textContent);
        }
    }
}

Voila! I redacted some data below, but it’s just a list of values, one per line.
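
Since the console output is one cell per line, getting it back into rows for Excel takes one more step. Here is a minimal sketch of that step, assuming the values have been pasted into a Python list and the table has a known, fixed number of columns. The function name cells_to_csv and the three-column sample data are made up for illustration. Note that the script above skips empty cells, which would throw the row alignment off.

```python
import csv
import io

def cells_to_csv(cells, columns):
    """Group a flat list of cell values into rows of `columns` cells and return CSV text."""
    rows = [cells[i:i + columns] for i in range(0, len(cells), columns)]
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue()

# Hypothetical sample: six cells copied from the console, three columns per row
sample_cells = ["host01", "10.1.2.3", "up", "host02", "10.1.2.4", "down"]
print(cells_to_csv(sample_cells, 3))
```

The resulting text can be saved with a .csv extension and opened directly in Excel.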

Python: Getting Active Directory Subnets

Like my script that pulls the AD site information – this lets me see what subnets are defined and which sites are assigned to those subnets. I was able to quickly confirm that the devices that had problems communicating with Active Directory don’t have a site defined. Way back in 2000, we created a “catch all” 10.0.0.0/8 subnet and assigned it to the user authentication site. New networks on a whole different addressing scheme don’t have a site assignment. It should still work, but the application in question has historically had issues with going the “Ok, list ’em all” route.

from ldap3 import Server, Connection, ALL, SUBTREE, Tls
import ssl
import getpass

# Attempt to import USERNAME and PASSWORD from config.py
try:
    from config import USERNAME, PASSWORD
except ImportError:
    USERNAME, PASSWORD = None, None

# Define constants
LDAP_SERVER = 'ad.example.com'
LDAP_PORT = 636

def get_subnets_and_sites(username, password):
    # Set up TLS configuration
    tls_configuration = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)

    # Connect to the LDAP server
    server = Server(LDAP_SERVER, port=LDAP_PORT, use_ssl=True, tls=tls_configuration, get_info=ALL)
    connection = Connection(server, user=username, password=password, authentication='SIMPLE', auto_bind=True)

    # Define the search base for subnets
    search_base = 'CN=Subnets,CN=Sites,CN=Configuration,DC=example,DC=com'  # Change this to match your domain's DN
    search_filter = '(objectClass=subnet)'  # Filter to find all subnet objects
    search_attributes = ['cn', 'siteObject']  # Retrieve the common name and site object references

    # Perform the search
    connection.search(search_base, search_filter, SUBTREE, attributes=search_attributes)

    # Extract and return subnets and their site assignments
    subnets_sites = []
    for entry in connection.entries:
        subnet_name = entry.cn.value
        site_dn = entry.siteObject.value if entry.siteObject else "No site assigned"
        subnets_sites.append((subnet_name, site_dn))

    return subnets_sites

def print_subnets_and_sites(subnets_sites):
    if subnets_sites:
        print("\nSubnets and their Site Assignments:")
        for subnet, site in subnets_sites:
            print(f"Subnet: {subnet}, Site: {site}")
    else:
        print("No subnets found in the domain.")

def main():
    # Prompt for username and password if not available in config.py
    username = USERNAME if USERNAME else input("Enter your LDAP username: ")
    password = PASSWORD if PASSWORD else getpass.getpass("Enter your LDAP password: ")

    subnets_sites = get_subnets_and_sites(username, password)
    print_subnets_and_sites(subnets_sites)

if __name__ == "__main__":
    main()
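
With the subnet list in hand, checking which subnet (if any) covers a problem device's address can be sketched with the standard ipaddress module. This is only an illustration: find_site and the sample data are hypothetical, and it assumes the most specific matching subnet wins, which is my understanding of how AD picks a site.

```python
import ipaddress

def find_site(ip, subnets_sites):
    """Return (subnet, site) for the most specific subnet containing ip, or None."""
    address = ipaddress.ip_address(ip)
    best = None
    for subnet, site in subnets_sites:
        network = ipaddress.ip_network(subnet, strict=False)
        if address.version == network.version and address in network:
            # Prefer the longest prefix, i.e. the most specific subnet
            if best is None or network.prefixlen > best[0].prefixlen:
                best = (network, site)
    return (str(best[0]), best[1]) if best else None

# Hypothetical data in the same (subnet, site) shape the script above returns
sample = [
    ("10.0.0.0/8", "CN=UserAuth,CN=Sites,CN=Configuration,DC=example,DC=com"),
    ("10.20.0.0/16", "CN=Branch,CN=Sites,CN=Configuration,DC=example,DC=com"),
]
print(find_site("10.20.5.9", sample))    # matches the /16
print(find_site("172.16.4.20", sample))  # None: no subnet covers this address
```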

Python: Get Active Directory Sites

One down side of not administering the Active Directory domain anymore is that I don’t have the quick GUI tools that show you how “stuff” is set up. Luckily, the sites are all reflected in AD objects that can be read by authenticated users:

from ldap3 import Server, Connection, ALL, SIMPLE, SUBTREE, Tls
import ssl
import getpass

# Attempt to import USERNAME and PASSWORD from config.py
try:
    from config import USERNAME, PASSWORD
except ImportError:
    USERNAME, PASSWORD = None, None

# Define constants
LDAP_SERVER = 'ad.example.com'
LDAP_PORT = 636

def get_all_sites(username, password):
    # Set up TLS configuration
    tls_configuration = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)

    # Connect to the LDAP server
    server = Server(LDAP_SERVER, port=LDAP_PORT, use_ssl=True, tls=tls_configuration, get_info=ALL)
    connection = Connection(server, user=username, password=password, authentication='SIMPLE', auto_bind=True)

    # Define the search base for sites
    search_base = 'CN=Sites,CN=Configuration,DC=example,DC=com'  # Update to match your domain's DN structure
    search_filter = '(objectClass=site)'  # Filter to find all site objects
    search_attributes = ['cn']  # We only need the common name (cn) of the sites

    # Perform the search
    connection.search(search_base, search_filter, SUBTREE, attributes=search_attributes)

    # Extract and return site names
    site_names = [entry['cn'].value for entry in connection.entries]
    return site_names

def print_site_names(site_names):
    if site_names:
        print("\nAD Sites:")
        for site in site_names:
            print(f"- {site}")
    else:
        print("No sites found in the domain.")

def main():
    # Prompt for username and password if not available in config.py
    username = USERNAME if USERNAME else input("Enter your LDAP username: ")
    password = PASSWORD if PASSWORD else getpass.getpass("Enter your LDAP password: ")

    site_names = get_all_sites(username, password)
    print_site_names(site_names)

if __name__ == "__main__":
    main()

Kafka Consumer and Producer Example – Java

Producer

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.InputStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaProducer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaHost = args[0];
        String kafkaPort = args[1];
        String kafkaServer = kafkaHost + ":" + kafkaPort;
        String topicName = "LJRJavaTest";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaProducer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

            // Create a temporary file to hold the truststore
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Define the date-time formatter
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd 'at' HH:mm:ss");

            try {
                for (int i = 0; i < 10; i++) {
                    String key = "Key-" + i;
                    
                    // Get the current timestamp
                    String timestamp = LocalDateTime.now().format(formatter);
                    
                    // Include the timestamp, Kafka host, and port in the message value
                    String value = String.format("Message-%d (Test from %s on server %s:%s)", i, timestamp, kafkaHost, kafkaPort);
                    
                    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
                    producer.send(record);
                    System.out.println("Sent message: (" + key + ", " + value + ")");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaConsumer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaServer = args[0] + ":" + args[1];
        String topicName = "LJRJavaTest";
        String groupId = "test-consumer-group";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaConsumer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

            // Create a temporary file to hold the truststore
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topicName));

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Consumed message: (key: %s, value: %s, offset: %d, partition: %d)%n",
                                record.key(), record.value(), record.offset(), record.partition());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Keystore for Trusts

The cert on my Kafka server is signed by a CA that has both a root and intermediary signing certificate. To trust the chain, I need to populate rootCA.crt and intermediateCA.crt files with the correct base64 encoded public key and import both files into a new trust store:

keytool -importcert -file rootCA.crt -keystore kafka.client.truststore.jks -alias rootCA -storepass truststore-password

keytool -importcert -file intermediateCA.crt -keystore kafka.client.truststore.jks -alias intermediateCA -storepass truststore-password

Then list certs in the trust store to verify the import was successful:

keytool -list -keystore kafka.client.truststore.jks -storepass truststore-password

And place the jks keystore with the CA certs in the project:

C:.
│   pom.xml
│
├───src
│   └───main
│       ├───java
│       │   └───com
│       │       └───example
│       │               SimpleKafkaProducer.java
│       │
│       └───resources
│               kafka.client.truststore.jks

Build and Run

Create a pom.xml for the build

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-producer-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka client dependency -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin to ensure Java 8 compatibility -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Exec Maven Plugin to run the Java class -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <mainClass>com.example.SimpleKafkaProducer</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Build using `mvn clean package`

Run the producer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaProducer" -Dexec.args="kafkahost.example.net 9095"`

Run the consumer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaConsumer" -Dexec.args="kafkahost.example.net 9095"`

Watching Redis Records for Strange Changes

I write a lot of things down to save myself time the next time I need to do the same sort of thing — and publish this to the Internet in case I can save someone else time too. But this one is so specific, I’m not sure it’s an “ever going to encounter this again” sort of thing. Just in case, though — I have device data being stored in redis — because the device doesn’t know its throughput values, you need the last time and last value paired with the current device metrics to calculate throughput. OK. But, sporadically, the cached data is updated insomuch as a new record is posted with a new timestamp. But the actual values, other than timestamp, remain unchanged. With millions of interfaces, it’s challenging to identify these situations by spot-checking the visualizations. Instead, I need to monitor redis and identify when the tstamp is updated but no other values change.

import redis
import time
import re
import json
import os

# Configuration
redis_host = 'redishost.example.net'
redis_port = 6379
redis_password = 'P@5sw0rDG03sH3r3'  # Replace with your Redis password
pattern = re.compile(r'INTERFACE_RAW_STATS_hostname\d\d\d\d_\d+_\d+')
output_file = 'changed_records.json'

# Connect to Redis
client = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)

# Dictionary to track records
records = {}
matching_keys = []

def get_matching_keys():
    """
    Retrieve keys from Redis matching the specified pattern.

    Returns:
        list: A list of keys that match the pattern.
    """
    all_keys = client.keys()
    matching_keys = [key for key in all_keys if pattern.match(key)]
    return matching_keys

def process_keys():
    """
    Process Redis keys to track changes in data.

    Retrieves keys matching the pattern, gets their data using HGETALL,
    and tracks changes. If only the 'tstamp' field has changed and all
    other fields remain the same, the record is written to a file.
    """
    global records
    i = 0

    for key in matching_keys:
        i += 1
        data = client.hgetall(key)
        if i == 1 or i % 1000 == 0:
            print(f"Processed {i} records")

        if not data:
            continue

        collector_name = data.get('collectorName')
        node_id = data.get('nodeId')
        if_index = data.get('ifIndex')
        tstamp = data.get('tstamp')

        if not collector_name or not node_id or not if_index or not tstamp:
            continue

        unique_key = f"{collector_name}_{node_id}_{if_index}"

        if unique_key in records:
            previous_data = records[unique_key]
            if previous_data['tstamp'] != tstamp:
                # Check if all other values are the same
                if all(data[k] == previous_data.get(k) for k in data if k != 'tstamp'):  # .get() avoids a KeyError on newly added fields
                    print(f"***** Record changed: {json.dumps(data, indent=2)} *****")
                    write_to_file(data)
            records[unique_key] = data  # Update the record
        else:
            records[unique_key] = data

def write_to_file(data):
    """
    Write the given data to a file.

    Args:
        data (dict): The data to write to the file.
    """
    with open(output_file, 'a') as file:
        file.write(json.dumps(data) + '\n')

if __name__ == "__main__":
    # Ensure the output file is empty at the start
    if os.path.exists(output_file):
        os.remove(output_file)

    # Retrieve the list of matching keys once
    matching_keys = get_matching_keys()

    while True:
        process_keys()
        print("Sleeping ... ")
        time.sleep(300)  # Sleep for 5 minutes
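
The core comparison can be pulled out into a small function that is easy to test without a Redis server. This is a sketch rather than the exact logic above: the function name and sample records are hypothetical, and it treats an added or removed field as a real change.

```python
def only_timestamp_changed(previous, current, timestamp_field="tstamp"):
    """True when the timestamp differs but every other field is identical."""
    if previous.get(timestamp_field) == current.get(timestamp_field):
        return False
    other_previous = {k: v for k, v in previous.items() if k != timestamp_field}
    other_current = {k: v for k, v in current.items() if k != timestamp_field}
    return other_previous == other_current

# Hypothetical records shaped like the HGETALL results
old = {"nodeId": "42", "ifIndex": "7", "inOctets": "1000", "tstamp": "1700000000"}
stale = {**old, "tstamp": "1700000300"}
fresh = {**old, "tstamp": "1700000300", "inOctets": "2500"}
print(only_timestamp_changed(old, stale))  # True
print(only_timestamp_changed(old, fresh))  # False
```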

Kafka Streams, Consumer Groups, and Stickiness

The Java application I recently inherited had a lot of … quirks. One of the strangest was that it calculated throughput statistics based on ‘start’ values in a cache that was only refreshed every four hours. So at a minute past the data refresh, the throughput is averaged out over that minute. At three hours and fifty nine minutes past the data refresh, the throughput is averaged out over three hours and fifty nine minutes. In the process of correcting this (reading directly from the cached data rather than using an in-memory copy of the cached data), I noticed that the running application paused a lot as the Kafka group was re-balanced.

That is especially odd because I’ve got a stable number of clients in each consumer group. But pods restart occasionally, and nothing had been done to stabilize partition assignment.

That was surprising, since Kafka has had mechanisms to reduce re-balancing for a long time. StickyAssignor was added in 0.11:

        // Set the partition assignment strategy to StickyAssignor
        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

And the group instance ID (group.instance.id, used for static group membership) was added in 2.3.0:

        // Set the group instance ID
        String groupInstanceId = UUID.randomUUID().toString();
        config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);

Now, I’m certain that a UUID isn’t the best way to go about crafting your group instance ID name … but it produces a “name” that isn’t likely to be duplicated. Since deploying this change, I went from seeing three or four re-balance operations an hour to zero.

Kafka Streams Group Members and Topic Partitions

I encountered an oddity in a Java application that uses Kafka Streams to implement a scalable application that reads data from Kafka topics. Data is broken out into multiple topics, and there are Kubernetes pods (“workers”) reading from each topic. The pods have different numbers of replicas defined. But it appears that no one ever aligned the topic partitions with the number of workers being deployed.

Kafka Streams assigns “work” to group members by partition. If you have ten partitions and five workers, each worker processes the data from two partitions. However, when the numbers don’t line up … some workers get more partitions than others. Were you to have eleven partitions and five workers, four workers would get data from two partitions and the fifth worker gets data from three.

Worse – in some cases we have more workers than partitions. Those extra workers are using up some resources, but they’re not actually processing data.
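
The arithmetic is easy to play with. This sketch only models how many partitions each worker ends up with when the split is as even as possible. It is not Kafka's actual assignor, and the function name is made up.

```python
def partitions_per_worker(partitions, workers):
    """Spread partitions as evenly as possible; returns a count per worker, largest first."""
    base, extra = divmod(partitions, workers)
    return [base + 1 if i < extra else base for i in range(workers)]

print(partitions_per_worker(10, 5))  # [2, 2, 2, 2, 2]
print(partitions_per_worker(11, 5))  # [3, 2, 2, 2, 2]
print(partitions_per_worker(4, 6))   # [1, 1, 1, 1, 0, 0] -> two idle workers
```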

It’s a quick fix: partitions can be added mostly invisibly. The consumer group gets re-balanced, but write operations don’t really change, and new data just starts landing in the new partitions. So I increased our partition counts to 2x the number of workers. This allows us to add a few workers to a topic if it gets backlogged, while the configuration still distributes the work evenly across all of the normally running pods.

Migrating Redis Data

So, I know that Redis should be a data cache that can be repopulated … but we use it to calculate deltas (what was the value last time) … so repopulating the information makes the first half hour or so of calculations rather slow as the application tries Redis, gets nothing, and falls back to a database query. Then we get a backlog of data to churn through, and it would just be better if the Redis cache hadn’t gone away in the first place. And if you own both servers and the files are in the same format, you could just copy the cache db from the old server to the new one. But … when you cannot just copy the file and you would really prefer the data not disappear and need to be repopulated … there’s a script for that! This Python script reads all of the data from the “old” server and populates it into the “new” server.

import redis

def migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
                 redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password):
    # Connect to the source Redis server
    source_client = redis.StrictRedis(host=redis_source_host, port=redis_source_port, db=redis_source_db, password=redis_source_password)

    # Connect to the destination Redis server
    dest_client = redis.StrictRedis(host=redis_dest_host, port=redis_dest_port, db=redis_dest_db, password=redis_dest_password)

    # Fetch all keys from the source Redis
    keys = source_client.keys('*')

    for key in keys:
        # Get the type of the key
        key_type = source_client.type(key).decode('utf-8')

        if key_type == 'string':
            value = source_client.get(key)
            print("Setting string value in dest")
            dest_client.set(key, value)
        elif key_type == 'list':
            values = source_client.lrange(key, 0, -1)
            print("Setting list value in dest")
            dest_client.delete(key)  # Ensure the list is empty before pushing
            for value in values:
                dest_client.rpush(key, value)
        elif key_type == 'set':
            values = source_client.smembers(key)
            print("Setting set value in dest")
            dest_client.delete(key)  # Ensure the set is empty before pushing
            for value in values:
                dest_client.sadd(key, value)
        elif key_type == 'zset':
            values = source_client.zrange(key, 0, -1, withscores=True)
            print("Setting zset value in dest")
            dest_client.delete(key)  # Ensure the zset is empty before pushing
            for value, score in values:
                dest_client.zadd(key, {value: score})
        elif key_type == 'hash':
            values = source_client.hgetall(key)
            print("Setting hash value in dest")
            dest_client.delete(key)  # Ensure the hash is empty before pushing
            dest_client.hset(key, mapping=values)  # hmset is deprecated in redis-py

    print("Data migration completed.")

if __name__ == "__main__":
    # Source Redis server details
    redis_source_host = 'oldredis.example.com'
    redis_source_port = 6379
    redis_source_db = 0
    redis_source_password = 'SourceRedisPassword'

    # Destination Redis server details
    redis_dest_host = 'newredis.example.com'
    redis_dest_port = 6379
    redis_dest_db = 0
    redis_dest_password = 'DestRedisPassword'

    # Migrate data
    migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
                 redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password)
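
One thing the script above does not carry over is key expiry. If that matters, an alternative is to copy each key with DUMP and RESTORE, which moves the serialized value regardless of type and lets you pass along the remaining TTL. This is a sketch under assumptions: copy_keys_with_ttl is a hypothetical helper, the clients are created without decode_responses (as above) so the dump payload stays raw bytes, and the destination server must be able to read the source server's serialization format.

```python
def copy_keys_with_ttl(source_client, dest_client, match="*"):
    """Copy keys via DUMP/RESTORE so remaining TTLs survive. Returns the number copied."""
    copied = 0
    for key in source_client.scan_iter(match=match):
        payload = source_client.dump(key)
        if payload is None:  # key expired or was deleted mid-scan
            continue
        ttl_ms = source_client.pttl(key)
        # PTTL returns -1 for "no expiry"; RESTORE expects 0 for that
        dest_client.restore(key, max(ttl_ms, 0), payload, replace=True)
        copied += 1
    return copied
```

It would be called with the same source_client and dest_client objects that migrate_data creates.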

SNMP Simulator

Background

As communication between development and production platforms is limited for security and data integrity reasons, this creates a challenge when testing changes in development: we cannot access “real world” data with which to perform tests. Having a limited set of data in development means testing may not illuminate issues that occur at high volume or on a large scale.

Solution

While limiting communication between the prod and dev systems is reasonable, it would be beneficial to be able to replay production-like data within our development systems for testing purposes. While it is not cost effective to buy large network devices with thousands of interfaces for testing, the Python module snmpsim provides “canned responses” that simulate real devices on the production network. For simplicity, I have a bash script that launches the SNMP responder.

server03:snmpsim # cat ../_playback.sh

#!/bin/bash

snmpsimd.py --data-dir=/opt/snmp/snmpsim/data --cache-dir=/opt/snmp/snmpsim/cache --agent-udpv4-endpoint=0.0.0.0:161 --process-user=ljrsnmp --process-group=ljrsnmp

This responder will replay data stored in the directory /opt/snmp/snmpsim/data. Any file ending in .snmprec will be included in the response, and the filename prior to .snmprec is the community string used to access the response data. For example, public.snmprec is the data for the public community string.

The response files are in the format OID|TAG|VALUE where OID is the OID number of the SNMP object, TAG is an integer defined at https://pypi.org/project/snmpsim/0.2.3/

Valid tag values and their corresponding ASN.1/SNMP types are:

ASN.1/SNMP Type      Tag Value
Integer32            2
Octet String         4
Null                 5
Object Identifier    6
IP Address           64
Counter32            65
Gauge32              66
Time Ticks           67
Opaque               68
Counter64            70

And the value is the data to be returned for the OID object. As an example:

1.3.6.1.2.1.1.3.0|67|2293092270

1.3.6.1.2.1.1.3.0 is the sysUpTime, the data type is TimeTicks, and the system up time is 2293092270 hundredths of a second. That works out to roughly 6369 hours, 42 minutes, and 2 seconds.
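
The TimeTicks conversion is simple integer math. A small sketch (the function name is my own):

```python
def timeticks_to_hms(ticks):
    """Convert SNMP TimeTicks (hundredths of a second) to whole (hours, minutes, seconds)."""
    total_seconds = ticks // 100
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return hours, minutes, seconds

print(timeticks_to_hms(2293092270))  # (6369, 42, 2)
```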

Items within the response file need to be listed in ascending OID order.
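
If a file gets assembled out of order, it can be re-sorted before the simulator reads it. The sketch below assumes the OID|TAG|VALUE layout described above and compares OIDs numerically, component by component. A plain string sort would put ...2.2.1.10 ahead of ...2.2.1.2. The helper names and sample lines are hypothetical.

```python
def oid_key(line):
    """Sort key: the OID portion of an OID|TAG|VALUE line as a tuple of integers."""
    oid = line.split("|", 1)[0]
    return tuple(int(part) for part in oid.strip(".").split("."))

def sort_snmprec_lines(lines):
    """Return the non-blank lines ordered by ascending OID."""
    return sorted((line for line in lines if line.strip()), key=oid_key)

sample = [
    "1.3.6.1.2.1.2.2.1.10.1|65|5000",
    "1.3.6.1.2.1.1.3.0|67|2293092270",
    "1.3.6.1.2.1.2.2.1.2.1|4|SampleInterface1",
]
for line in sort_snmprec_lines(sample):
    print(line)
```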

Generating Response Data

There are two methods for creating the data provided to an SNMP GET request. A response file can be created manually, populated with OID objects that should be included in the response as well as sample data. Alternatively, a network trace can be gathered from the production network and parsed to create the response file.

Manually Generated Response File

While you can literally type data into a response file, it is far easier to use a script to generate sample data. /opt/snmp/snmpsim/_genData.py is an example of creating a response file for about 1,000 interfaces:

from datetime import datetime
import random

iRangeMax = 1000

dictTags = {'Integer': '2', 'OctetString': '4', 'NULL': '5', 'ObjectIdentifier': '6', 'IPAddress': '64', 'Counter32': '65', 'Gauge32': '66', 'TimeTicks': '67', 'Opaque': '68','Counter64': '70'}  # Valid tags per https://pypi.org/project/snmpsim/0.2.3/

today = datetime.now()

iftable_snmp_objects = [
    ('1.3.6.1.2.1.2.2.1.1', 'Integer', lambda i: i),  # ifIndex
    ('1.3.6.1.2.1.2.2.1.2', 'OctetString', lambda i: f"SampleInterface{i}"),  # ifDescr
    ('1.3.6.1.2.1.2.2.1.3', 'Integer', lambda i: 6),  # ifType
    ('1.3.6.1.2.1.2.2.1.4', 'Integer', lambda i: 1500),  # ifMtu
    ('1.3.6.1.2.1.2.2.1.5', 'Gauge32', lambda i: 100000000),  # ifSpeed
    ('1.3.6.1.2.1.2.2.1.6', 'OctetString', lambda i: f"00:00:00:00:{i >> 8:02x}:{i & 0xff:02x}"),  # ifPhysAddress (high and low byte of the index, so each interface is unique)
    ('1.3.6.1.2.1.2.2.1.7', 'Integer', lambda i: 1),  # ifAdminStatus
    ('1.3.6.1.2.1.2.2.1.8', 'Integer', lambda i: 1),  # ifOperStatus
    ('1.3.6.1.2.1.2.2.1.9', 'TimeTicks', lambda i: random.randint(0, int((datetime.now() - datetime(today.year, 1, 1)).total_seconds())) * 100),  # ifLastChange (random point since the start of this year, in hundredths of a second)
    ('1.3.6.1.2.1.2.2.1.10', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifInOctets
    ('1.3.6.1.2.1.2.2.1.11', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifInUcastPkts
    ('1.3.6.1.2.1.2.2.1.12', 'Counter32', lambda i: random.randint(0, 80)),  # ifInNUcastPkts
    ('1.3.6.1.2.1.2.2.1.13', 'Counter32', lambda i: random.randint(0, 80)),  # ifInDiscards
    ('1.3.6.1.2.1.2.2.1.14', 'Counter32', lambda i: random.randint(0, 80)),  # ifInErrors
    ('1.3.6.1.2.1.2.2.1.15', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifInUnknownProtos
    ('1.3.6.1.2.1.2.2.1.16', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifOutOctets
    ('1.3.6.1.2.1.2.2.1.17', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifOutUcastPkts
    ('1.3.6.1.2.1.2.2.1.18', 'Counter32', lambda i: random.randint(3, i*50000)),  # ifOutNUcastPkts
    ('1.3.6.1.2.1.2.2.1.19', 'Counter32', lambda i: random.randint(0, 80)),  # ifOutDiscards
    ('1.3.6.1.2.1.2.2.1.20', 'Counter32', lambda i: random.randint(0, 80)),  # ifOutErrors
]

# Columns are listed in ascending OID order so the generated file stays sorted
ifxtable_snmp_objects = [
    ('1.3.6.1.2.1.31.1.1.1.1', 'OctetString', lambda i: f"SampleInterface{i}"),  # ifName
    ('1.3.6.1.2.1.31.1.1.1.6', 'Counter64', lambda i: random.randint(3, i*50000)),  # ifHCInOctets
    ('1.3.6.1.2.1.31.1.1.1.10', 'Counter64', lambda i: random.randint(3, i*60000)),  # ifHCOutOctets
    ('1.3.6.1.2.1.31.1.1.1.15', 'Gauge32', lambda i: 100),  # ifHighSpeed
]

# Print IFTable data
for oid_base, tag_type, value_func in iftable_snmp_objects:
    for i in range(1, iRangeMax+1):
        value = value_func(i)
        print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")

# IP-MIB objects for managing IP addressing
# ipAdEntAddr: The IP address to which this entry's addressing information pertains
print(f"1.3.6.1.2.1.4.20.1.1|{dictTags.get('IPAddress')}|10.5.5.5")

# ipAdEntIfIndex: The index value which uniquely identifies the interface to which this entry is applicable
print(f"1.3.6.1.2.1.4.20.1.2|{dictTags.get('Integer')}|1")

# ipAdEntNetMask: The subnet mask associated with the IP address of this entry
print(f"1.3.6.1.2.1.4.20.1.3|{dictTags.get('IPAddress')}|255.255.255.0")

# hrSWRunIndex: An index uniquely identifying a row in the hrSWRun table
print(f"1.3.6.1.2.1.25.4.2.1.1.1|{dictTags.get('Integer')}|1")

# hrSWRunName: The name of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.2.1|{dictTags.get('OctetString')}|LJRSNMPAgent")
# hrSWRunID: The product ID of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.3.1|{dictTags.get('ObjectIdentifier')}|1.3.6.1.4.1.25709.55")

# hrSWRunPath: The path of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.4.1|{dictTags.get('OctetString')}|/opt/snmp/snmpsim/_agent.sh")

# hrSWRunParameters: Operational parameters for the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.5.1|{dictTags.get('OctetString')}|-L")

# hrSWRunType: The type of software running (e.g., operating system, application)
print(f"1.3.6.1.2.1.25.4.2.1.6.1|{dictTags.get('Integer')}|4")

# hrSWRunStatus: The status of this software (running, runnable, notRunnable, invalid)
print(f"1.3.6.1.2.1.25.4.2.1.7.1|{dictTags.get('Integer')}|1")


for oid_base, tag_type, value_func in ifxtable_snmp_objects:
    for i in range(1, iRangeMax+1):
        value = value_func(i)
        print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")

Network Capture

Even better, parse a network capture file.

Capture Data

On the server that gathers SNMP data from the host we want to simulate, use a network capture utility to gather the SNMP communication between the server and the desired device.

tcpdump -i <interface> -w <filename>.pcap

For example, to record the communication with 10.5.171.114:

tcpdump 'host 10.5.171.114 and (tcp port 161 or tcp port 162 or udp port 161 or udp port 162)' -w /tmp/ar.pcap

Note – there is no benefit to capturing more than one cycle of SNMP responses. If packets show up as soon as the capture starts, the devices were in the middle of a polling cycle; end the capture and start a new one shortly afterward. A clean capture shows no packets for a bit, then packets during the SNMP polling cycle, and then another pause until the next cycle.
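One way to confirm that a capture holds a single complete cycle is to group the packet timestamps into bursts separated by quiet gaps. The sketch below works on a plain list of timestamps in seconds (for example, the time attribute scapy sets on each packet read from a capture file). The function name and the 30 second gap threshold are assumptions to adjust for your polling interval.

```python
def split_into_cycles(timestamps, min_gap=30.0):
    """Group packet timestamps (seconds) into bursts separated by at least min_gap seconds."""
    cycles = []
    for ts in sorted(timestamps):
        if cycles and ts - cycles[-1][-1] < min_gap:
            # Close enough to the previous packet: same polling cycle
            cycles[-1].append(ts)
        else:
            # First packet, or a long pause: start a new cycle
            cycles.append([ts])
    return cycles


# Made-up timestamps: two bursts of polling and the start of a third
sample = [0.0, 0.4, 1.1, 300.2, 300.9, 301.5, 600.1]
for number, cycle in enumerate(split_into_cycles(sample), start=1):
    print(f"Cycle {number}: {len(cycle)} packets from {cycle[0]} to {cycle[-1]}")
```

If the first burst begins at the very start of the capture, it is probably a partial cycle and the capture is worth repeating.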

Parsing The Capture Data Into A Response File

The following script parses the capture file into an snmprec response file – note, I needed to use 2.6.0rc1 of scapy to parse SNMP data. The 2.5.0 release version failed to parse most of the packets, which I believe is related to https://github.com/secdev/scapy/issues/3900

# Only the names the script actually uses are imported
from scapy.all import rdpcap, load_contrib
from scapy.layers.snmp import SNMP, SNMPresponse
from scapy.packet import Raw

import os
from datetime import datetime
import argparse

# Ensure Scapy's SNMP contributions are loaded
load_contrib("snmp")

def sort_by_oid(listSNMPResponses):
    """
    Sorts a list of "OID|TAG|Value" strings by the OID numerically and hierarchically.

    :param listSNMPResponses: A list of "OID|TAG|Value" strings.
    :return: A list of "OID|TAG|Value" strings sorted by OID.
    """
    # Split each element into a tuple of (OID list, original string), converting OID to integers for proper comparison
    oid_tuples = [(list(map(int, element.split('|')[0].split('.'))), element) for element in listSNMPResponses]

    # Sort the list of tuples by the OID part (the list of integers)
    sorted_oid_tuples = sorted(oid_tuples, key=lambda x: x[0])

    # Extract the original strings from the sorted list of tuples
    sorted_listSNMPResponses = [element[1] for element in sorted_oid_tuples]

    return sorted_listSNMPResponses

parser = argparse.ArgumentParser(description='This script converts an SNMP packet capture into a snmpsim response file')
parser.add_argument('--filename', '-f', help='The capture file to process', required=True)

args = parser.parse_args()
strFullCaptureFilePath = args.filename
strCaptureFilePath, strCaptureFileName = os.path.split(strFullCaptureFilePath)


# Valid tags per https://pypi.org/project/snmpsim/0.2.3/
dictTags = {'ASN1_INTEGER': '2', 'ASN1_STRING': '4', 'ASN1_NULL': '5', 'ASN1_OID': '6', 'ASN1_IPADDRESS': '64', 'ASN1_COUNTER32': '65', 'ASN1_GAUGE32': '66', 'ASN1_TIME_TICKS': '67', 'Opaque': '68','ASN1_COUNTER64': '70'}

listSNMPResponses = []
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.1.1|2|1")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.2.1|4|LJRSNMPAgent")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.3.1|6|1.3.6.1.4.1.25709.55")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.4.1|4|/opt/snmp/snmpsim/_agent.sh")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.5.1|4|-L")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.6.1|2|4")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.7.1|2|1")
packets = rdpcap(strFullCaptureFilePath)
# Packets are zero indexed, so packet 1 in script is packet 2 in Wireshark GUI
for i, packet in enumerate(packets):
    print(f"Working on packet {i}")
    if SNMP in packet:
        snmp_layer = packet[SNMP]
        if isinstance(snmp_layer.PDU, SNMPresponse):
            snmp_response = snmp_layer.PDU
            if getattr(snmp_response, 'varbindlist', None) is not None:
                for varbind in snmp_response.varbindlist:
                    strOID = varbind.oid.val if hasattr(varbind.oid, 'val') else str(varbind.oid)
                    strValue = varbind.value.val if hasattr(varbind.value, 'val') else str(varbind.value)
                    strType = type(varbind.value).__name__
                    # Fall back to the scapy type name when there is no matching snmpsim tag
                    iType = dictTags.get(strType, strType)

                    if isinstance(strValue, bytes):
                        print(f"Decoding {strValue}")
                        strValue = strValue.decode('utf-8', errors='ignore')

                    print(f"OID: {strOID}, Type: {strType}, Tag: {iType}, Value: {strValue}")
                    listSNMPResponses.append(f"{strOID}|{iType}|{strValue}")
        else:
            print(f"Not a response -- type is {type(snmp_layer.PDU)}")
    elif Raw in packet:
        print(f"I have a raw packet at {i}")
    else:
        print(dir(packet))
        print(f"No SNMP or Raw in {i}: {packet}")

# Sort by OID numbers
listSortedSNMPResponses = sort_by_oid(listSNMPResponses)
strOutputFile = f'/opt/snmp/snmpsim/data/{datetime.now().strftime("%Y%m%d")}-{strCaptureFileName.rsplit(".", 1)[0]}.deactivated'
# The context manager closes the file even if a write fails
with open(strOutputFile, "w") as f:
    for strSNMPResponse in listSortedSNMPResponses:
        print(strSNMPResponse)
        f.write(strSNMPResponse + "\n")

This will create an snmpsim response file in /opt/snmp/snmpsim/data, named after the capture file and prefixed with the current year, month, and day. For example, my ar.pcap capture produces /opt/snmp/snmpsim/data/20240705-ar.deactivated – you can then copy the file to whatever community string you want – cp 20240705-ar.deactivated CommunityString.snmprec
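If you would rather do the copy from Python than with cp, something along these lines works. It is only a sketch: activate_response_file is a hypothetical helper, and it assumes the .snmprec file should sit in the same directory as the .deactivated file unless another directory is given.

```python
import shutil
from pathlib import Path


def activate_response_file(source, community, data_dir=None):
    """Copy a parsed capture to <community>.snmprec and return the new path."""
    source = Path(source)
    # Default to the directory the .deactivated file already lives in
    target_dir = Path(data_dir) if data_dir else source.parent
    target = target_dir / f"{community}.snmprec"
    shutil.copyfile(source, target)
    return target


# Example call, using the file name from above:
# activate_response_file("/opt/snmp/snmpsim/data/20240705-ar.deactivated", "CommunityString")
```

The original .deactivated file is left in place, so the same capture can be copied to more than one community string.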