Category: Kafka

Kafka Metadata Mismatch

I’ll prefix this with a caveat — when the topic ID in Zookeeper and the topic ID in Kafka’s on-disk partition metadata don’t match up … there’s something wrong beyond “hey, make this string the same as that string and try again”. Look for communication issues, disk issues, etc. that might lead to bad data. And, lacking any cause, the general recommendation is to drop and recreate the topic.

But! “Hey, I am going to delete all the data in this Kafka topic. Everyone good with that?” is seldom answered with a resounding “of course, we don’t actually want to keep the data we specifically configured a system to keep”. And you need to keep the topic around even though something has clearly gone sideways. Fortunately, you can manually update either the Zookeeper topic ID or the one on disk. I find it easier to update the one on disk in Kafka because that change can be reverted. Stop all of the Kafka servers, then run the following script on each one — it identifies all of the folders within the Kafka data directory for the topic’s partitions and changes the wrong topic ID to the right one. Then start the Kafka servers again, and the topic should be functional. For now.

#!/bin/bash

# Function to check if Kafka is stopped
check_kafka_stopped() {
    if systemctl is-active --quiet kafka; then
        echo "Error: Kafka service is still running. Please stop Kafka before proceeding."
        exit 1
    else
        echo "Kafka service is stopped. Proceeding with backup and update."
    fi
}

# Define the base directory where Kafka stores its data
DATA_DIR="/path/todata-kafka"

# Define the topic prefix to search for
TOPIC_PREFIX="MY_TOPIC_NAME-"

# Old and new topic IDs
OLD_TOPIC_ID="2xSmlPMBRv2_ihuWgrvQNA"
NEW_TOPIC_ID="57kBnenRTo21_VhEhWFOWg"

# Date string for backup file naming
DATE_STR=$(date +%Y%m%d)

# Check if Kafka is stopped
check_kafka_stopped

# Find all directories matching the topic pattern and process each one
for dir in $DATA_DIR/${TOPIC_PREFIX}*; do
    # Construct the path to the partition.metadata file
    metadata_file="$dir/partition.metadata"

    # Check if the metadata file exists
    if [[ -f "$metadata_file" ]]; then
        # Backup the existing partition.metadata file
        backup_file="$dir/partmetadata.$DATE_STR"
        echo "Backing up $metadata_file to $backup_file"
        cp "$metadata_file" "$backup_file"

        # Use sed to replace the line containing the old topic_id with the new one
        echo "Updating topic_id in $metadata_file"
        sed -i "s/^topic_id: $OLD_TOPIC_ID/topic_id: $NEW_TOPIC_ID/" "$metadata_file"
    else
        echo "No partition.metadata file found in $dir"
    fi
done

echo "Backup and topic ID update complete."

How do you get the Kafka metadata and Zookeeper topic IDs? In Zookeeper, ask it:

kafkaserver::fixTopicID # bin/zookeeper-shell.sh $(hostname):2181
get /brokers/topics/MY_TOPIC_NAME
{"removing_replicas":{},"partitions":{"2":[251,250],"5":[249,250],"27":[248,247],"12":[248,251],"8":[247,248],"15":[251,250],"21":[247,250],"18":[249,248],"24":[250,248],"7":[251,247],"1":[250,249],"17":[248,247],"23":[249,247],"26":[247,251],"4":[248,247],"11":[247,250],"14":[250,248],"20":[251,249],"29":[250,249],"6":[250,251],"28":[249,248],"9":[248,249],"0":[249,248],"22":[248,251],"16":[247,251],"19":[250,249],"3":[247,251],"10":[251,249],"25":[251,250],"13":[249,247]},"topic_id":"57kBnenRTo21_VhEhWFOWg","adding_replicas":{},"version":3}

Kafka’s topic ID is stored on disk in each topic partition’s folder:

kafkaserver::fixTopicID # cat /path/to/data-kafka/MY_TOPIC_NAME-29/partition.metadata
version: 0
topic_id: 2xSmlPMBRv2_ihuWgrvQNA

Once the data is processed, it would be a good idea to recreate the topic … but ensuring the two topic ID values match up will get you back online.
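
As a sanity check that the broker side now agrees, newer brokers (2.8 and later, if memory serves) will also report the topic ID through kafka-topics.sh, so you don’t have to poke around on disk:

bin/kafka-topics.sh --bootstrap-server $(hostname):9092 --describe --topic MY_TOPIC_NAME
# Expect output along the lines of:
# Topic: MY_TOPIC_NAME    TopicId: 57kBnenRTo21_VhEhWFOWg    PartitionCount: 30    ReplicationFactor: 2 ...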

Kafka Consumer and Producer Example – Java

Producer

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.InputStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaProducer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaHost = args[0];
        String kafkaPort = args[1];
        String kafkaServer = kafkaHost + ":" + kafkaPort;
        String topicName = "LJRJavaTest";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaProducer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

            // Create a temporary file to hold the truststore
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Define the date-time formatter
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd 'at' HH:mm:ss");

            try {
                for (int i = 0; i < 10; i++) {
                    String key = "Key-" + i;
                    
                    // Get the current timestamp
                    String timestamp = LocalDateTime.now().format(formatter);
                    
                    // Include the timestamp, Kafka host, and port in the message value
                    String value = String.format("Message-%d (Test from %s on server %s:%s)", i, timestamp, kafkaHost, kafkaPort);
                    
                    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
                    producer.send(record);
                    System.out.println("Sent message: (" + key + ", " + value + ")");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaConsumer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaServer = args[0] + ":" + args[1];
        String topicName = "LJRJavaTest";
        String groupId = "test-consumer-group";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaConsumer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

            // Create a temporary file to hold the truststore
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topicName));

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Consumed message: (key: %s, value: %s, offset: %d, partition: %d)%n",
                                record.key(), record.value(), record.offset(), record.partition());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Keystore for Trusts

The cert on my Kafka server is signed by a CA that has both a root and an intermediate signing certificate. To trust the chain, I need to populate rootCA.crt and intermediateCA.crt files with the correct base64-encoded public certificates and import both files into a new trust store:

keytool -importcert -file rootCA.crt -keystore kafka.client.truststore.jks -alias rootCA -storepass truststore-password

keytool -importcert -file intermediateCA.crt -keystore kafka.client.truststore.jks -alias intermediateCA -storepass truststore-password

Then list certs in the trust store to verify the import was successful:

keytool -list -keystore kafka.client.truststore.jks -storepass truststore-password

And place the jks keystore with the CA certs in project:

C:.
│   pom.xml
│
├───src
│   └───main
│       ├───java
│       │   └───com
│       │       └───example
│       │               SimpleKafkaProducer.java
│       │
│       └───resources
│               kafka.client.truststore.jks

Build and Run

Create a pom.xml for the build

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-producer-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka client dependency -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin to ensure Java 8 compatibility -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Exec Maven Plugin to run the Java class -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <mainClass>com.example.SimpleKafkaProducer</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Build using `mvn clean package`

Run the producer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaProducer" -Dexec.args="kafkahost.example.net 9095"`

Run the consumer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaConsumer" -Dexec.args="kafkahost.example.net 9095"`

Kafka Streams, Consumer Groups, and Stickiness

The Java application I recently inherited had a lot of … quirks. One of the strangest was that it calculated throughput statistics based on ‘start’ values in a cache that was only refreshed every four hours. So at a minute past the data refresh, the throughput is averaged out over that minute. At three hours and fifty nine minutes past the data refresh, the throughput is averaged out over three hours and fifty nine minutes. In the process of correcting this (reading directly from the cached data rather than using an in-memory copy of the cached data), I noticed that the running application paused a lot as the Kafka group was re-balanced.

Which is especially odd because I’ve got a stable number of clients in each consumer group. Pods restart occasionally, but nothing had been done to attempt to stabilize partition assignment.

That’s surprising because Kafka has had mechanisms to reduce re-balancing for quite a while — the StickyAssignor, added in 0.11:

        // Set the partition assignment strategy to StickyAssignor
        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

And groupInstanceId (static group membership), added in 2.3.0:

        // Set the group instance ID
        String groupInstanceId = UUID.randomUUID().toString();
        config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);

Now, I’m certain that a UUID isn’t the best way to go about crafting your group instance ID name … but it produces a “name” that isn’t likely to be duplicated. Since deploying this change, I went from seeing three or four re-balance operations an hour to zero.
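
To see who is currently in the group and which partitions each member holds, kafka-consumer-groups.sh can describe the group and its members; with stable assignment, the picture shouldn’t churn between runs. The group name below is just an example:

./kafka-consumer-groups.sh --bootstrap-server kafka.example.com:9092 \
    --describe --group my-streams-app --members --verbose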

Kafka Streams Group Members and Topic Partitions

I encountered an oddity in a Java application that uses Kafka Streams to implement a scalable application that reads data from Kafka topics. Data is broken out into multiple topics, and there are Kubernetes pods (“workers”) reading from each topic. The pods have different numbers of replicas defined. But it appears that no one ever aligned the topic partitions with the number of workers being deployed.

Kafka Streams assigns “work” to group members by partition. If you have ten partitions and five workers, each worker processes the data from two partitions. However, when the numbers don’t line up … some workers get more partitions than others. Were you to have eleven partitions and five workers, four workers would get data from two partitions and the fifth worker gets data from three.

Worse – in some cases we have more workers than partitions. Those extra workers are using up some resources, but they’re not actually processing data.

It’s a quick fix — partitions can be added mostly invisibly (the consumer group will be re-balanced, writes don’t really change, and new data just starts landing in the new partitions), so I increased our partition counts to twice the number of workers. This allows us to add a few workers to a topic if it gets backlogged, while the normal configuration evenly distributes the work across all of the regularly running pods.
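
For reference, increasing the partition count is a single kafka-topics.sh call (the topic name and counts below are just examples; partition counts can only go up, never down):

# Bump the topic to 20 partitions, then confirm
./kafka-topics.sh --bootstrap-server kafka.example.com:9092 --alter --topic MY_TOPIC_NAME --partitions 20
./kafka-topics.sh --bootstrap-server kafka.example.com:9092 --describe --topic MY_TOPIC_NAME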

Communicating With Kafka Server Using SSL

Create a Trust Store with the CA Chain

Use the keytool command to create a trust store with the CA chain used in your certificates. I am using Venafi, so I need to import two CA public keys:

keytool -keystore kafka.truststore.jks -alias SectigoRoot -import -file "Sectigo RSA Organization Validation Secure Server CA.crt"
keytool -keystore kafka.truststore.jks -alias UserTrustRoot -import -file "USERTrust RSA Certification Authority.crt"

Update the Client Configuration

Create a producer-ssl.properties or consumer-ssl.properties based on your current producer/consumer properties file. Update the port – 9095 is used for SSL – and append the following lines

security.protocol=SSL
ssl.truststore.location=/path/to/kafka.truststore.jks
ssl.truststore.password=<WhateverYouSetInThePreviousStep>

Using the CLI Client Tools

Once you have a properly configured properties file, you can invoke either the kafka-console-consumer.sh or kafka-console-producer.sh scripts, indicating your new properties file:

/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1586.example.net:9095 --topic LJRTest --consumer.config /kafka/config/consumer-ssl.properties --group LJR5

/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka1586.example.net:9095 --topic LJRTest --producer.config /kafka/config/producer-ssl.properties

To debug SSL communication, set the following KAFKA_OPTS prior to invoking the command line producer/consumer utilities:

export KAFKA_OPTS="-Djavax.net.debug=ssl,handshake"

Adding SSL To Kafka Server

Obtain SSL Certificates for Each Server

The following process was used to enable SSL communication with the Kafka servers. Firstly, generate certificates for each server in the environment. I am using a third-party certificate provider, Venafi. When you download the certificates, make sure to select the “PEM (OpenSSL)” format and check the box to “Extract PEM content into separate files (.crt, .key)”.

Upload each zip file to the appropriate server under /tmp/ named in the $(hostname).zip format. The following series of commands creates the files needed in the Kafka server configuration. You will be asked to set passwords for the keystore and truststore JKS files. Don’t forget what you use — we’ll need them later.

# Assumes Venafi certificates downloaded as OpenSSL zip files with separate public/private keys are present in /tmp/$(hostname).zip
mkdir /kafka/config/ssl/$(date +%Y)
cd /kafka/config/ssl/$(date +%Y)
mv /tmp/$(hostname).zip ./
unzip $(hostname).zip

# Create keystore for Kafka
openssl pkcs12 -export -in $(hostname).crt -inkey $(hostname).key -out $(hostname).p12 -name $(hostname) -CAfile ./ca.crt -caname root
keytool -importkeystore -destkeystore $(hostname).keystore.jks -srckeystore $(hostname).p12 -srcstoretype pkcs12 -alias $(hostname)

# Create truststore from CA certs
keytool -keystore kafka.server.truststore.jks -alias SectigoRoot -import -file "Sectigo RSA Organization Validation Secure Server CA.crt"
keytool -keystore kafka.server.truststore.jks -alias UserTrustRoot -import -file "USERTrust RSA Certification Authority.crt"

# Fix permissions
chown -R kafkauser:kafkagroup /kafka/config/ssl

# Create symlinks for current-year certs
cd ..
ln -s /kafka/config/ssl/$(date +%Y)/$(hostname).keystore.jks /kafka/config/ssl/kafka.keystore.jks
ln -s /kafka/config/ssl/$(date +%Y)/kafka.server.truststore.jks /kafka/config/ssl/kafka.truststore.jks

By creating symlinks to the active certs, you can renew the certificates by creating a new /kafka/config/ssl/$(date +%Y) folder and updating the symlink. No change to the configuration files is needed.

Update Kafka server.properties to Use SSL

Append a listener prefixed with SSL:// to the existing listeners – as an example:

#2024-03-27 LJR Adding SSL port on 9095
#listeners=PLAINTEXT://kafka1587.example.net:9092
#advertised.listeners=PLAINTEXT://kafka1587.example.net:9092
listeners=PLAINTEXT://kafka1587.example.net:9092,SSL://kafka1587.example.net:9095
advertised.listeners=PLAINTEXT://kafka1587.example.net:9092,SSL://kafka1587.example.net:9095

Then add configuration values to use the keystore and truststore, specify which SSL protocols will be permitted, and set whatever client auth requirements you want:

ssl.keystore.location=/kafka/config/ssl/kafka.keystore.jks
ssl.keystore.password=<WhateverYouSetEarlier>
ssl.truststore.location=/kafka/config/ssl/kafka.truststore.jks
ssl.truststore.password=<WhateverYouSetForThisOne>
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# Or whatever client auth setting you require
ssl.client.auth=none

Save the server.properties file and use “systemctl restart kafka” to restart the Kafka service.
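
Before moving on to the firewall, it’s worth confirming the new listener is answering. A quick openssl s_client check from the Kafka server itself (assuming the SSL listener is on 9095) should show the certificate you just installed:

openssl s_client -connect $(hostname):9095 -servername $(hostname) </dev/null 2>/dev/null | openssl x509 -noout -subject -issuer -dates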

Update Firewall Rules to Permit Traffic on New Port

firewall-cmd --add-port=9095/tcp
firewall-cmd --add-port=9095/tcp --permanent
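
Then verify that both the runtime and permanent rules show the new port:

firewall-cmd --list-ports
firewall-cmd --permanent --list-ports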

Counting Messages in All Kafka Topics

For some reason, I am given a lot of Kafka instances that no one knows what they are or what they do. The first step, generally, is figuring out whether the thing does anything. Because a server that no one has sent a message to in a year or two … well, there’s not much point in bringing it up to standard, monitoring it, and such. My first-glance analysis has been just counting all of the messages in all of the topics to see which topics are actually used — a quick bash script to accomplish this (presuming a Kafka broker is on port 9092 of the host running the script):

strTopics=$(./kafka-topics.sh --list --bootstrap-server $(hostname):9092)

SAVEIFS=$IFS   
IFS=$'\n'      
arrayTopics=($strTopics)
IFS=$SAVEIFS   

for i in "${arrayTopics[@]}"; do iMessages=`./kafka-console-consumer.sh --bootstrap-server $(hostname):9092 --topic $i --property print.timestamp=true --from-beginning --timeout-ms=10000 2>&1 | grep "Processed a total of"`;         echo "$i     $iMessages"; done
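
Consuming every topic from the beginning can take quite a while on servers that do hold data. A faster approximation, if you just need to know whether anything has ever been written, is to sum the end offsets with GetOffsetShell. Offsets include messages that have since been compacted or aged out, so treat it as an upper bound rather than an exact count (older releases use --broker-list where newer ones take --bootstrap-server):

for strTopic in $(./kafka-topics.sh --list --bootstrap-server $(hostname):9092); do
    iCount=$(./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $(hostname):9092 --topic "$strTopic" --time -1 2>/dev/null | awk -F':' '{sum += $3} END {print sum+0}')
    echo "$strTopic     $iCount"
done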

Kafka Manager SSL Issue

We renewed the certificate on our Kafka Manager (now called CMAK, but we haven’t upgraded yet so it’s still ‘manager’), but the site wouldn’t come up. It did, however, dump a bunch of Java ick into the log file:

Jan 16 14:01:52 kafkamanager kafka-manager: [^[[31merror^[[0m] p.c.s.NettyServer$PlayPipelineFactory - cannot load SSL context
Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.reflect.InvocationTargetException: null
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider(NettyServer.scala:111) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.getPipeline(NettyServer.scala:90) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.lang.Exception: Error loading HTTPS keystore from /path/to/kafkamgr.example.net.jks
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.createSSLContext(DefaultSSLEngineProvider.scala:47) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.<init>(DefaultSSLEngineProvider.scala:21) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.security.UnrecoverableKeyException: Cannot recover key
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyProtector.recover(KeyProtector.java:315) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:141) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore$JKS.engineGetKey(JavaKeyStore.java:56) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:96) ~[na:1.8.0_251]

Elsewhere in the log file, we got output that looks like not-decrypted stuff …

Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.IllegalArgumentException: invalid version format:  ̄G^H▒~A�▒~Zᆴ▒~@▒~A:U▒~HP▒~W5▒~W▒D¬ᄡ^K/↓▒
￧^S▒L
Jan 16 14:01:52 kafkamananger kafka-manager: "^S^A^S^C^S^B▒~@+▒~@/▒~Lᄅ▒~Lᄄ▒~@,▒~@0▒~@

Which led me to hypothesize that either the keystore password wasn’t right (it was, I could use keytool to view the jks file) or the key password wasn’t right. That was the problem — there isn’t actually a way to configure the key password in Kafka Manager, just a parameter to configure the keystore password. You’ve got to re-use that password for the key password.

To change the key password in a JKS file, use keytool: it will prompt for the keystore password and the current key password, then for the new key password.

keytool -keypasswd -alias kafkamanager.example.net -keystore ljr.jks

Voila — once both the key and keystore matched the password configured in play.server.https.keyStore.password … the Kafka Manager service started up and worked properly.


Automatically Adding “Extra” JMX Ports to Firewalld

A few months ago, I had dug into a mystery at work — even though JMX was configured to use port 9999, port 9999 was open from the client to the server, and the client was configured to use port 9999 … our Kafka manager tool could only report statistics from the local Kafka server. It failed to retrieve data for the remote ones — saying it was unable to connect. Long story short, JMX uses “its” port and two other randomly selected (and not readily configurable) ports. To automate getting JMX working when Kafka is restarted, I built this shell script. It identifies which ports are in use by Java and transiently adds them to the firewall rules (since the ports change on each service start, transient firewall rules made sense here). My plan is to link the script to the Kafka unit file as an ExecStartPost directive.

# Get ports linked to java
mapfile -t array_of_ports < <( ss -6 -l -t -p -n | grep java | cut -d ":" -f 4 | grep -v "10.166" | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   : 
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=`/bin/firewall-cmd --zone=public --add-port=$i/tcp`
      echo $output
   fi
done

mapfile -t array_of_ports < <( ss -4 -l -t -p -n | grep java | cut -d ":" -f 2 | cut -d " " -f 1 | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   : 
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=`/bin/firewall-cmd --zone=public --add-port=$i/tcp`
      echo $output
   fi
done
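
A related note: rather than (or in addition to) chasing the ephemeral ports, my understanding is that the JMX RMI port can be pinned to the same value as the JMX port with the com.sun.management.jmxremote.rmi.port property, which keeps the remote connection on predictable ports. A sketch of the sort of thing that would go in the Kafka environment, with example values; the authentication and SSL settings are assumptions you should adjust to your security requirements:

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.rmi.port=9999 \
  -Dcom.sun.management.jmxremote.local.only=false \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=$(hostname -f)"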

Kafka Producer – Sending a Message with a Key

I needed to test sending messages into a topic where the cleanup policy is compact (keep the most recent data for each key) … which means I needed a quick way to send a message with a key to Kafka. Fortunately, the kafka-console-producer script supports key parsing. You just need to include a few --property parameters when running the script.

./kafka-console-producer.sh --bootstrap-server kafka.example.com:9092 --topic ljrtesting --property "parse.key=true" --property "key.separator=:"

When you send messages, they are in the format Key<delimiter>Message — so “LJRKey:1” will send a message with the key of “LJRKey” and the message content of “1”.
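
To double-check what actually landed in the topic, the console consumer can print the keys back out as well:

./kafka-console-consumer.sh --bootstrap-server kafka.example.com:9092 --topic ljrtesting --from-beginning --property "print.key=true" --property "key.separator=:"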