Category: Technology

PostgreSQL and Timescale with RedHat VDO

RedHat is phasing out ZFS – there are several reasons for this move, but primarily ZFS is a closed-source Solaris (now Oracle) codebase. While OpenZFS exists, it’s not quite ‘the same’. RedHat’s preferred solution is Virtual Data Optimizer (VDO). This page walks through installing PostgreSQL on RedHat Enterprise Linux 8 (RHEL8), creating a database cluster on a VDO volume, and installing the TimescaleDB extension on that cluster.

Before we create a VDO disk, we need to install the VDO packages:

yum install vdo kmod-kvdo
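
The vdo package also provides the vdo.service systemd unit that the fstab entry later in this post depends on; if it is not already enabled, it is worth turning on now (service name as shipped with the RHEL8 packages):

systemctl enable --now vdo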

Then we need to create a VDO volume – here a VDO named ‘PGData’ is created on /dev/sdb, a 9TB device presented as a 16TB logical volume (counting on VDO’s compression and deduplication to make up the difference):

vdo create --name=PGData --device=/dev/sdb --vdoLogicalSize=16T

Check to verify that the object was created – it is /dev/mapper/PGData in this instance

vdo list

Now format the volume using xfs.

mkfs.xfs /dev/mapper/PGData

And finally add a mount point

# Create the mount point folder
mkdir /pgpool
# Update fstab to mount the new volume to that mount point
cat /etc/fstab
/dev/mapper/PGData /pgpool xfs defaults,x-systemd.requires=vdo.service 0 0
# Load the updated fstab
systemctl daemon-reload
# and mount the volume
mount -a

It should now be mounted at ‘/pgpool/’.
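
A quick way to confirm the volume is mounted where we expect:

df -h /pgpool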

The main reason for using VDO with Postgres is its compression feature – compression is enabled by default, although we may need to tweak settings as we test it.
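
Since the compression and deduplication savings are what let a 9TB device present 16TB, it is worth keeping an eye on actual utilization; the vdostats utility from the vdo package reports physical usage and space savings:

vdostats --human-readable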

We now have a place in our pool where we want our Postgres database to store its data, so let’s go ahead and install PostgreSQL.

Here we are using RHEL8 and installing PostgreSQL 12:

# Install the repository RPM:
dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm
dnf clean all
# Disable the built-in PostgreSQL module:
dnf -qy module disable postgresql
# Install PostgreSQL:
dnf install -y postgresql12-server

Once the installation is done, we need to initialize the database cluster and start the server. Since we want Postgres to store data on our VDO volume, we need to initialize the cluster into our custom directory; we can do that in a couple of ways.

In either case, we need to make sure that the data directory on our VDO volume, i.e. ‘/pgpool/pgdata/’, is owned by the ‘postgres’ user that is created when PostgreSQL is installed. We can do that by running the commands below before following either set of steps for starting the Postgres server:

mkdir /pgpool/pgdata
chown -R postgres:postgres /pgpool

Customize the systemd service by editing the postgresql-12 unit file and updating the PGDATA environment variable:

vdotest-uos:pgpool # grep Environment /usr/lib/systemd/system/postgresql-12.service
# Note: avoid inserting whitespace in these Environment= lines, or you may
Environment=PGDATA=/pgpool/pgdata

and then initialize, enable, and start our server as below:

/usr/pgsql-12/bin/postgresql-12-setup initdb
systemctl enable postgresql-12
systemctl start postgresql-12

Here ‘/usr/pgsql-12/bin/’ is the bin directory of the PostgreSQL installation; substitute your own bin directory path if it differs.

or

We can also give the data directory directly while initializing the database, using the command below (note that the PGDATA value in the systemd unit still needs to point at this directory for systemctl to manage the server):

/usr/pgsql-12/bin/initdb -D /pgpool/pgdata/

and then start the server using

systemctl start postgresql-12

Now that we have installed PostgreSQL and started the server, we will install the Timescale extension for Postgres.

Add the Timescale repo with the command below:

tee /etc/yum.repos.d/timescale_timescaledb.repo <<EOL
[timescale_timescaledb]
name=timescale_timescaledb
baseurl=https://packagecloud.io/timescale/timescaledb/el/8/\$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/timescale/timescaledb/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
EOL
sudo yum update -y

then install it using the command below:

yum install -y timescaledb-postgresql-12

After installing, we need to add ‘timescaledb’ to shared_preload_libraries in our postgresql.conf. Timescale provides ‘timescaledb-tune‘, which can be used for this and for configuring other settings for our database. Since we initialized our PG database cluster in a custom location, we need to point timescaledb-tune at that postgresql.conf; it also requires the path to our pg_config file. We can do both with the following command:

timescaledb-tune --pg-config=/usr/pgsql-12/bin/pg_config --conf-path=/pgpool/pgdata/postgresql.conf
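
After the tuner runs, the library should show up in the config file – a quick check (the path assumes the custom data directory used above):

grep shared_preload_libraries /pgpool/pgdata/postgresql.conf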

After running the above command we need to restart our Postgres server; we can do that with:

systemctl restart postgresql-12

After restarting, connect to the database you want to use Timescale hypertables in and run the below statement to load the Timescale extension:

CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

You can check that Timescale is loaded by passing the ‘\dx’ command to psql, which will list the installed extensions.
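
For example (substituting your own database name for ‘mydb’):

sudo -u postgres psql -d mydb -c '\dx'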

In order to configure PostgreSQL to allow remote connections, we need to make a couple of changes, as below.
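
As a minimal sketch – the listen address and client subnet shown here are placeholders to adjust for your environment – set listen_addresses in postgresql.conf, add a host entry to pg_hba.conf, and restart the server:

# /pgpool/pgdata/postgresql.conf
listen_addresses = '*'

# /pgpool/pgdata/pg_hba.conf – allow password authentication from the local subnet
host    all             all             192.168.1.0/24          md5

# apply the changes
systemctl restart postgresql-12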

ElasticSearch to OpenSearch Migration: Creating Tenants

Finally, create the tenants … we’re using OAUTH for Kibana authentication, so I wasn’t able to use the API to export “saved objects”. Fortunately, we don’t have many tenants … and exporting/importing those saved objects manually isn’t an onerous task.

import requests
from requests.auth import HTTPBasicAuth

def createTenant(strTenantName, strDescription):
        jsonAddTenant = {  "description": strDescription }
        r2 = requests.put(f"https://opensearch.example.com:9200/_opendistro/_security/api/tenants/{strTenantName}", json=jsonAddTenant, auth = HTTPBasicAuth('something', 'something'), verify=False)
        print(r2.text)
        print(r2.status_code)

#  Get all tenants from ES
r = requests.get(f"https://elasticsearch.example.com:9200/_opendistro/_security/api/tenants", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllTenants = r.json()

for item in dictAllTenants.items():
        if item[1].get('reserved') == False:
                createTenant(item[0], item[1].get('description'))

ElasticSearch to OpenSearch Migration: Lifecycle Management Policies

Since there are a lot of changes in how lifecycle policies work between ElasticSearch and OpenSearch, the recommendation I’ve seen is to manually create them … but it’s a lot of repetitive typing, so I used a script to create a base policy — a name with a hot allocation — and manually added all of the remaining stages, transitions, and index patterns to which the policy should be applied.

import requests
from requests.auth import HTTPBasicAuth
import json
from time import sleep
from datetime import timedelta

f = open("data-LifecyclePolicies.txt", "w")

listIgnoredILMPolicies = ["watch-history-ilm-policy"]

# Get all ILM policies from ES
r = requests.get(f"https://elasticsearch.example.com:9200/_ilm/policy", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllILMPolicies= r.json()

for item in dictAllILMPolicies.items():
        if item[0] not in listIgnoredILMPolicies:
                strILMPolicyName = item[0]
                dictILMPolicySettings = item[1]
                iHotDays = None
                iWarmDays = None
                iColdDays = None
                iDeleteDays = None
                if item[1].get('policy').get('phases').get('hot'):
                        iHotDays = (item[1].get('policy').get('phases').get('hot').get('min_age'))
                if item[1].get('policy').get('phases').get('warm'):
                        iWarmDays = (item[1].get('policy').get('phases').get('warm').get('min_age'))
                if item[1].get('policy').get('phases').get('cold'):
                        iColdDays = (item[1].get('policy').get('phases').get('cold').get('min_age'))
                if item[1].get('policy').get('phases').get('delete'):
                        iDeleteDays = (item[1].get('policy').get('phases').get('delete').get('min_age'))
                print(f"Policy named {strILMPolicyName} has phases:")
                print(f"\tHot {iHotDays}")
                print(f"\tWarm {iWarmDays}")
                print(f"\tCold {iColdDays}")
                print(f"\tDelete {iDeleteDays}")
                print("\n")

                f.write(f"Policy named {strILMPolicyName} has phases:\n")
                f.write(f"\tHot {iHotDays}\n")
                f.write(f"\tWarm {iWarmDays}\n")
                f.write(f"\tCold {iColdDays}\n")
                f.write(f"\tDelete {iDeleteDays}\n")
                f.write("\n")
                jsonILMPolicyCreation = {
                                  "policy": {
                                    "description": "Ported from ES7",
                                    "default_state": "hot",
                                    "states": [
                                      {
                                        "name": "hot",
                                        "actions": [
                                          {
                                            "retry": {
                                              "count": 3,
                                              "backoff": "exponential",
                                              "delay": "1m"
                                            },
                                            "allocation": {
                                              "require": {
                                                "temp": "hot"
                                              },
                                              "include": {},
                                              "exclude": {},
                                              "wait_for": "false"
                                            }
                                          }
                                        ],
                                        "transitions": []
                                      }
                                    ],
                                    "ism_template": []
                                  }
                                }

                r2 = requests.put(f"https://opensearch.example.com:9200/_plugins/_ism/policies/{item[0]}", json=jsonILMPolicyCreation, auth = HTTPBasicAuth('something', 'something'), verify=False)
                print(r2.text)
                print(r2.status_code)
f.close()

ElasticSearch to OpenSearch Migration: Map Users to Roles

After the roles are created, I need to map users into the roles — using the ElasticSearch API to list all roles and add each user to the corresponding OpenSearch role.

import requests
from requests.auth import HTTPBasicAuth

def addUserToRole(strRole, listUsers):
        # Patch the full user list for the role in one operation -- an "add" op
        # replaces any existing value at the path, so adding users one at a time
        # would leave only the last user mapped
        jsonAddUser = [
        {               "op": "add",            "path": f"/{strRole}",          "value": {"users": listUsers} }]
        print(f"{strRole}\t{jsonAddUser}")
        r2 = requests.patch(f"https://opensearch.example.com:9200/_plugins/_security/api/rolesmapping", json=jsonAddUser, auth = HTTPBasicAuth('something', 'something'), verify=False)
        print(r2.text)
        print(r2.status_code)

listIgnoredGroups = ['security_rest_api_access', 'logstash_role', 'elastalert_role', 'kibana_server', 'wsadmin_role', 'mgmt_role', 'logstash', 'manage_snapshots', 'readall', 'all_access', 'own_index', 'kibana_user', ]

# Get all roles from prod & list users in those roles
#GET _opendistro/_security/api/rolesmapping/
r = requests.get(f"https://elasticsearch.example.com:9200/_opendistro/_security/api/rolesmapping/", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllRoles = r.json()

# For each role, map its users to that role in OS
for item in dictAllRoles.items():
        if item[0] not in listIgnoredGroups:
                addUserToRole(item[0], item[1].get('users'))

ElasticSearch to OpenSearch Migration: Creating Roles

To create the roles, use the ElasticSearch API to get the existing role definitions, remove a few attributes I don’t want to set (reserved, static, hidden), and create the corresponding role in OpenSearch. I skip all of the reserved roles.

import requests
from requests.auth import HTTPBasicAuth

f = open("results-roles.txt", "a")

objGetRoleRequest = requests.get(f"https://elasticsearch.example.com:9200/_opendistro/_security/api/roles", auth = HTTPBasicAuth('something', 'something'), verify=False)
dictRoleInfo = objGetRoleRequest.json()
for item in dictRoleInfo.items():
        if item[1].get('reserved') is False:
                print(item)
                print("\n")
                dictRoleDefinition = dict(item[1])
                dictRoleDefinition.pop('reserved', None)
                dictRoleDefinition.pop('static', None)
                dictRoleDefinition.pop('hidden', None)
                r = requests.put(f"https://opensearch.example.com:9200/_plugins/_security/api/roles/{item[0]}", json=dictRoleDefinition, auth = HTTPBasicAuth('something', 'something'), verify=False)
                print(r.json())

                if r.status_code == 200:
                        print(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
                        f.write(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
                else:
                        print(f"HTTP Error: {r.status_code} on web call")
                        print(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
                        f.write(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
f.close()

ElasticSearch to OpenSearch: Local User Migration

One of the trickier bits of migrating from ElasticSearch to OpenSearch has been the local users — most of our users are authenticated via OAUTH, but programmatic access is done with local user accounts. Fortunately, you appear to be able to get the user password hash from the .opendistro_security index if you authenticate using an SSL cert.

This means the CN of the certificate being used must be registered in the elasticsearch.yml as an admin DN:

plugins.security.authcz.admin_dn:
  - 'CN=admin,O=LJRTest,ST=Ohio,C=US'
  - 'CN=ljradmin,O=LJRTest,ST=Ohio,C=US'

Provided the certificate is an admin_dn, the account can be used to search the .opendistro_security index and return local user info — including hashes. Information within the document is Base64 encoded, so the value needs to be decoded before you’ve got legible user information. Once the user record has been obtained, the information can be used to PUT details to the OpenSearch API and create a matching user.

import json
import requests
import base64
from requests.auth import HTTPBasicAuth

clientCrt = "./certs/ljr-mgr.pem"
clientKey = "./certs/ljr-mgr.key"
strOSAdminUser = 'something'
strOSAdminPass = 'something'

r = requests.get("https://elasticsearch.example.com:9200/.opendistro_security/_search?pretty", verify=False, cert=(clientCrt, clientKey))
if r.status_code == 200:
        dictResult = r.json()

        for item in dictResult.get('hits').get('hits'):
                if item.get('_id') == "internalusers":
                        strInternalUsersXML = item.get('_source').get('internalusers')
                        strUserJSON = base64.b64decode(strInternalUsersXML).decode("utf-8")
                        dictUserInfo = json.loads(strUserJSON)
                        for tupleUserRecord in dictUserInfo.items():
                                strUserName = tupleUserRecord[0]
                                dictUserRecord = tupleUserRecord[1]
                                if dictUserRecord.get('reserved') == False:
                                        dictUserDetails = {
                                                "hash": dictUserRecord.get('hash'),
                                                "opendistro_security_roles": dictUserRecord.get('opendistro_security_roles'),
                                                "backend_roles": dictUserRecord.get('backend_roles'),
                                                "attributes": dictUserRecord.get('attributes')
                                                }

                                        if dictUserRecord.get('description') is not None:
                                                dictUserDetails["description"] = dictUserRecord.get('description')

                                        reqCreateUser = requests.put(f'https://opensearch.example.com:9200/_plugins/_security/api/internalusers/{strUserName}', json=dictUserDetails, auth = HTTPBasicAuth(strOSAdminUser, strOSAdminPass), verify=False)
                                        print(reqCreateUser.text)
else:
        print(r.status_code)

ElasticSearch to OpenSearch Migration: Remote Reindex to Move Data

Since we cannot do an in-place upgrade of our ElasticSearch environment, I need to move everything to the new servers. The biggest component is moving the data — which can easily be done using the remote reindex. Use the ElasticSearch API to get a list of all indices, and tell the OpenSearch API to reindex that index from the ElasticSearch remote. This operates on deltas — it will add new documents to an index — so my plan is to spend a few days seeding the initial data, then perform delta updates leading up to the scheduled change.
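
One prerequisite worth noting: the destination cluster generally needs the source cluster allow-listed in opensearch.yml before it will accept remote reindex requests – something along these lines, using the example hostname from these scripts:

reindex.remote.allowlist: ["elasticsearch.example.com:9200"]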

import requests
from requests.auth import HTTPBasicAuth

f = open("results.txt", "a")

listIndexNames = []

reqGetIndexes = requests.get('https://elasticsearch.example.com:9200/_cat/indices?format=json', auth=HTTPBasicAuth('something','something'), verify=False)
for jsonIndex in reqGetIndexes.json():
        if jsonIndex.get('index')[0] != '.':
                listIndexNames.append(jsonIndex.get('index'))

for strIndexName in listIndexNames:
  jsonReindexItem = {
    "source": {
      "remote": {
        "host": "https://elasticsearch.example.com:9200",
        "username": "something",
        "password": "something"
      },
  "index": strIndexName
    },
    "dest": {
  "index": strIndexName
    }
  }

  r = requests.post('https://opensearch.example.com:9200/_reindex', json=jsonReindexItem, auth = HTTPBasicAuth('something', 'something'), verify=False)
  print(r.json())
  jsonResponse = r.json()

  # A 400 with "mapping set to strict" failures means the destination index's
  # mapping rejects new fields -- temporarily enable dynamic mapping and retry
  if r.status_code == 400 and jsonResponse.get('failures') and "mapping set to strict" in jsonResponse.get('failures')[0].get('cause').get("reason"):
    print(jsonResponse.get('failures')[0].get('cause').get("reason"))
    print("I need to set dynamic mapping")
    r2 = requests.put(f'https://opensearch.example.com:9200/{strIndexName}/_mapping', json={"dynamic":"true"}, auth = HTTPBasicAuth('something', 'something'), verify=False)
    print(r2.json())
    r3 = requests.post('https://opensearch.example.com:9200/_reindex', json=jsonReindexItem, auth = HTTPBasicAuth('something', 'something'), verify=False)
    print(r3.json())
    print(f"{strIndexName}\t{r3.status_code}\t{r3.json()}\n")
    f.write(f"{strIndexName}\t{r3.status_code}\t{r3.json()}\n")

  elif r.status_code == 200:
    print(jsonResponse)
    print(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
    f.write(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
  else:
    print(f"HTTP Error: {r.status_code} on web call")
    print(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
    f.write(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")

f.close()

ElasticSearch to OpenSearch Migration: Creating Index Templates

Prior to creating the indices, I need to create the index templates.

import requests
from requests.auth import HTTPBasicAuth
import json
from time import sleep

def serialize_sets(obj):
        if isinstance(obj, set):
                return list(obj)
        return obj

listIgnoredTemplates = ['.watch-history', '.watch-history-1', '.watch-history-2', '.watch-history-3', '.watch-history-4', '.watch-history-5', '.watch-history-6', '.watch-history-7', '.watch-history-8', '.watch-history-9', '.watch-history-10', '.watch-history-11', 'ilm-history', 'ilm-history_2', 'tenant_template', '.monitoring-logstash']

# Get all index templates from ES
r = requests.get(f"https://elasticsearch.example.com:9200/_template", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllTemplates= r.json()

for item in dictAllTemplates.items():
        if item[0] not in listIgnoredTemplates:
                if item[1].get('settings').get('index'):
                        iShards = (item[1].get('settings').get('index').get('number_of_shards'))
                        iReplicas = (item[1].get('settings').get('index').get('number_of_replicas'))
                else:
                        iShards = 3
                        iReplicas = 1
                if iShards is None:
                        iShards = 3
                if iReplicas is None:
                        iReplicas = 1
                if item[1].get('settings').get('index') and item[1].get('settings').get('index').get('lifecycle'):
                        jsonAddTemplate = {
                                 "index_patterns": item[1].get('index_patterns'),
                                  "template": {
                                    "aliases": {
                                      item[1].get('settings').get('index').get('lifecycle').get('rollover_alias'): {}
                                    },
                                    "settings": {
                                      "number_of_shards": iShards,
                                      "number_of_replicas": iReplicas
                                    },
                                    "mappings":        item[1].get('mappings')
                                    }
                                  }
                else:
                        jsonAddTemplate = {
                                  "index_patterns": item[1].get('index_patterns'),
                                  "template": {
                                    "settings": {
                                      "number_of_shards": iShards,
                                      "number_of_replicas": iReplicas
                                    },
                                    "mappings":         item[1].get('mappings')
                                    }
                                  }
                r2 = requests.put(f"https://opensearch.example.com:9200/_index_template/{item[0]}", json=jsonAddTemplate, auth = HTTPBasicAuth('something', 'something'), verify=False)
                print(r2.text)
                print(r2.status_code)
                sleep(2)

Another pretty buggy bug

We spent a morning trying to figure out why containers in a new installation of Swarm just couldn’t talk to each other. Overlay network looked fine. Firewall looked fine. You could get from the host to the container, just not from the container to a container on the other server. So … here’s a bug where your swarm (i.e. the thing you do when you want docker stuff to run across more than one server) cannot actually, ya know, talk to the other servers. Sigh!

https://github.com/moby/moby/issues/41775