This script is an example of using the Sumo Logic API to retrieve collector details. It looks for Linux collectors and validates that each one has the desired log sources defined. Collectors that are missing any of the desired sources are flagged for further investigation and written to an Excel report.
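The script imports its API credentials from a separate config.py (see the import near the top of the listing). A minimal sketch of that file, assuming an access ID/key pair generated under your Sumo Logic account, might look like this (the values shown are placeholders):

# config.py -- keep this file out of version control
access_id = 'your_access_id_here'    # Sumo Logic access ID
access_key = 'your_access_key_here'  # Sumo Logic access key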
import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
from config import access_id, access_key # Import your credentials from config.py
# Base URL for the Sumo Logic API (adjust for your deployment, e.g. api.us2.sumologic.com or api.eu.sumologic.com)
base_url = 'https://api.sumologic.com/api/v1'
def get_all_collectors():
"""Retrieve all collectors with pagination support."""
collectors = []
limit = 1000 # Adjust as needed; check API docs for max limit
offset = 0
while True:
url = f'{base_url}/collectors?limit={limit}&offset={offset}'
response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
if response.status_code == 200:
result = response.json()
collectors.extend(result.get('collectors', []))
if len(result.get('collectors', [])) < limit:
break # Exit the loop if we received fewer than the limit, meaning it's the last page
offset += limit
else:
print('Error fetching collectors:', response.status_code, response.text)
break
return collectors
def get_sources(collector_id):
"""Retrieve sources for a specific collector."""
url = f'{base_url}/collectors/{collector_id}/sources'
response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
if response.status_code == 200:
sources = response.json().get('sources', [])
# print(f"Log Sources for collector {collector_id}: {sources}")
return sources
else:
print(f'Error fetching sources for collector {collector_id}:', response.status_code, response.text)
return []
def check_required_logs(sources):
"""Check if the required logs are present in the sources."""
required_logs = {
'_security_events': False,
'_linux_system_events': False,
'cron_logs': False,
'dnf_rpm_logs': False
}
for source in sources:
if source['sourceType'] == 'LocalFile':
name = source.get('name', '')
for key in required_logs.keys():
if name.endswith(key):
required_logs[key] = True
# Determine missing logs
missing_logs = {log: "MISSING" if not present else "" for log, present in required_logs.items()}
return missing_logs
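# Illustrative example (hypothetical source names): if a collector's only LocalFile
# sources are named 'myhost_cron_logs' and 'myhost_dnf_rpm_logs', check_required_logs
# returns {'_security_events': 'MISSING', '_linux_system_events': 'MISSING',
#          'cron_logs': '', 'dnf_rpm_logs': ''}.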
# Main execution
if __name__ == "__main__":
collectors = get_all_collectors()
report_data = []
for collector in collectors:
# Check if the collector's osName is 'Linux'
if collector.get('osName') == 'Linux':
collector_id = collector['id']
collector_name = collector['name']
print(f"Checking Linux Collector: ID: {collector_id}, Name: {collector_name}")
sources = get_sources(collector_id)
missing_logs = check_required_logs(sources)
if any(missing_logs.values()):
report_entry = {
"Collector Name": collector_name,
"_security_events": missing_logs['_security_events'],
"_linux_system_events": missing_logs['_linux_system_events'],
"cron_logs": missing_logs['cron_logs'],
"dnf_rpm_logs": missing_logs['dnf_rpm_logs']
}
# print(f"Missing logs for collector {collector_name}: {report_entry}")
report_data.append(report_entry)
# Create a DataFrame and write to Excel
df = pd.DataFrame(report_data, columns=[
"Collector Name", "_security_events", "_linux_system_events", "cron_logs", "dnf_rpm_logs"
])
# Generate the filename with current date and time
if not df.empty:
timestamp = pd.Timestamp.now().strftime("%Y%m%d-%H%M")
output_file = f"{timestamp}-missing_logs_report.xlsx"
df.to_excel(output_file, index=False)
print(f"\nData written to {output_file}")
else:
print("\nAll collectors have the required logs.")