This is my base script for using the Sumo Logic API to query logs and analyze data. This particular script finds hosts sending syslog data successfully through our firewall, looks up who owns each netblock (they weren't all internal!), and checks our configuration management database (CMDB) to see whether we have a host registered with the destination IP address of the syslog traffic.
import requests
from requests.auth import HTTPBasicAuth
import time
from collections import defaultdict
import cx_Oracle
import pandas as pd
import ipaddress
from datetime import datetime
from ipwhois import IPWhois
from ipwhois.exceptions import IPDefinedError
# Import credentials from a config file
from config import access_id, access_key, oracle_username, oracle_password
# Initialize Oracle Client
# Thick-mode client libraries must be loaded once, before any connection is opened.
cx_Oracle.init_oracle_client(lib_dir=r"C:\Oracle\instantclient_21_15")
# DSN for the CMDB Oracle instance; the service name happens to match the hostname here.
oracle_dsn = cx_Oracle.makedsn('cmdb_db.example.com', 1521, service_name='cmdb_db.example.com')
# Function to query Oracle database
def query_oracle_cmdb(strIPAddress):
    """Look up the CMDB record registered for an IP address.

    Opens a fresh connection per call (simple, but consider pooling if the
    result set is large), binds the IP to the :ipaddy placeholder, and returns
    the first matching row as an 8-tuple:
    (HOSTNAME, FRIENDLYNAME, STATUS, COLLECTIONTIME, RETIREDBYDISPLAYNAME,
     RETIREDDATETIME, SERVERAPPSUPPORTTEAM, SERVERENVIRONMENT).
    Returns a tuple of 8 empty strings when no record exists, so callers can
    concatenate the result unconditionally.
    """
    with cx_Oracle.connect(user=oracle_username, password=oracle_password, dsn=oracle_dsn) as connection:
        cursor = connection.cursor()
        try:
            query = """
            SELECT HOSTNAME, FRIENDLYNAME, STATUS, COLLECTIONTIME, RETIREDBYDISPLAYNAME,
            RETIREDDATETIME, SERVERAPPSUPPORTTEAM, SERVERENVIRONMENT
            FROM NBIREPORT.CHERWELL_CMDBDATA_FULL
            WHERE IPADDRESS = :ipaddy
            """
            cursor.execute(query, [strIPAddress])
            result = cursor.fetchone()
        finally:
            # Original leaked the cursor if execute/fetchone raised; always close it.
            cursor.close()
        return result if result else ("",) * 8
# Function to determine IP ownership
# RFC 1918 private ranges; built once at import time instead of on every call.
_INTERNAL_NETWORKS = (
    ipaddress.IPv4Network("10.0.0.0/8"),
    ipaddress.IPv4Network("172.16.0.0/12"),
    ipaddress.IPv4Network("192.168.0.0/16"),
)

def get_ip_ownership(ip):
    """Classify an IPv4 address and identify who owns it.

    Returns:
        "INTERNAL"    for RFC 1918 private addresses,
        "INVALID"     for strings that are not valid IPv4 addresses
                      (previously this raised ValueError and killed the run),
        "Reserved IP" for IANA-reserved space (per ipwhois),
        the RDAP network name for routable external addresses, or
        "UNKNOWN"     when the RDAP lookup fails for any other reason.
    """
    try:
        ip_obj = ipaddress.IPv4Address(ip)
    except ValueError:
        # Malformed (or IPv6) value from the logs — don't crash the whole report.
        return "INVALID"
    # Check if the IP is internal
    if any(ip_obj in network for network in _INTERNAL_NETWORKS):
        return "INTERNAL"
    # For external IPs, use ipwhois to get ownership info
    try:
        result = IPWhois(ip).lookup_rdap(depth=1)
        ownership = result['network']['name']
    except IPDefinedError:
        ownership = "Reserved IP"
    except Exception as e:
        print(f"Error looking up IP {ip}: {e}")
        ownership = "UNKNOWN"
    return ownership
# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'
# Define the search query
# Firewall traffic to syslog port 514 over UDP (proto=17) or TCP (proto=6),
# excluding denied/timed-out/ip-conn actions, aggregated by destination IP and action.
search_query = '''
(dpt=514)
AND _sourcecategory = "observe/perimeter/firewall/logs"
| where !(act = "deny")
| where !(act = "timeout")
| where !(act = "ip-conn")
| where (proto=17 or proto=6)
| count dst, act
'''
# Function to create and manage search jobs
def run_search_job(start_time, end_time):
    """Create a Sumo Logic search job and poll until it finishes.

    Args:
        start_time/end_time: ISO-8601 timestamps (no offset; interpreted as UTC
            via the 'timeZone' field).

    Returns:
        The job id string when the job reaches DONE GATHERING RESULTS,
        otherwise None (creation error, CANCELLED, or FAILED).
    """
    search_job_data = {
        'query': search_query,
        'from': start_time,
        'to': end_time,
        'timeZone': 'UTC'
    }
    # Create a search job (API returns 202 Accepted on success).
    search_job_url = f'{base_url}/search/jobs'
    response = requests.post(
        search_job_url,
        auth=HTTPBasicAuth(access_id, access_key),
        json=search_job_data,
        timeout=30  # original had no timeout: a stalled API hung the script forever
    )
    if response.status_code != 202:
        print('Error starting search job:', response.status_code, response.text)
        return None
    # Get the search job ID
    job_id = response.json()['id']
    print('Search Job ID:', job_id)
    # Poll for the search job status until it reaches a terminal state.
    job_status_url = f'{search_job_url}/{job_id}'
    while True:
        response = requests.get(job_status_url, auth=HTTPBasicAuth(access_id, access_key), timeout=30)
        status = response.json().get('state', None)
        print('Search Job Status:', status)
        if status in ['DONE GATHERING RESULTS', 'CANCELLED', 'FAILED']:
            break
        time.sleep(5)  # Reasonable delay to prevent overwhelming the server
    return job_id if status == 'DONE GATHERING RESULTS' else None
# Function to retrieve results of a search job
def retrieve_results(job_id):
    """Page through a finished search job's messages and tally destinations.

    Fetches /messages in pages of 1000 and counts occurrences of each 'dst'
    field. Transient request failures are retried after a short sleep, but
    only up to a bounded number of attempts — the original retried forever,
    so a network outage turned into an infinite loop.

    Returns:
        defaultdict(int) mapping destination IP -> message count.
    """
    dst_counts = defaultdict(int)
    results_url = f'{base_url}/search/jobs/{job_id}/messages'
    offset = 0
    limit = 1000
    max_retries = 5
    retries = 0
    while True:
        params = {'offset': offset, 'limit': limit}
        try:
            response = requests.get(results_url, auth=HTTPBasicAuth(access_id, access_key), params=params, timeout=30)
            if response.status_code == 200:
                retries = 0  # successful page: reset the transient-failure budget
                results = response.json()
                messages = results.get('messages', [])
                for message in messages:
                    message_map = message['map']
                    dst = message_map.get('dst')
                    if dst:
                        dst_counts[dst] += 1
                # A short page means we've consumed the last page.
                if len(messages) < limit:
                    break
                offset += limit
            else:
                print('Error retrieving results:', response.status_code, response.text)
                break
        except requests.exceptions.RequestException as e:
            print(f'Error during request: {e}')
            retries += 1
            if retries >= max_retries:
                print('Giving up after repeated request failures; results may be partial.')
                break
            time.sleep(5)
            continue
    return dst_counts
# Main execution: prompt for a window, run the Sumo search, enrich each
# destination IP with ownership + CMDB data, and export to Excel.
if __name__ == "__main__":
    # Prompt for the start date
    start_date_input = input("Enter the start date (YYYY-MM-DD): ")
    try:
        start_time = datetime.strptime(start_date_input, "%Y-%m-%d").strftime("%Y-%m-%dT00:00:00")
    except ValueError:
        print("Invalid date format. Please enter the date in YYYY-MM-DD format.")
        exit()

    # The window closes at midnight of the current day.
    end_time = datetime.now().strftime("%Y-%m-%dT00:00:00")

    # Kick off the search job and bail out early if it never completed.
    job_id = run_search_job(start_time, end_time)
    if not job_id:
        print('Search job did not complete successfully.')
    else:
        dst_counts = retrieve_results(job_id)

        excel_rows = []
        print("\nDestination IP Counts and Oracle Data:")
        for ip_addr, hit_count in dst_counts.items():
            # CMDB lookup first, then ownership — same order (and same
            # error-print side effects) as before.
            cmdb_record = query_oracle_cmdb(ip_addr)
            owner = get_ip_ownership(ip_addr)
            row = (ip_addr, hit_count, owner) + cmdb_record
            excel_rows.append(row)
            print(row)

        # Assemble the report and write it out with a timestamped filename.
        report = pd.DataFrame(excel_rows, columns=[
            "IP Address", "Occurrence Count", "Ownership",
            "CMDB_Hostname", "CMDB_Friendly Name", "CMDB_Status", "CMDB_Collection Time",
            "CMDB_Retired By", "CMDB_Retired Date", "CMDB_Support Team", "CMDB_Environment",
        ])
        stamp = datetime.now().strftime("%Y%m%d-%H%M")
        output_file = f"{stamp}-sumo_oracle_data.xlsx"
        report.to_excel(output_file, index=False)
        print(f"\nData written to {output_file}")