Tag: python

Confluent Kafka Queue Length

The documentation for the Python Confluent Kafka module includes a len function on the producer. I wanted to use the function because we’re getting a number of duplicated messages on the client, and I was trying to isolate what might be causing the problem. Unfortunately, calling producer.len() failed indicating there’s no len() method. I used dir(producer) to show that, no, there isn’t a len() method.

I realized today that the documentation is telling me that I can call the built-in len() function on a producer to get the queue length.

Code:

print(f"Before produce there are {len(producer)} messages awaiting delivery")
producer.produce(topic, key=bytes(str(int(cs.timestamp) ), 'utf8'), value=cs.SerializeToString() )
print(f"After produce there are {len(producer)} messages awaiting delivery")
producer.poll(0) # Per https://github.com/confluentinc/confluent-kafka-python/issues/16 for queue full error
print(f"After poll0 there are {len(producer)} messages awaiting delivery")

Output:

Before produce there are 160 messages awaiting delivery
After produce there are 161 messages awaiting delivery
After poll0 there are 155 messages awaiting delivery

Boolean Opts in Python

I have a few command line arguments on a Python script that are most readily used if they are boolean. I sometimes need a “verbose” option for script debugging — print a lot of extra stuff to show what’s going on, and I usually want a “dry run” option where the script reads data, performs calculations, and prints results to the screen without making any changes or sending data anywhere (database, email, etc). To use command line arguments as boolean values, I use a function that converts a variety of possible inputs to True/False.

def string2boolean(strInput):
    """
    :param strInput: String string to be converted to boolean
    :return: Boolean representation of input
    """
    if isinstance(strInput, bool):
        return strInput
    if strInput.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif strInput.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')

Use “type” when adding the argument to run the input through your function.

    parser.add_argument('-r', '--dryrun', action='store', type=string2boolean, dest='boolDryRun', default=False, help="Preview data processing without sending data to DB or Kafka. Valid values: 'true' or 'false'.")

Google OAUTH Stuff

Reminder to self — when you set up a desktop app with OAUTH to use the Google APIs … you have to hit the authorization URL from the computer running the code. That means, for my calendar scraper, that I need to do X-redirection from the server & run the script. Firefox launches & the flow actually completes. Attempting to hit the URL from my computer yields a connection failure to the https://localhost:SomePort at the end of the workflow.

Move token.pickle to backup file, run getCalendarEvents.py with X-redirection so auth can be processed through web form.

Python: dir

I am writing this down because I never manage to remember these two super useful functions that tells you what a variable is.

iLastProcessedTimestamp = 0
with open(‘test.txt’) as f:
iLastProcessedTimestamp = int(f.readline())
print(dir(iLastProcessedTimestamp))
print(type(iLastProcessedTimestamp))

The type function tells you the variable’s class (in this case, int). The dir function tells you the attributes of the variable.

Pylint — Ignoring Errors

MS Word has an ‘ignore this error’ thing in the grammar checker that I use fairly regularly — technical writing has syntax that reads as wrong, grammatical errors for impact, or informal writing where I don’t much care for some rules of grammar … I don’t want to turn off the grammar checker, but I do want to stop seeing a squiggly line under a specific sentence that I don’t want to change. Turns out Pylint has something similar:

PIP SSL Error

Upgraded pip today, and I pretty quickly regretted it. SSL Error attempting to install anything from the Internet (and, amazingly, some things where I downloaded the wheel file). The answer is to downgrade PIP until you hit a version that doesn’t have the error. Annoying. Not sure what the latest rev I could have used was — going back one level and getting the error in loop was more time than I could devote to the project, so I just jumped back six months. Had success with 20.0.2 and left working alone.

Everything from 20.3.1 through 21.0.1 has this failure:

D:\tmp\5\pip>pip install basic_sftp
WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by ‘SSLError(SSLError(1, ‘[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1076)’))’: /simple/basic-sftp/
WARNING: Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by ‘SSLError(SSLError(1, ‘[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1076)’))’: /simple/basic-sftp/
WARNING: You are using pip version 20.3.1; however, version 21.0.1 is available.
You should consider upgrading via the ‘c:\programs\anaconda3\python.exe -m pip install –upgrade pip’ command.

Python — dis

