Sumo Logic: Validating Collector Data Sources via API

This script is an example of using the Sumo Logic API to retrieve collector details. It looks for collectors running on Linux servers and validates that each one has the desired log sources defined. Collectors that do not contain all of the desired sources are flagged for further investigation.

import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
from config import access_id, access_key  # Import your credentials from config.py

# Base URL for the Sumo Logic API; the collector and source request URLs
# in this script are built by appending a path to it.
# NOTE(review): Sumo Logic API hosts are deployment-specific (for example
# api.us2.sumologic.com or api.eu.sumologic.com) -- confirm this host matches
# the deployment your account lives in.
base_url = 'https://api.sumologic.com/api/v1'

def get_all_collectors():
    """Retrieve all collectors, following limit/offset pagination.

    Pages of up to ``limit`` collectors are requested until a page comes back
    shorter than ``limit``, which marks the last page.

    Returns:
        A list of collector dicts as returned under the ``collectors`` key of
        the API response. If a request fails (transport error or non-200
        status), an error is printed and the collectors gathered so far are
        returned, so the list may be partial or empty.
    """
    collectors = []
    limit = 1000  # Adjust as needed; check API docs for max limit
    offset = 0
    auth = HTTPBasicAuth(access_id, access_key)

    while True:
        try:
            response = requests.get(
                f'{base_url}/collectors',
                # Let requests encode the query string instead of hand-building it.
                params={'limit': limit, 'offset': offset},
                auth=auth,
                # Without a timeout a stalled connection would hang the script forever.
                timeout=30,
            )
        except requests.RequestException as exc:
            # Treat transport failures like HTTP errors: report and stop paging.
            print('Error fetching collectors:', exc)
            break

        if response.status_code != 200:
            print('Error fetching collectors:', response.status_code, response.text)
            break

        page = response.json().get('collectors', [])
        collectors.extend(page)
        if len(page) < limit:
            break  # A short page means this was the last one
        offset += limit

    return collectors

def get_sources(collector_id):
    """Retrieve the sources configured on a specific collector.

    Args:
        collector_id: Identifier of the collector, interpolated into the
            request URL.

    Returns:
        The list found under the ``sources`` key of the API response. An
        empty list is returned (after printing an error) when the request
        fails with a transport error or a non-200 status, so callers cannot
        distinguish "no sources" from "lookup failed" by the return value.
    """
    url = f'{base_url}/collectors/{collector_id}/sources'
    try:
        response = requests.get(
            url,
            auth=HTTPBasicAuth(access_id, access_key),
            # Without a timeout a stalled connection would hang the script forever.
            timeout=30,
        )
    except requests.RequestException as exc:
        # Treat transport failures like HTTP errors: report and fall back to [].
        print(f'Error fetching sources for collector {collector_id}:', exc)
        return []

    if response.status_code != 200:
        print(f'Error fetching sources for collector {collector_id}:', response.status_code, response.text)
        return []

    return response.json().get('sources', [])

def check_required_logs(sources):
    """Report which required log sources are absent from ``sources``.

    A required log counts as present when at least one source whose
    ``sourceType`` is ``'LocalFile'`` has a ``name`` ending with the required
    suffix. Sources lacking a ``sourceType`` or ``name`` are tolerated and
    simply never match.

    Args:
        sources: Iterable of source dicts for one collector.

    Returns:
        A dict keyed by required-log suffix (in a fixed order) whose value is
        ``"MISSING"`` when no matching source exists and ``""`` otherwise.
    """
    required_suffixes = (
        '_security_events',
        '_linux_system_events',
        'cron_logs',
        'dnf_rpm_logs',
    )

    # Collect the candidate names once; `or ''` guards against a null name.
    local_file_names = [
        source.get('name') or ''
        for source in sources
        if source.get('sourceType') == 'LocalFile'
    ]

    return {
        suffix: "" if any(name.endswith(suffix) for name in local_file_names) else "MISSING"
        for suffix in required_suffixes
    }

# Main execution
if __name__ == "__main__":
    # Report layout: the collector name followed by one column per required log.
    report_columns = [
        "Collector Name", "_security_events", "_linux_system_events", "cron_logs", "dnf_rpm_logs"
    ]
    rows = []

    for entry in get_all_collectors():
        # Only Linux collectors are audited; skip everything else.
        if entry.get('osName') != 'Linux':
            continue

        collector_id = entry['id']
        collector_name = entry['name']
        print(f"Checking Linux Collector: ID: {collector_id}, Name: {collector_name}")

        status = check_required_logs(get_sources(collector_id))
        # A collector with no "MISSING" markers has nothing to report.
        if not any(status.values()):
            continue

        row = {"Collector Name": collector_name}
        row.update({column: status[column] for column in report_columns[1:]})
        rows.append(row)

    report = pd.DataFrame(rows, columns=report_columns)

    if report.empty:
        print("\nAll collectors have the required logs.")
    else:
        # File name is prefixed with the current date and time.
        stamp = pd.Timestamp.now().strftime("%Y%m%d-%H%M")
        output_file = f"{stamp}-missing_logs_report.xlsx"
        report.to_excel(output_file, index=False)
        print(f"\nData written to {output_file}")

Leave a Reply

Your email address will not be published. Required fields are marked *