I encountered an oddity in a Java application that uses Kafka Streams to scale out reading data from Kafka topics. Data is broken out into multiple topics, and there are Kubernetes pods (“workers”) reading from each topic, with a different number of replicas defined per topic. But it appears that no one ever aligned the topic partition counts with the number of workers being deployed.
Kafka Streams assigns “work” to group members by partition. If you have ten partitions and five workers, each worker processes the data from two partitions. However, when the numbers don’t line up, some workers get more partitions than others: with eleven partitions and five workers, four workers get data from two partitions each and the fifth gets data from three.
Worse – in some cases we have more workers than partitions. Those extra workers are using up some resources, but they’re not actually processing data.
It’s a quick fix — partitions can be added mostly invisibly (the consumer group gets rebalanced, write operations don’t really change, and new data just starts landing in the new partitions), so I increased our partition counts to be 2x the number of workers. This allows us to add a few workers to a topic if it gets backlogged, while the configuration still evenly distributes the work across all of the normally running pods.
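For reference, bumping the partition count is a one-liner with the kafka-topics CLI — a minimal sketch, where the topic name and broker address are placeholders:

# Hypothetical example: increase an existing topic to ten partitions (partition counts can only go up)
kafka-topics.sh --bootstrap-server kafka01.example.net:9092 --alter --topic sample-topic --partitions 10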
I’ve got replicated PostgreSQL database pairs that each have some 50TB of data. The server operating systems need to be upgraded, but there is a constraint: no in-place upgrades. I don’t get to veto that constraint (i.e. the fact that we could just cross our fingers and upgrade a replica … and, if it fails, build new and pull the data again doesn’t matter). Unfortunately, trying to add a second replica delays the existing replication. Since all write operations go to the RW server and reads go to the read-only replica … having the read-only copy a day or two out of sync whilst this secondary replica comes online is a non-starter.
Fortunately, you can cascade replication — seed the new replica from the current read-only replica instead of the primary. Create a new replication slot — here new-pg-ro-replica-pgdata — and verify the new server is permitted to authenticate with the replication account in pg_hba.conf.
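A minimal sketch of the cascade — assuming the new replica pulls from the existing read-only replica, with placeholder hostnames, replication user, and data directory (and noting that slot names may only contain lowercase letters, digits, and underscores, so the slot is written with underscores here):

# On the existing read-only replica: create the physical slot the new standby will use
psql -c "SELECT pg_create_physical_replication_slot('new_pg_ro_replica_pgdata');"

# On the new server: seed the data directory from the read-only replica rather than the primary
pg_basebackup -h current-ro-replica.example.net -U replication_user -D /var/lib/pgsql/data -S new_pg_ro_replica_pgdata -X stream -R -P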
In my dev OpenSearch 2.x environment, I get a strange error indicating that the application cannot read the cacerts file — except the file is world readable, SELinux is disabled, and there’s nothing actually preventing access at the OS level.
[2024-09-17T12:48:52,666][ERROR][c.a.d.a.h.j.AbstractHTTPJwtAuthenticator] [linux1569.mgmt.windstream.net] Error creating JWT authenticator. JWT authentication will not work
com.amazon.dlic.util.SettingsBasedSSLConfigurator$SSLConfigException: Error loading trust store from /opt/elk/opensearch/jdk/lib/security/cacerts
at com.amazon.dlic.util.SettingsBasedSSLConfigurator.initFromKeyStore(SettingsBasedSSLConfigurator.java:338) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.util.SettingsBasedSSLConfigurator.configureWithSettings(SettingsBasedSSLConfigurator.java:196) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.util.SettingsBasedSSLConfigurator.buildSSLContext(SettingsBasedSSLConfigurator.java:117) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.util.SettingsBasedSSLConfigurator.buildSSLConfig(SettingsBasedSSLConfigurator.java:131) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.auth.http.jwt.keybyoidc.HTTPJwtKeyByOpenIdConnectAuthenticator.getSSLConfig(HTTPJwtKeyByOpenIdConnectAuthenticator.java:65) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.auth.http.jwt.keybyoidc.HTTPJwtKeyByOpenIdConnectAuthenticator.initKeyProvider(HTTPJwtKeyByOpenIdConnectAuthenticator.java:47) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.auth.http.jwt.AbstractHTTPJwtAuthenticator.<init>(AbstractHTTPJwtAuthenticator.java:89) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.auth.http.jwt.keybyoidc.HTTPJwtKeyByOpenIdConnectAuthenticator.<init>(HTTPJwtKeyByOpenIdConnectAuthenticator.java:26) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62) ~[?:?]
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502) ~[?:?]
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486) ~[?:?]
at org.opensearch.security.support.ReflectionHelper.instantiateAAA(ReflectionHelper.java:62) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.securityconf.DynamicConfigModelV7.lambda$newInstance$1(DynamicConfigModelV7.java:432) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at java.base/java.security.AccessController.doPrivileged(AccessController.java:319) [?:?]
at org.opensearch.security.securityconf.DynamicConfigModelV7.newInstance(DynamicConfigModelV7.java:430) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.securityconf.DynamicConfigModelV7.buildAAA(DynamicConfigModelV7.java:329) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.securityconf.DynamicConfigModelV7.<init>(DynamicConfigModelV7.java:102) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.securityconf.DynamicConfigFactory.onChange(DynamicConfigFactory.java:288) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.configuration.ConfigurationRepository.notifyAboutChanges(ConfigurationRepository.java:570) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.configuration.ConfigurationRepository.notifyConfigurationListeners(ConfigurationRepository.java:559) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.configuration.ConfigurationRepository.reloadConfiguration0(ConfigurationRepository.java:554) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.configuration.ConfigurationRepository.loadConfigurationWithLock(ConfigurationRepository.java:538) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.configuration.ConfigurationRepository.reloadConfiguration(ConfigurationRepository.java:531) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.configuration.ConfigurationRepository.initalizeClusterConfiguration(ConfigurationRepository.java:284) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.configuration.ConfigurationRepository.lambda$initOnNodeStart$10(ConfigurationRepository.java:439) [opensearch-security-2.15.0.0.jar:2.15.0.0]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.security.AccessControlException: access denied ("java.io.FilePermission" "/opt/elk/opensearch/jdk/lib/security/cacerts" "read")
at java.base/java.security.AccessControlContext.checkPermission(AccessControlContext.java:488) ~[?:?]
at java.base/java.security.AccessController.checkPermission(AccessController.java:1071) ~[?:?]
at java.base/java.lang.SecurityManager.checkPermission(SecurityManager.java:411) ~[?:?]
at java.base/java.lang.SecurityManager.checkRead(SecurityManager.java:742) ~[?:?]
at java.base/sun.nio.fs.UnixPath.checkRead(UnixPath.java:789) ~[?:?]
at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:49) ~[?:?]
at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:171) ~[?:?]
at java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99) ~[?:?]
at java.base/java.nio.file.spi.FileSystemProvider.readAttributesIfExists(FileSystemProvider.java:1270) ~[?:?]
at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributesIfExists(UnixFileSystemProvider.java:191) ~[?:?]
at java.base/java.nio.file.Files.isDirectory(Files.java:2319) ~[?:?]
at org.opensearch.security.support.PemKeyReader.checkPath(PemKeyReader.java:214) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.support.PemKeyReader.resolve(PemKeyReader.java:290) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at org.opensearch.security.support.PemKeyReader.resolve(PemKeyReader.java:276) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
at com.amazon.dlic.util.SettingsBasedSSLConfigurator.initFromKeyStore(SettingsBasedSSLConfigurator.java:327) ~[opensearch-security-2.15.0.0.jar:2.15.0.0]
... 25 more
Looks like Java has its own security mechanism (the SecurityManager policy) — java.policy needed to be updated to allow read access to cacerts (why!?!?!?)
vi /opt/elk/opensearch/jdk/conf/security/java.policy
# Add this grant:
grant {
    permission java.io.FilePermission "/opt/elk/opensearch/jdk/lib/security/cacerts", "read";
};
In testing out various ways to achieve disk compression on our PostgreSQL servers, I ended up with a server built with a version of ZFS newer than the one in the package distribution. Which means I needed to recreate the pool with an older version of ZFS that would be updated as part of routine patching. Beyond backing up and restoring the data …
# Get rid of existing pool
zpool export pgpool
zpool destroy pgpool
zpool list   # this still shows a pool on sdb
# Clear the label
zpool labelclear /dev/sdb
# Didn’t work, so zero out the start of sdb and wipe any remaining signatures
dd if=/dev/zero of=/dev/sdb bs=1M count=10
wipefs -a /dev/sdb
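With the disk cleared, the pool can be rebuilt under the distro-packaged ZFS — a rough sketch, assuming a single-disk pool on sdb as above and LZ4 compression for the PostgreSQL data (the compression setting is my assumption, since that was the point of the exercise):

# Recreate the pool using the packaged ZFS, then enable compression
zpool create pgpool /dev/sdb
zfs set compression=lz4 pgpool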
I inherited a Java application that is actually five applications — and the build pipeline had a lot of repetition: tell Maven to use this POM file, now use that one, and now the other one. It wasn’t great, but it got even more cumbersome when I needed to split the production and development builds onto different agent pools (network rule: prod and dev servers may not communicate … so the dev agent talks to the dev image repo which is used by the dev deployment, and the prod agent talks to the prod image repo which is used by the prod deployment). Instead of having five “hey, Maven, do this build” blocks, I now have ten.
So I created a template for the build step — jdk-path and maven-path are pipeline variables. The rest is the Maven build task with parameters to supply the step display name, pom file to use, and environment flag.
I wanted a quick way to verify that Docker images have actually been pushed to the registry. I’m using Distribution, and only wanted to report on images whose names start with “sample” (the repository is shared, and I don’t want to read through the very long list of other people’s images).
#!/bin/bash
registry="registryhost.example.net:5443"
authHeader="Authorization: Basic AUTHSTRINGHERE"
# List all repositories
repositories=$(curl -s -H "$authHeader" https://$registry/v2/_catalog | jq -r '.repositories[]')
for repo in $repositories; do
# Check if the repository name starts with "sample"
if [[ $repo == sample* ]]; then
# List all tags for the repository
tags=$(curl -s -H "$authHeader" https://$registry/v2/$repo/tags/list | jq -r '.tags[]')
for tag in $tags; do
# Get the manifest for the tag
manifest=$(curl -s -H "$authHeader" -H "Accept: application/vnd.docker.distribution.manifest.v2+json" https://$registry/v2/$repo/manifests/$tag)
# Extract the digest for the config
configDigest=$(echo $manifest | jq -r '.config.digest')
# Get the config blob
configBlob=$(curl -s -H "$authHeader" https://$registry/v2/$repo/blobs/$configDigest)
# Extract the last modified date from the config history
lastModifiedDate=$(echo $configBlob | jq -r '[.history[].created] | max')
echo -e "$repo\t$tag\t$lastModifiedDate"
done
fi
done
I have no idea how exactly I managed this — but I was renewing certificates on a group of servers and had one that would not work. It’s a Java app, and it just threw a generic handshake error; even adding debugging didn’t provide any useful information. Turns out my public key and private key files didn’t go together. I didn’t bother figuring out which one I got wrong — I just downloaded the zip file from our cert provider again.
Use openssl to check the modulus of the cert and the key — taking an md5 checksum of each value makes them easier to compare. This public/private key pair goes together — they’ve got the same modulus. My original files? Not so much — two different values!
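The check itself looks something like this — the file names are placeholders, and this applies to RSA certs and keys:

# Matching md5 sums mean the certificate and key are actually a pair
openssl x509 -noout -modulus -in server.crt | openssl md5
openssl rsa -noout -modulus -in server.key | openssl md5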
So, I know that Redis should be a data cache that can be repopulated … but we use it to calculate deltas (what was the value last time), so repopulating the information makes the first half hour or so of calculations rather slow as the application tries Redis, gets nothing, and falls back to a database query. Then we get a backlog of data to churn through, and it would just be better if the Redis cache hadn’t gone away in the first place. If you own both servers and the files are in the same format, you can just copy the cache db from the old server to the new one. But when you cannot just copy the file and you would really prefer the data not disappear and need to be repopulated … there’s a script for that! This Python script reads all of the data from the “old” server and populates it into the “new” server.
import redis
def migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password):
# Connect to the source Redis server
source_client = redis.StrictRedis(host=redis_source_host, port=redis_source_port, db=redis_source_db, password=redis_source_password)
# Connect to the destination Redis server
dest_client = redis.StrictRedis(host=redis_dest_host, port=redis_dest_port, db=redis_dest_db, password=redis_dest_password)
# Fetch all keys from the source Redis
keys = source_client.keys('*')
for key in keys:
# Get the type of the key
key_type = source_client.type(key).decode('utf-8')
if key_type == 'string':
value = source_client.get(key)
print("Setting string value in dest")
dest_client.set(key, value)
elif key_type == 'list':
values = source_client.lrange(key, 0, -1)
print("Setting list value in dest")
dest_client.delete(key) # Ensure the list is empty before pushing
for value in values:
dest_client.rpush(key, value)
elif key_type == 'set':
values = source_client.smembers(key)
print("Setting set value in dest")
dest_client.delete(key) # Ensure the set is empty before pushing
for value in values:
dest_client.sadd(key, value)
elif key_type == 'zset':
values = source_client.zrange(key, 0, -1, withscores=True)
print("Setting zset value in dest")
dest_client.delete(key) # Ensure the zset is empty before pushing
for value, score in values:
dest_client.zadd(key, {value: score})
elif key_type == 'hash':
values = source_client.hgetall(key)
print("Setting hash value in dest")
dest_client.delete(key) # Ensure the hash is empty before pushing
dest_client.hmset(key, values)
print("Data migration completed.")
if __name__ == "__main__":
# Source Redis server details
redis_source_host = 'oldredis.example.com'
redis_source_port = 6379
redis_source_db = 0
redis_source_password = 'SourceRedisPassword'
# Destination Redis server details
redis_dest_host = 'newredis.example.com'
redis_dest_port = 6379
redis_dest_db = 0
redis_dest_password = 'DestRedisPassword'
# Migrate data
migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password)
A quick query to get the max and min timestamp values of an index:
# Find the date range of records within an index
curl -X GET "https://opensearch.example.com:9200/INDEX_NAME/_search" -H 'Content-Type: application/json' -d'
{
"size": 0,
"aggs": {
"oldest_timestamp": {
"min": {
"field": "@timestamp"
}
},
"newest_timestamp": {
"max": {
"field": "@timestamp"
}
}
}
}'
As communication between development and production platforms is limited for security and data integrity reasons, this creates a challenge when testing changes in development: we cannot access “real world” data with which to perform tests. Having a limited set of data in development means testing may not illuminate issues that occur at high volume or on a large scale.
Solution
While limiting communication between the prod and dev systems is reasonable, it would be beneficial to be able to replay production-like data within our development systems for testing purposes. While it is not cost effective to buy large network devices with thousands of interfaces just for testing, the Python module snmpsim provides “canned responses” that simulate real devices on the production network. For simplicity, I have a bash script that launches the SNMP responder.
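Something along these lines — a sketch assuming a current snmpsim install, which provides the snmpsim-command-responder entry point (older 0.2.x releases ship snmpsimd.py instead), and assuming the responder may bind to the standard SNMP port (which requires root; otherwise use a high port):

#!/bin/bash
# Replay the canned responses in /opt/snmp/snmpsim/data on UDP 161
snmpsim-command-responder --data-dir=/opt/snmp/snmpsim/data --agent-udpv4-endpoint=0.0.0.0:161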
This responder replays data stored in the directory /opt/snmp/snmpsim/data — any file ending in .snmprec will be included in the response, and the filename prior to .snmprec is the community string used to access the response data. E.g., public.snmprec is the data for the public community string.
The response files are in the format OID|TAG|VALUE, where OID is the OID number of the SNMP object and TAG is an integer type identifier defined at https://pypi.org/project/snmpsim/0.2.3/
Valid tag values and their corresponding ASN.1/SNMP types are:
ASN.1/SNMP Type       Tag Value
Integer32             2
Octet String          4
Null                  5
Object Identifier     6
IP Address            64
Counter32             65
Gauge32               66
Time Ticks            67
Opaque                68
Counter64             70
And the value is the data to be returned for the OID object. As an example:
1.3.6.1.2.1.1.3.0|67|2293092270
1.3.6.1.2.1.1.3.0 is the sysUpTime, the data type is TimeTicks, and the system up time is 2293092270 hundredths of a second. Or roughly 6,369 hours and 42 minutes.
Items within the response file need to be listed in ascending order.
Generating Response Data
There are two methods for creating the data provided to an SNMP GET request. A response file can be created manually, populated with OID objects that should be included in the response as well as sample data. Alternatively, a network trace can be gathered from the production network and parsed to create the response file.
Manually Generated Response File
While you can literally type data into a response file, it is far easier to use a script to generate sample data. /opt/snmp/snmpsim/_genData.py is an example that creates a response file covering about 1,000 interfaces:
from datetime import datetime
import random
iRangeMax = 1000
dictTags = {'Integer': '2', 'OctetString': '4', 'NULL': '5', 'ObjectIdentifier': '6', 'IPAddress': '64', 'Counter32': '65', 'Gauge32': '66', 'TimeTicks': '67', 'Opaque': '68','Counter64': '70'} # Valid tags per https://pypi.org/project/snmpsim/0.2.3/
today = datetime.now()
iftable_snmp_objects = [
('1.3.6.1.2.1.2.2.1.1', 'Integer', lambda i: i), # ifIndex
('1.3.6.1.2.1.2.2.1.2', 'OctetString', lambda i: f"SampleInterface{i}"), # ifDescr
('1.3.6.1.2.1.2.2.1.3', 'Integer', lambda i: 6), # ifType
('1.3.6.1.2.1.2.2.1.4', 'Integer', lambda i: 1500), # ifMtu
('1.3.6.1.2.1.2.2.1.5', 'Gauge32', lambda i: 100000000), # ifSpeed
('1.3.6.1.2.1.2.2.1.6', 'OctetString', lambda i: f"00:00:00:00:{format(i, '02x')[:2]}:{format(i, '02x')[-2:]}"), # ifPhysAddress
('1.3.6.1.2.1.2.2.1.7', 'Integer', lambda i: 1), # ifAdminStatus
('1.3.6.1.2.1.2.2.1.8', 'Integer', lambda i: 1), # ifOperStatus
('1.3.6.1.2.1.2.2.1.9', 'TimeTicks', lambda i: int((datetime.now() - datetime(2024, random.randint(1, today.month), random.randint(1, today.day))).total_seconds()) * 100), # ifLastChange
('1.3.6.1.2.1.2.2.1.10', 'Counter32', lambda i: random.randint(3, i*50000)), # ifInOctets
('1.3.6.1.2.1.2.2.1.11', 'Counter32', lambda i: random.randint(3, i*50000)), # ifInUcastPkts
('1.3.6.1.2.1.2.2.1.12', 'Counter32', lambda i: random.randint(0, 80)), # ifInNUcastPkts
('1.3.6.1.2.1.2.2.1.13', 'Counter32', lambda i: random.randint(0, 80)), # ifInDiscards
('1.3.6.1.2.1.2.2.1.14', 'Counter32', lambda i: random.randint(0, 80)), # ifInErrors
('1.3.6.1.2.1.2.2.1.15', 'Counter32', lambda i: random.randint(3, i*50000)), # ifInUnknownProtos
('1.3.6.1.2.1.2.2.1.16', 'Counter32', lambda i: random.randint(3, i*50000)), # ifOutOctets
('1.3.6.1.2.1.2.2.1.17', 'Counter32', lambda i: random.randint(3, i*50000)), # ifOutUcastPkts
('1.3.6.1.2.1.2.2.1.18', 'Counter32', lambda i: random.randint(3, i*50000)), # ifOutNUcastPkts
('1.3.6.1.2.1.2.2.1.19', 'Counter32', lambda i: random.randint(0, 80)), # ifOutDiscards
('1.3.6.1.2.1.2.2.1.20', 'Counter32', lambda i: random.randint(0, 80)), # ifOutErrors
]
ifxtable_snmp_objects = [
('1.3.6.1.2.1.31.1.1.1.1', 'OctetString', lambda i: f"SampleInterface{i}"), # ifName
('1.3.6.1.2.1.31.1.1.1.15', 'Gauge32', lambda i: "100"), # ifHighSpeed
('1.3.6.1.2.1.31.1.1.1.6', 'Counter32', lambda i: random.randint(3, i*50000)), # ifHCInOctets
('1.3.6.1.2.1.31.1.1.1.10', 'Counter32', lambda i: random.randint(3, i*60000)), # ifHCOutOctets
]
# Print IFTable data
for oid_base, tag_type, value_func in iftable_snmp_objects:
for i in range(1, iRangeMax+1):
value = value_func(i)
print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")
# IP-MIB objects for managing IP addressing
# ipAdEntAddr: The IP address to which this entry's addressing information pertains
print(f"1.3.6.1.2.1.4.20.1.1|{dictTags.get('IPAddress')}|10.5.5.5")
# ipAdEntIfIndex: The index value which uniquely identifies the interface to which this entry is applicable
print(f"1.3.6.1.2.1.4.20.1.2|{dictTags.get('OctetString')}|1")
# ipAdEntNetMask: The subnet mask associated with the IP address of this entry
print(f"1.3.6.1.2.1.4.20.1.3|{dictTags.get('OctetString')}|255.255.255.0")
# hrSWRunIndex: An index uniquely identifying a row in the hrSWRun table
print(f"1.3.6.1.2.1.25.4.2.1.1.1|{dictTags.get('Integer')}|1")
# hrSWRunName: The name of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.2.1|{dictTags.get('OctetString')}|LJRSNMPAgent")
# hrSWRunID: The product ID of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.3.1|{dictTags.get('ObjectIdentifier')}|1.3.6.1.4.1.25709.55")
# hrSWRunPath: The path of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.4.1|{dictTags.get('OctetString')}|/opt/snmp/snmpsim/_agent.sh")
# hrSWRunParameters: Operational parameters for the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.5.1|{dictTags.get('OctetString')}|-L")
# hrSWRunType: The type of software running (e.g., operating system, application)
print(f"1.3.6.1.2.1.25.4.2.1.6.1|{dictTags.get('Integer')}|4")
# hrSWRunStatus: The status of this software (running, runnable, notRunnable, invalid)
print(f"1.3.6.1.2.1.25.4.2.1.7.1|{dictTags.get('Integer')}|1")
for oid_base, tag_type, value_func in ifxtable_snmp_objects:
for i in range(1, iRangeMax+1):
value = value_func(i)
print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")
Network Capture
Even better, parse a network capture file.
Capture Data
On the server that gathers SNMP data from the host we want to simulate, use a network capture utility to gather the SNMP communication between the server and the desired device.
tcpdump -i <interface> -w <filename>.pcap
E.g., to record the communication with 10.5.171.114:
tcpdump -w /tmp/ar.pcap 'host 10.5.171.114 and (tcp port 161 or tcp port 162 or udp port 161 or udp port 162)'
Note – there is no benefit to capturing more than one cycle of SNMP responses. If packets show up immediately, the capture started in the middle of a polling cycle; end it and start a new one shortly after. A good capture has no packets for a bit, then the packets from one SNMP polling cycle, and then another pause until the next cycle begins.
Parsing The Capture Data Into A Response File
The following script parses the capture file into an snmprec response file. Note: I needed to use scapy 2.6.0rc1 to parse the SNMP data — the 2.5.0 release version failed to parse most of the packets, which I believe is related to https://github.com/secdev/scapy/issues/3900
import argparse
import os
from datetime import datetime

from scapy.all import rdpcap, load_contrib
from scapy.packet import Raw
from scapy.layers.snmp import SNMP, SNMPresponse

# Ensure Scapy's SNMP contributions are loaded
load_contrib("snmp")
def sort_by_oid(listSNMPResponses):
"""
Sorts a list of "OID|TAG|Value" strings by the OID numerically and hierarchically.
:param listSNMPResponses: A list of "OID|TAG|Value" strings.
:return: A list of "OID|TAG|Value" strings sorted by OID.
"""
# Split each element into a tuple of (OID list, original string), converting OID to integers for proper comparison
oid_tuples = [(list(map(int, element.split('|')[0].split('.'))), element) for element in listSNMPResponses]
# Sort the list of tuples by the OID part (the list of integers)
sorted_oid_tuples = sorted(oid_tuples, key=lambda x: x[0])
# Extract the original strings from the sorted list of tuples
sorted_listSNMPResponses = [element[1] for element in sorted_oid_tuples]
return sorted_listSNMPResponses
parser = argparse.ArgumentParser(description='This script converts an SNMP packet capture into a snmpsim response file')
parser.add_argument('--filename', '-f', help='The capture file to process', required=True)
args = parser.parse_args()
strFullCaptureFilePath = args.filename
strCaptureFilePath, strCaptureFileName = os.path.split(strFullCaptureFilePath)
# Valid tags per https://pypi.org/project/snmpsim/0.2.3/
dictTags = {'ASN1_INTEGER': '2', 'ASN1_STRING': '4', 'ASN1_NULL': '5', 'ASN1_OID': '6', 'ASN1_IPADDRESS': '64', 'ASN1_COUNTER32': '65', 'ASN1_GAUGE32': '66', 'ASN1_TIME_TICKS': '67', 'Opaque': '68','ASN1_COUNTER64': '70'}
listSNMPResponses = []
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.1.1|2|1")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.2.1|4|LJRSNMPAgent")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.3.1|6|1.3.6.1.4.1.25709.55")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.4.1|4|/opt/snmp/snmpsim/_agent.sh")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.5.1|4|-L")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.6.1|2|4")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.7.1|2|1")
i = 0
if True:
packets = rdpcap(strFullCaptureFilePath)
# Packets are zero indexed, so packet 1 in script is packet 2 in Wireshark GUI
#for i in range(0,4):
for packet in packets:
print(f"Working on packet {i}")
i = i + 1
if SNMP in packet:
snmp_layer = packet[SNMP]
if isinstance(packet[SNMP].PDU,SNMPresponse):
snmp_response = snmp_layer.getfield_and_val('PDU')[1]
if hasattr(snmp_response, 'varbindlist') and snmp_response.varbindlist is not None:
for varbind in snmp_response.varbindlist:
strOID = varbind.oid.val if hasattr(varbind.oid, 'val') else str(varbind.oid)
strValue = varbind.value.val if hasattr(varbind.value, 'val') else str(varbind.value)
strType = type(varbind.value).__name__
if dictTags.get(strType):
iType = dictTags.get(strType)
else:
iType = strType
if isinstance(strValue, bytes):
print(f"Decoding {strValue}")
strValue = strValue.decode('utf-8',errors='ignore')
print(f"OID: {strOID}, Type: {strType}, Tag: {iType}, Value: {strValue}")
listSNMPResponses.append(f"{strOID}|{iType}|{strValue}")
else:
print(f"Not a response -- type is {type(packet[SNMP].PDU)}")
elif Raw in packet:
print(f"I have a raw packet at {i}")
else:
print(dir(packet))
print(f"No SNMP or Raw in {i}: {packet}")
# Sort by OID numbers
listSortedSNMPResponses = sort_by_oid(listSNMPResponses)
f = open(f'/opt/snmp/snmpsim/data/{datetime.now().strftime("%Y%m%d")}-{strCaptureFileName.rsplit(".", 1)[0]}.deactivated', "w")
for strSNMPResponse in listSortedSNMPResponses:
print(strSNMPResponse)
f.write(strSNMPResponse)
f.write("\n")
f.close()
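Usage is just the capture file path — a hypothetical invocation, assuming the script above is saved as /opt/snmp/snmpsim/_parseCapture.py (-f is the --filename argument defined in the script):

python3 /opt/snmp/snmpsim/_parseCapture.py -f /tmp/ar.pcap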
This will create an snmpsim response file in /opt/snmp/snmpsim/data named after the capture file, prefixed with the current year, month, and day. E.g., my ar.pcap capture becomes /opt/snmp/snmpsim/data/20240705-ar.deactivated — you can then copy the file to whatever community string you want: cp 20240705-ar.deactivated CommunityString.snmprec