Found a cool method for testing the efficiency of different approaches to a python expression — dis disassembles the call and prints the component steps. Here, we see that there’s not much functional difference between “not a=b” and “a != b”.

Cyberark — Error Listing Accounts

I was getting an odd error from my attempt to list accounts in Cyberark — “Object reference not set to an instance of an object”. Searching the Internet yielded a lot of issues that weren’t my problem (ampersands in account names in an older version, issues with SSL {and, seriously, someone says disable SSL on the connection they use to retrieve passwords!?! And not just random someone, but RAND?!?}). My issue turned out to be that I was copy/pasting code and used requests.post instead of requests.get — attempting to POST to a GET URL generates this error too.

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): cyberark.example.com:443
DEBUG:urllib3.connectionpool:https://cyberark.example.com:443 “POST /PasswordVault/API/auth/Cyberark/Logon HTTP/1.1” 200 182
Before request, header is {‘Content-Type’: ‘application/json’, ‘Authorization’: ‘5TQz5WVjYm5tMjBh5C00M5YyLT50MjYt5Tc2Y5I2ZDI5…AwMDA5MDA7’}
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): cyberark.example.com:443
DEBUG:urllib3.connectionpool:https://cyberark.example.com:443 “POST /PasswordVault/api/Accounts?search=sample_account&searchType=contains HTTP/1.1” 500 97
{“ErrorCode”:”CAWS00001E”,”ErrorMessage”:”Object reference not set to an instance of an object.”} 500 Internal Server Error

Python Time-Expiring Cache

I needed to post information into a SharePoint Online list. There’s an auth header required to post data, but the authentication expires every couple of minutes. Obviously, I could just get a new auth string each time … but that’s terribly inefficient. I could also use what I had and refresh it when it fails … inelegant but effective. I wanted, instead, to have a value cached for a duration slightly less than the server-side expiry on the auth string. This decorator allows me to use the cached auth header string for some period of time and actually run the function that grabs the auth header string before the string is invalidated.

import functools
import time
from datetime import datetime

def timed_cache(iMaxAge, iMaxSize=128, boolTyped=False):
    #######################################
    # LRU cache decorator with expiry time
    #
    # Args:
    #    iMaxAge: Seconds to live for cached results. 
    #    iMaxSize: Maximum cache size (see functools.lru_cache).
    #    boolTyped: Cache on distinct input types (see functools.lru_cache).
    #######################################
    def _decorator(fn):
        @functools.lru_cache(maxsize=iMaxSize, typed=boolTyped)
        def _new(*args, __time_salt, **kwargs):
            return fn(*args, **kwargs)

        @functools.wraps(fn)
        def _wrapped(*args, **kwargs):
            return _new(*args, **kwargs, __time_salt=int(time.time() / iMaxAge))
        return _wrapped

    return _decorator

# Usage example -- 23 second cache expiry
@timed_cache(23)
def slow_function(iSleepTime: int):
    datetimeStart = datetime.now()
    time.sleep(iSleepTime)
    return f"test started at {datetimeStart} and ended at at {datetime.now()}"


print(f"Start at {datetime.now()}")

for i in range(1, 50):
    print(slow_function(5))
    
    if i % 5 is 0:
        print(f"sleeping 5 at {datetime.now()}")
        time.sleep(5)
        print(f"done sleeping at {datetime.now()}\n\n")

print(f"Ended at at {datetime.now()}")

Scraping Calendar Events

We’ve learned the value of engaging with local government — with few people involved in local proceedings, it’s pretty easy for a generally unpopular proposal to seem reasonable. And then we’re all stuck with the generally unpopular regulation. It is a pain, however, to keep manually adding the next Trustee meeting. And there’s no way I’m checking the website daily to find out about any emergency meetings.

Now I’m pulling the events from their Google calendar and creating new meeting items in my Exchange calendar:

  1. Register the app with Google to use the API
  2. Install exchangelib
  3. Copy config.sample to config.py and add personal information
  4. Create a ca.crt file with the CA signing key for your Exchange server (or remove the custom adapter if your server cert is signed by a public key)
  5. Run getCalendarEvents.py and follow the URL to authorize access to your calendar

I’ve tweaked the script to grab events from the school district’s calendar in SchoolPointe too. Now we know when there’s a school board meeting or dress-up day.