Kafka Metadata Mismatch

I’ll prefix this with a caveat: when the topic ID stored in Zookeeper and the one in Kafka’s on-disk partition.metadata files don’t match up, there’s something wrong beyond “hey, make this string the same as that string and try again”. Look for communication issues, disk issues, or anything else that might lead to bad data. And, lacking any identifiable cause, the general recommendation is to drop and recreate the topic.

But! “Hey, I am going to delete all the data in this Kafka topic. Everyone good with that?” is seldom answered with a resounding “of course, we don’t actually want to keep the data we specifically configured a system to keep”. So you need to keep the topic around even though something has clearly gone sideways. Fortunately, you can manually update either the topic ID in Zookeeper or the one on disk. I find it easier to update the one on disk in Kafka because that change can be reverted. Stop all of the Kafka servers, then run the following script on each one: it identifies all of the folders within the Kafka data directory for the topic and changes the wrong topic ID to the right one. Then start the Kafka servers again, and the topic should be functional. For now.

#!/bin/bash

# Function to check if Kafka is stopped
check_kafka_stopped() {
    if systemctl is-active --quiet kafka; then
        echo "Error: Kafka service is still running. Please stop Kafka before proceeding."
        exit 1
    else
        echo "Kafka service is stopped. Proceeding with backup and update."
    fi
}

# Define the base directory where Kafka stores its data
DATA_DIR="/path/todata-kafka"

# Define the topic prefix to search for
TOPIC_PREFIX="MY_TOPIC_NAME-"

# Old and new topic IDs
OLD_TOPIC_ID="2xSmlPMBRv2_ihuWgrvQNA"
NEW_TOPIC_ID="57kBnenRTo21_VhEhWFOWg"

# Date string for backup file naming
DATE_STR=$(date +%Y%m%d)

# Check if Kafka is stopped
check_kafka_stopped

# Find all directories matching the topic pattern and process each one
for dir in "$DATA_DIR"/${TOPIC_PREFIX}*; do
    # Construct the path to the partition.metadata file
    metadata_file="$dir/partition.metadata"

    # Check if the metadata file exists
    if [[ -f "$metadata_file" ]]; then
        # Backup the existing partition.metadata file
        backup_file="$dir/partmetadata.$DATE_STR"
        echo "Backing up $metadata_file to $backup_file"
        cp "$metadata_file" "$backup_file"

        # Use sed to replace the line containing the old topic_id with the new one
        echo "Updating topic_id in $metadata_file"
        sed -i "s/^topic_id: $OLD_TOPIC_ID/topic_id: $NEW_TOPIC_ID/" "$metadata_file"
    else
        echo "No partition.metadata file found in $dir"
    fi
done

echo "Backup and topic ID update complete."

How do you get the Kafka and Zookeeper topic ID values in the first place? For Zookeeper, ask it:

kafkaserver::fixTopicID # bin/zookeeper-shell.sh $(hostname):2181
get /brokers/topics/MY_TOPIC_NAME
{"removing_replicas":{},"partitions":{"2":[251,250],"5":[249,250],"27":[248,247],"12":[248,251],"8":[247,248],"15":[251,250],"21":[247,250],"18":[249,248],"24":[250,248],"7":[251,247],"1":[250,249],"17":[248,247],"23":[249,247],"26":[247,251],"4":[248,247],"11":[247,250],"14":[250,248],"20":[251,249],"29":[250,249],"6":[250,251],"28":[249,248],"9":[248,249],"0":[249,248],"22":[248,251],"16":[247,251],"19":[250,249],"3":[247,251],"10":[251,249],"25":[251,250],"13":[249,247]},"topic_id":"57kBnenRTo21_VhEhWFOWg","adding_replicas":{},"version":3}

Kafka’s copy is stored on disk in each topic partition folder:

kafkaserver::fixTopicID # cat /path/to/data-kafka/MY_TOPIC_NAME-29/partition.metadata
version: 0
topic_id: 2xSmlPMBRv2_ihuWgrvQNA
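
Since every partition folder for the topic carries its own copy of this file, it is worth confirming they all show the same (wrong) value before running the update script. A quick check, assuming the same data directory as above:

grep -H '^topic_id' /path/to/data-kafka/MY_TOPIC_NAME-*/partition.metadata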

Once the data is processed, it would be a good idea to recreate the topic … but ensuring the two topic ID values match up will get you back online.

Zookeeper: Finding the Leader

When restarting our ensemble of Zookeepers, I restart the leader last (so the role only has to be reallocated once). Which means I’ve got to find the leader. Luckily the Zookeepers are happy to report whether they are the leader or a follower if you send ‘srvr’ to the Zookeeper client port.

jumpserver:~ # echo srvr | nc zcserver38.example.net 2181
Zookeeper version: 3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT
Latency min/avg/max: 0/0/1383
Received: 3783871
Sent: 3784761
Connections: 7
Outstanding: 0
Zxid: 0x800003d25
Mode: follower
Node count: 3715

Looking at the “Mode” line above, I can see that one is a follower. So I’ll check the next Zookeeper …

jumpserver:~ # echo srvr | nc zcserver39.example.net 2181
Zookeeper version: 3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT
Latency min/avg/max: 0/0/1167
Received: 836866
Sent: 848235
Connections: 1
Outstanding: 0
Zxid: 0x800003d25
Mode: leader
Node count: 3715
Proposal sizes last/min/max: 36/32/19782

And that’s the leader — so 39 will be the last one rebooted.
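
With more than a couple of servers in the ensemble, a quick loop beats checking each one by hand. A sketch, with the host list below standing in for whatever your ensemble members actually are:

for host in zcserver38.example.net zcserver39.example.net zcserver40.example.net; do
    printf '%s: ' "$host"
    echo srvr | nc "$host" 2181 | grep Mode
done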

Kafka: Finding the Controller in Zookeeper

When restarting all of the Kafka servers (e.g. for periodic patching and reboots), it is better to avoid rolling the controller from node to node on every restart. To accomplish this, find out which server is currently acting as the controller and restart it last. The controller will still move once with this method, but only once, no matter how many servers are in your deployment.

# Connect to zookeeper
./zookeeper-shell.sh localhost:2181
# Find controller by opening the zookeeper shell and querying for controller
get /controller
{"version":1,"brokerid":250,"timestamp":"1676694139851"}

# Get details on broker ID reported as controller
get /brokers/ids/250
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["PLAINTEXT://kafkahost.example.net:9093","SASL_PLAINTEXT://kafkahost.example.net:9092"],"jmx_port":9999,"host":"kafkahost.example.net","timestamp":"1676348503278","port":9093,"version":4}