To create the roles, use the ElasticSearch API to get the existing role definitions, remove a few attributes I don’t want to set (reserved, static, hidden), and create the corresponding role in OpenSearch. I skip all of the reserved roles.
import requests
from requests.auth import HTTPBasicAuth

f = open("results-roles.txt", "a")

# Pull the existing role definitions from ElasticSearch
objGetRoleRequest = requests.get("https://elasticsearch.example.com:9200/_opendistro/_security/api/roles", auth=HTTPBasicAuth('something', 'something'), verify=False)
dictRoleInfo = objGetRoleRequest.json()

for item in dictRoleInfo.items():
    # Skip the reserved roles
    if item[1].get('reserved') is False:
        print(item)
        print("\n")
        dictRoleDefinition = dict(item[1])
        # Drop the attributes we don't want to set on the new role
        dictRoleDefinition.pop('reserved', None)
        dictRoleDefinition.pop('static', None)
        dictRoleDefinition.pop('hidden', None)
        # Create the corresponding role in OpenSearch
        r = requests.put(f"https://opensearch.example.com:9200/_plugins/_security/api/roles/{item[0]}", json=dictRoleDefinition, auth=HTTPBasicAuth('something', 'something'), verify=False)
        if r.status_code == 200:
            print(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
            f.write(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
        else:
            print(f"HTTP Error: {r.status_code} on web call")
            print(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
            f.write(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
f.close()
One of the trickier bits of migrating from ElasticSearch to OpenSearch has been the local users. Most of our users authenticate via OAuth, but programmatic access is done with local user accounts. Fortunately, you appear to be able to get the user password hash from the .opendistro_security index if you authenticate using an SSL certificate.
This means the CN of the certificate being used must be registered in the elasticsearch.yml as an admin DN:
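Something like the following in elasticsearch.yml, assuming the Open Distro security plugin's admin_dn setting (the DN shown is a placeholder; use the subject of your own certificate):

# Hypothetical DN -- match your certificate's subject
opendistro_security.authcz.admin_dn:
  - "CN=ljr-mgr.example.com,O=Example,C=US"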
Provided the certificate is an admin_dn, the account can be used to search the .opendistro_security index and return local user information, including password hashes. The information within the document is Base64 encoded, so the value needs to be decoded before you've got legible user data. Once the user record has been obtained, the information can be used to PUT details to the OpenSearch API and create a matching user.
import json
import requests
import base64
from requests.auth import HTTPBasicAuth

clientCrt = "./certs/ljr-mgr.pem"
clientKey = "./certs/ljr-mgr.key"
strOSAdminUser = 'something'
strOSAdminPass = 'something'

# Search the security index, authenticating with the admin certificate
r = requests.get("https://elasticsearch.example.com:9200/.opendistro_security/_search?pretty", verify=False, cert=(clientCrt, clientKey))
if r.status_code == 200:
    dictResult = r.json()
    for item in dictResult.get('hits').get('hits'):
        if item.get('_id') == "internalusers":
            # The document content is Base64-encoded JSON
            strInternalUsersB64 = item.get('_source').get('internalusers')
            strUserJSON = base64.b64decode(strInternalUsersB64).decode("utf-8")
            dictUserInfo = json.loads(strUserJSON)
            for tupleUserRecord in dictUserInfo.items():
                strUserName = tupleUserRecord[0]
                dictUserRecord = tupleUserRecord[1]
                # Skip the reserved users
                if dictUserRecord.get('reserved') is False:
                    dictUserDetails = {
                        "hash": dictUserRecord.get('hash'),
                        "opendistro_security_roles": dictUserRecord.get('opendistro_security_roles'),
                        "backend_roles": dictUserRecord.get('backend_roles'),
                        "attributes": dictUserRecord.get('attributes')
                    }
                    if dictUserRecord.get('description') is not None:
                        dictUserDetails["description"] = dictUserRecord.get('description')
                    # Create the matching user in OpenSearch, reusing the password hash
                    reqCreateUser = requests.put(f'https://opensearch.example.com:9200/_plugins/_security/api/internalusers/{strUserName}', json=dictUserDetails, auth=HTTPBasicAuth(strOSAdminUser, strOSAdminPass), verify=False)
                    print(reqCreateUser.text)
else:
    print(r.status_code)
Since we cannot do an in-place upgrade of our ElasticSearch environment, I need to move everything to the new servers. The biggest component is moving the data, which can easily be done using remote reindexing. Use the ElasticSearch API to get a list of all indices, and tell the OpenSearch API to reindex each index from the ElasticSearch remote. This operates on deltas (it adds new documents to an existing index), so my plan is to spend a few days seeding the initial data, then perform delta updates leading up to the scheduled change.
import requests
from requests.auth import HTTPBasicAuth

f = open("results.txt", "a")

# Get the list of indices from ElasticSearch, skipping system indices that begin with '.'
listIndexNames = []
reqGetIndexes = requests.get('https://elasticsearch.example.com:9200/_cat/indices?format=json', auth=HTTPBasicAuth('something', 'something'), verify=False)
for jsonIndex in reqGetIndexes.json():
    if jsonIndex.get('index')[0] != '.':
        listIndexNames.append(jsonIndex.get('index'))

for strIndexName in listIndexNames:
    jsonReindexItem = {
        "source": {
            "remote": {
                "host": "https://elasticsearch.example.com:9200",
                "username": "something",
                "password": "something"
            },
            "index": strIndexName
        },
        "dest": {
            "index": strIndexName
        }
    }
    r = requests.post('https://opensearch.example.com:9200/_reindex', json=jsonReindexItem, auth=HTTPBasicAuth('something', 'something'), verify=False)
    jsonResponse = r.json()
    print(jsonResponse)
    if r.status_code == 400 and jsonResponse.get('failures') and "mapping set to strict" in jsonResponse.get('failures')[0].get('cause').get("reason"):
        # e.g. {'error': {'root_cause': [{'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}], 'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}, 'status': 400}
        print(jsonResponse.get('failures')[0].get('cause').get("reason"))
        print("I need to set dynamic mapping")
        r2 = requests.put(f'https://opensearch.example.com:9200/{strIndexName}/_mapping', json={"dynamic": "true"}, auth=HTTPBasicAuth('something', 'something'), verify=False)
        print(r2.json())
        # Retry the reindex now that dynamic mapping is enabled
        r3 = requests.post('https://opensearch.example.com:9200/_reindex', json=jsonReindexItem, auth=HTTPBasicAuth('something', 'something'), verify=False)
        print(r3.json())
        print(f"{strIndexName}\t{r3.status_code}\t{r3.json()}\n")
        f.write(f"{strIndexName}\t{r3.status_code}\t{r3.json()}\n")
    elif r.status_code == 200:
        print(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
        f.write(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
    else:
        print(f"HTTP Error: {r.status_code} on web call")
        print(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
        f.write(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
f.close()
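For the follow-up delta runs, the reindex body can tell OpenSearch to skip documents that already exist in the destination rather than rewriting them. A minimal sketch using the standard reindex options (op_type and conflicts):

# Delta run: op_type "create" only indexes documents missing from the
# destination, and conflicts "proceed" continues past the version conflicts
# raised for existing documents instead of aborting the reindex.
jsonReindexItem = {
    "conflicts": "proceed",
    "source": {
        "remote": {
            "host": "https://elasticsearch.example.com:9200",
            "username": "something",
            "password": "something"
        },
        "index": strIndexName
    },
    "dest": {
        "index": strIndexName,
        "op_type": "create"
    }
}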
We spent a morning trying to figure out why containers in a new installation of Swarm just couldn’t talk to each other. Overlay network looked fine. Firewall looked fine. You could get from the host to the container, just not from the container to a container on the other server. So … here’s a bug where your swarm (i.e. the thing you do when you want docker stuff to run across more than one server) cannot actually, ya know, talk to the other servers. Sigh!
Use the keytool command to create a trust store with the CA chain used in your certificates. I am using Venafi, so I need to import two CA public keys:
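Something like the following, assuming the same Sectigo chain used on the servers (the truststore name and aliases here are illustrative; your CA file names will differ):

keytool -keystore kafka.client.truststore.jks -alias SectigoRoot -import -file "Sectigo RSA Organization Validation Secure Server CA.crt"
keytool -keystore kafka.client.truststore.jks -alias UserTrustRoot -import -file "USERTrust RSA Certification Authority.crt"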
Create a producer-ssl.properties or consumer-ssl.properties based on your current producer/consumer properties file. Update the port (9095 is used for SSL) and append the following lines:
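A sketch of the lines to append, assuming the client truststore created above (the path and password are placeholders):

security.protocol=SSL
ssl.truststore.location=/path/to/kafka.client.truststore.jks
ssl.truststore.password=<TruststorePassword>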
Once you have a properly configured properties file, you can invoke either the kafka-console-consumer.sh or kafka-console-producer.sh scripts indicating your new properties file:
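For example (the host, topic, and file locations are placeholders):

kafka-console-producer.sh --bootstrap-server kafka1587.example.net:9095 --topic test --producer.config producer-ssl.properties
kafka-console-consumer.sh --bootstrap-server kafka1587.example.net:9095 --topic test --consumer.config consumer-ssl.properties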
The following process was used to enable SSL communication with the Kafka servers. First, generate certificates for each server in the environment. I am using a third-party certificate provider, Venafi. When you download the certificates, make sure to select the “PEM (OpenSSL)” format and check the box to “Extract PEM content into separate files (.crt, .key)”.
Upload each zip file to the appropriate server under /tmp/, named in the $(hostname).zip format. The following series of commands creates the files needed in the Kafka server configuration. You will be asked to set passwords for the keystore and truststore JKS files; don't forget what you use, as we'll need them later.
# Assumes Venafi certificates downloaded as OpenSSL zip files with separate public/private keys are present in /tmp/$(hostname).zip
mkdir /kafka/config/ssl/$(date +%Y)
cd /kafka/config/ssl/$(date +%Y)
mv /tmp/$(hostname).zip ./
unzip $(hostname).zip
# Create keystore for Kafka
openssl pkcs12 -export -in $(hostname).crt -inkey $(hostname).key -out $(hostname).p12 -name $(hostname) -CAfile ./ca.crt -caname root
keytool -importkeystore -destkeystore $(hostname).keystore.jks -srckeystore $(hostname).p12 -srcstoretype pkcs12 -alias $(hostname)
# Create truststore from CA certs
keytool -keystore kafka.server.truststore.jks -alias SectigoRoot -import -file "Sectigo RSA Organization Validation Secure Server CA.crt"
keytool -keystore kafka.server.truststore.jks -alias UserTrustRoot -import -file "USERTrust RSA Certification Authority.crt"
# Fix permissions
chown -R kafkauser:kafkagroup /kafka/config/ssl
# Create symlinks for current-year certs
cd ..
ln -s /kafka/config/ssl/$(date +%Y)/$(hostname).keystore.jks /kafka/config/ssl/kafka.keystore.jks
ln -s /kafka/config/ssl/$(date +%Y)/kafka.server.truststore.jks /kafka/config/ssl/kafka.truststore.jks
By creating symlinks to the active certs, you can renew the certificates by creating a new /kafka/config/ssl/$(date +%Y) folder and updating the symlinks; no change to the configuration files is needed.
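A sketch of a renewal, assuming next year's certificates have been staged in a 2025 folder (ln -sf replaces the existing links in place):

# Point the symlinks at the new keystore and truststore, then restart Kafka
ln -sf /kafka/config/ssl/2025/$(hostname).keystore.jks /kafka/config/ssl/kafka.keystore.jks
ln -sf /kafka/config/ssl/2025/kafka.server.truststore.jks /kafka/config/ssl/kafka.truststore.jks
systemctl restart kafka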
Update Kafka server.properties to Use SSL
Append a listener prefixed with SSL:// to the existing listeners. As an example:
#2024-03-27 LJR Adding SSL port on 9095
#listeners=PLAINTEXT://kafka1587.example.net:9092
#advertised.listeners=PLAINTEXT://kafka1587.example.net:9092
listeners=PLAINTEXT://kafka1587.example.net:9092,SSL://kafka1587.example.net:9095
advertised.listeners=PLAINTEXT://kafka1587.example.net:9092,SSL://kafka1587.example.net:9095
Then add configuration values to use the keystore and truststore, specify which SSL protocols will be permitted, and set whatever client auth requirements you want:
ssl.keystore.location=/kafka/config/ssl/kafka.keystore.jks
ssl.keystore.password=<WhateverYouSetEarlier>
ssl.truststore.location=/kafka/config/ssl/kafka.truststore.jks
ssl.truststore.password=<WhateverYouSetForThisOne>
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# Or whatever client auth setting you require (none, requested, required);
# note that properties files do not support inline comments after a value
ssl.client.auth=none
Save the server.properties file and use “systemctl restart kafka” to restart the Kafka service.
Update Firewall Rules to Permit Traffic on New Port
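For example, with firewalld (the firewall tooling and zone are assumptions; adjust to your environment):

firewall-cmd --permanent --add-port=9095/tcp
firewall-cmd --reload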
I read this crazy way of cooking bacon — so much better than carefully spreading three slices out across the pan and cooking in batches. You take the whole package (or whatever portion thereof you wish to cook). Separate the slices, but pile them up loosely in the pan over medium heat.
This takes 15-25 minutes — just let it cook, stirring occasionally.
Then remove your beautiful, crispy, curly bacon slices to a paper towel to drain.