
Sumo Logic: Creating Roles via API

This script creates basic roles with no extra capabilities and restricts each role to viewing only the indicated source category’s data. Role names and source categories are read from an Excel spreadsheet, and the Sumo Logic credentials come from a config.py file.
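
A minimal config.py is assumed to look something like the sketch below (the values are placeholders), and the NewRoles.xlsx workbook needs at least a 'Role Name' and a 'Source Category' column, since those are the headers the script reads.

# config.py -- minimal sketch; the values below are placeholders
access_id = 'suXXXXXXXXXXXXXX'     # Sumo Logic access ID
access_key = 'XXXXXXXXXXXXXXXXXX'  # Sumo Logic access key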

################################################################################
# This script reads an Excel file containing role data, then uses the Sumo Logic
# API to create roles based on the data. It checks each row for a role name and
# uses the source category to set data filters. The script requires a config.py
# file with access credentials.
################################################################################
import pandas as pd
import requests
import json
from config import access_id, access_key  # Import credentials from config.py

# Path to Excel file
excel_file_path = 'NewRoles.xlsx'

# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'

################################################################################
# Function to create a new role using the Sumo Logic API.
# 
# Args:
#     role_name (str): The name of the role to create.
#     role_description (str): The description of the role.
#     source_category (str): The source category to restrict the role to.
# 
# Returns:
#     None. Prints the status of the API call.
################################################################################
def create_role(role_name, role_description, source_category):
    url = f'{base_url}/roles'

    # Role payload
    data_filter = f'_sourceCategory={source_category}'
    payload = {
        'name': role_name,
        'description': role_description,
        'logAnalyticsDataFilter': data_filter,
        'auditDataFilter': data_filter,
        'securityDataFilter': data_filter
    }

    # Headers for the request
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    # Debugging line
    print(f"Attempting to create role: '{role_name}' with description: '{role_description}' and filter: '{data_filter}'")

    # Make the POST request to create a new role
    response = requests.post(url, auth=(access_id, access_key), headers=headers, data=json.dumps(payload))

    # Check the response
    if response.status_code == 201:
        print(f'Role {role_name} created successfully.')
    else:
        print(f'Failed to create role {role_name}. Status Code: {response.status_code}')
        print('Response:', response.json())

################################################################################
# Reads an Excel file and processes each row to extract role information and
# create roles using the Sumo Logic API.
# 
# Args:
#     file_path (str): The path to the Excel file containing role data.
# 
# Returns:
#     None. Processes the file and attempts to create roles based on the data.
################################################################################
def process_excel(file_path):
    # Load the spreadsheet
    df = pd.read_excel(file_path, engine='openpyxl')

    # Print column names to help debug and find correct ones
    print("Columns found in Excel:", df.columns)

    # Iterate over each row in the DataFrame
    for index, row in df.iterrows():
        role_name = row['Role Name']  # Column containing the role name
        source_category = row['Source Category']  # Column containing the source category the role is restricted to

        # Only create a role if the role name is not null
        if pd.notnull(role_name):
            role_description = f'Provides access to source category {source_category}'
            create_role(role_name, role_description, source_category)


# Process the Excel file
process_excel(excel_file_path)
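
As an optional sanity check after the run, a GET against the same roles endpoint will show whether the new roles exist. This is a minimal sketch that reuses the credentials and base_url from above and assumes the endpoint accepts GET with the same basic auth; it just dumps the raw response rather than assuming a particular JSON shape:

# Optional sanity check: list roles via the same endpoint used for creation.
# Assumes GET is supported here; prints the raw response for manual review.
response = requests.get(f'{base_url}/roles',
                        auth=(access_id, access_key),
                        headers={'Accept': 'application/json'})
print('List roles status:', response.status_code)
print(response.text)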

Sumo Logic: Validating Collector Data Sources via API

This script is an example of using the Sumo Logic API to retrieve collector details. It looks for Linux collectors and validates that each one has the desired log sources defined; any collector missing one or more of those sources is written to a report for further investigation.

import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
from config import access_id, access_key  # Import your credentials from config.py

# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'

def get_all_collectors():
    """Retrieve all collectors with pagination support."""
    collectors = []
    limit = 1000  # Adjust as needed; check API docs for max limit
    offset = 0

    while True:
        url = f'{base_url}/collectors?limit={limit}&offset={offset}'
        response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
        if response.status_code == 200:
            result = response.json()
            collectors.extend(result.get('collectors', []))
            if len(result.get('collectors', [])) < limit:
                break  # Exit the loop if we received fewer than the limit, meaning it's the last page
            offset += limit
        else:
            print('Error fetching collectors:', response.status_code, response.text)
            break

    return collectors

def get_sources(collector_id):
    """Retrieve sources for a specific collector."""
    url = f'{base_url}/collectors/{collector_id}/sources'
    response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
    if response.status_code == 200:
        sources = response.json().get('sources', [])
        # print(f"Log Sources for collector {collector_id}: {sources}")
        return sources
    else:
        print(f'Error fetching sources for collector {collector_id}:', response.status_code, response.text)
        return []

def check_required_logs(sources):
    """Check if the required logs are present in the sources."""
    required_logs = {
        '_security_events': False,
        '_linux_system_events': False,
        'cron_logs': False,
        'dnf_rpm_logs': False
    }

    for source in sources:
        if source['sourceType'] == 'LocalFile':
            name = source.get('name', '')
            for key in required_logs.keys():
                if name.endswith(key):
                    required_logs[key] = True

    # Determine missing logs
    missing_logs = {log: "MISSING" if not present else "" for log, present in required_logs.items()}
    return missing_logs

# Main execution
if __name__ == "__main__":
    collectors = get_all_collectors()
    report_data = []

    for collector in collectors:
        # Check if the collector's osName is 'Linux'
        if collector.get('osName') == 'Linux':
            collector_id = collector['id']
            collector_name = collector['name']
            print(f"Checking Linux Collector: ID: {collector_id}, Name: {collector_name}")

            sources = get_sources(collector_id)
            missing_logs = check_required_logs(sources)
            if any(missing_logs.values()):
                report_entry = {
                    "Collector Name": collector_name,
                    "_security_events": missing_logs['_security_events'],
                    "_linux_system_events": missing_logs['_linux_system_events'],
                    "cron_logs": missing_logs['cron_logs'],
                    "dnf_rpm_logs": missing_logs['dnf_rpm_logs']
                }
                # print(f"Missing logs for collector {collector_name}: {report_entry}")
                report_data.append(report_entry)

    # Create a DataFrame and write to Excel
    df = pd.DataFrame(report_data, columns=[
        "Collector Name", "_security_events", "_linux_system_events", "cron_logs", "dnf_rpm_logs"
    ])

    # Generate the filename with current date and time
    if not df.empty:
        timestamp = pd.Timestamp.now().strftime("%Y%m%d-%H%M")
        output_file = f"{timestamp}-missing_logs_report.xlsx"
        df.to_excel(output_file, index=False)
        print(f"\nData written to {output_file}")
    else:
        print("\nAll collectors have the required logs.")

Sumo Logic: Running Queries via API

This is my base script for using the Sumo Logic API to query logs and analyze data. This particular script finds hosts sending syslog data successfully through our firewall, looks up who owns each destination netblock (they weren’t all internal!), and checks our configuration management database (CMDB) to see whether we have a host registered with the destination IP address of the syslog traffic.
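
Beyond the Sumo Logic key pair, this script expects Oracle credentials in the same config.py (plus an Oracle Instant Client install at the path passed to init_oracle_client() below). A sketch of the extended config, again with placeholder values:

# config.py -- extended for the CMDB lookup; all values are placeholders
access_id = 'suXXXXXXXXXXXXXX'
access_key = 'XXXXXXXXXXXXXXXXXX'
oracle_username = 'cmdb_readonly'   # hypothetical read-only CMDB account
oracle_password = 'XXXXXXXXXXXX'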

import requests
from requests.auth import HTTPBasicAuth
import time
from collections import defaultdict
import cx_Oracle
import pandas as pd
import ipaddress
from datetime import datetime
from ipwhois import IPWhois
from ipwhois.exceptions import IPDefinedError

# Import credentials from a config file
from config import access_id, access_key, oracle_username, oracle_password

# Initialize Oracle Client
cx_Oracle.init_oracle_client(lib_dir=r"C:\Oracle\instantclient_21_15")
oracle_dsn = cx_Oracle.makedsn('cmdb_db.example.com', 1521, service_name='cmdb_db.example.com')

# Function to query Oracle database
def query_oracle_cmdb(strIPAddress):
    with cx_Oracle.connect(user=oracle_username, password=oracle_password, dsn=oracle_dsn) as connection:
        cursor = connection.cursor()
        query = """
            SELECT HOSTNAME, FRIENDLYNAME, STATUS, COLLECTIONTIME, RETIREDBYDISPLAYNAME, 
                    RETIREDDATETIME, SERVERAPPSUPPORTTEAM, SERVERENVIRONMENT
            FROM NBIREPORT.CHERWELL_CMDBDATA_FULL
            WHERE IPADDRESS = :ipaddy
        """
        cursor.execute(query, [strIPAddress])
        result = cursor.fetchone()
        cursor.close()
        return result if result else ("",) * 8

# Function to determine IP ownership
def get_ip_ownership(ip):
    # Define internal IP ranges
    internal_networks = [
        ipaddress.IPv4Network("10.0.0.0/8"),
        ipaddress.IPv4Network("172.16.0.0/12"),
        ipaddress.IPv4Network("192.168.0.0/16")
    ]
    
    # Check if the IP is internal
    ip_obj = ipaddress.IPv4Address(ip)
    if any(ip_obj in network for network in internal_networks):
        return "INTERNAL"
    
    # For external IPs, use ipwhois to get ownership info
    try:
        obj = IPWhois(ip)
        result = obj.lookup_rdap(depth=1)
        ownership = result['network']['name']
    except IPDefinedError:
        ownership = "Reserved IP"
    except Exception as e:
        print(f"Error looking up IP {ip}: {e}")
        ownership = "UNKNOWN"
    
    return ownership

# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'

# Define the search query
search_query = '''
(dpt=514)
AND _sourcecategory = "observe/perimeter/firewall/logs"
| where !(act = "deny")
| where !(act = "timeout")
| where !(act = "ip-conn")
| where (proto=17 or proto=6)
| count dst, act
'''

# Function to create and manage search jobs
def run_search_job(start_time, end_time):
    search_job_data = {
        'query': search_query,
        'from': start_time,
        'to': end_time,
        'timeZone': 'UTC'
    }

    # Create a search job
    search_job_url = f'{base_url}/search/jobs'
    response = requests.post(
        search_job_url,
        auth=HTTPBasicAuth(access_id, access_key),
        json=search_job_data
    )

    if response.status_code != 202:
        print('Error starting search job:', response.status_code, response.text)
        return None

    # Get the search job ID
    job_id = response.json()['id']
    print('Search Job ID:', job_id)

    # Poll for the search job status
    job_status_url = f'{search_job_url}/{job_id}'
    while True:
        response = requests.get(job_status_url, auth=HTTPBasicAuth(access_id, access_key))
        status = response.json().get('state', None)
        print('Search Job Status:', status)
        if status in ['DONE GATHERING RESULTS', 'CANCELLED', 'FAILED']:
            break
        time.sleep(5)  # Reasonable delay to prevent overwhelming the server

    return job_id if status == 'DONE GATHERING RESULTS' else None

# Function to retrieve results of a search job
def retrieve_results(job_id):
    dst_counts = defaultdict(int)
    results_url = f'{base_url}/search/jobs/{job_id}/messages'
    offset = 0
    limit = 1000

    while True:
        params = {'offset': offset, 'limit': limit}
        try:
            response = requests.get(results_url, auth=HTTPBasicAuth(access_id, access_key), params=params, timeout=30)
            if response.status_code == 200:
                results = response.json()
                messages = results.get('messages', [])
                
                for message in messages:
                    message_map = message['map']
                    dst = message_map.get('dst')
                    if dst:
                        dst_counts[dst] += 1
                
                if len(messages) < limit:
                    break

                offset += limit
            else:
                print('Error retrieving results:', response.status_code, response.text)
                break
        except requests.exceptions.RequestException as e:
            print(f'Error during request: {e}')
            time.sleep(5)
            continue

    return dst_counts

# Main execution
if __name__ == "__main__":
    # Prompt for the start date
    start_date_input = input("Enter the start date (YYYY-MM-DD): ")
    try:
        start_time = datetime.strptime(start_date_input, "%Y-%m-%d").strftime("%Y-%m-%dT00:00:00")
    except ValueError:
        print("Invalid date format. Please enter the date in YYYY-MM-DD format.")
        exit()

    # Use today's date as the end date
    end_time = datetime.now().strftime("%Y-%m-%dT00:00:00")

    # Create a search job
    job_id = run_search_job(start_time, end_time)
    if job_id:
        # Retrieve and process results
        dst_counts = retrieve_results(job_id)

        # Prepare data for Excel
        data_for_excel = []

        print("\nDestination IP Counts and Oracle Data:")
        for dst, count in dst_counts.items():
            oracle_data = query_oracle_cmdb(dst)
            ownership = get_ip_ownership(dst)
            # Use only Oracle data columns
            combined_data = (dst, count, ownership) + oracle_data
            data_for_excel.append(combined_data)
            print(combined_data)

        # Create a DataFrame and write to Excel
        df = pd.DataFrame(data_for_excel, columns=[
            "IP Address", "Occurrence Count", "Ownership",
            "CMDB_Hostname", "CMDB_Friendly Name", "CMDB_Status", "CMDB_Collection Time", 
            "CMDB_Retired By", "CMDB_Retired Date", "CMDB_Support Team", "CMDB_Environment"
        ])

        # Generate the filename with current date and time
        timestamp = datetime.now().strftime("%Y%m%d-%H%M")
        output_file = f"{timestamp}-sumo_oracle_data.xlsx"
        df.to_excel(output_file, index=False)
        print(f"\nData written to {output_file}")
    else:
        print('Search job did not complete successfully.')
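
One detail worth calling out in get_ip_ownership(): anything in RFC 1918 space short-circuits to "INTERNAL" and never hits the RDAP lookup, which avoids unnecessary external queries when most destinations are internal. A couple of illustrative calls, reusing the function defined above:

# RFC 1918 space short-circuits to "INTERNAL" -- no RDAP lookup is performed
print(get_ip_ownership('10.20.30.40'))    # -> INTERNAL
print(get_ip_ownership('192.168.1.5'))    # -> INTERNAL
# Any other address goes through ipwhois and returns the registry's network name,
# or "Reserved IP" / "UNKNOWN" if the lookup raises an exception.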