Master security automation with cron jobs. This comprehensive guide covers vulnerability scanning, log analysis, compliance monitoring, intrusion detection, and automated security response.
What You'll Learn
- Automated vulnerability scanning and assessment
- Security log analysis and threat detection
- Compliance monitoring and reporting
- Intrusion detection and response automation
- Certificate management and SSL monitoring
- Security patch management automation
Vulnerability Scanning Automation
Automated Security Scans
# Vulnerability scanning schedule
# Cron fields: minute hour day-of-month month day-of-week command
# Daily network scan at 2 AM
0 2 * * * /opt/security/network_scan.sh
# Weekly full vulnerability assessment on Sunday at 3 AM
0 3 * * 0 /opt/security/full_vuln_scan.py
# Hourly web application security scan (at minute 0 of every hour)
0 * * * * /opt/security/web_app_scan.sh
# Monthly compliance scan on 1st at 4 AM
0 4 1 * * /opt/security/compliance_scan.py
Comprehensive Vulnerability Scanner
#!/usr/bin/env python3
# /opt/security/full_vuln_scan.py
import subprocess
import json
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
import logging
import requests
import xml.etree.ElementTree as ET
class VulnerabilityScanner:
    """Scan a fixed list of targets with Nmap, Nikto, and OpenSSL and report the findings.

    Targets containing "/" are treated as network ranges and scanned with
    Nmap. Every other target is treated as a web host and gets a Nikto scan
    plus a TLS certificate check. Findings are written to a JSON report; if
    any finding has severity "critical", a Slack alert is posted and an
    urgent email report is assembled.
    """

    # Directory the JSON reports are written to (created on demand).
    REPORT_DIR = '/var/reports'
    # Seconds to wait for the Slack webhook, so a stalled endpoint cannot
    # hang the cron job indefinitely.
    SLACK_TIMEOUT = 10

    def __init__(self):
        """Set scan targets and notification endpoints, and configure logging."""
        self.targets = [
            '192.168.1.0/24',
            'web.company.com',
            'api.company.com'
        ]
        self.slack_webhook = 'https://hooks.slack.com/services/YOUR/WEBHOOK'
        self.email_recipients = ['security@company.com', 'devops@company.com']
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/security-scan.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def run_nmap_scan(self, target):
        """Run an Nmap SYN/version/OS scan with the ``vuln`` scripts against ``target``.

        Returns:
            The findings parsed from Nmap's XML output (see
            ``parse_nmap_results``), or an empty list when Nmap exits
            non-zero or exceeds the one-hour timeout.
        """
        self.logger.info(f"Starting Nmap scan for {target}")
        # "/" from a CIDR range must not end up in the file name.
        xml_file = f'/tmp/nmap_{target.replace("/", "_")}.xml'
        cmd = [
            'nmap', '-sS', '-sV', '-O', '--script=vuln',
            '-oX', xml_file,
            target
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
        except subprocess.TimeoutExpired:
            self.logger.error(f"Nmap scan timed out for {target}")
            return []
        if result.returncode != 0:
            self.logger.error(f"Nmap scan failed for {target}: {result.stderr}")
            return []
        return self.parse_nmap_results(xml_file)

    def parse_nmap_results(self, xml_file):
        """Extract vulnerability-script results from an Nmap XML report.

        A script result counts as a finding when its ``id`` contains "vuln".

        Returns:
            A list of dicts with keys ``host``, ``port``, ``service``,
            ``version`` and ``vulnerability``. Missing host addresses and
            missing service details are reported as ``'unknown'``. An
            unreadable or malformed file yields an empty list.
        """
        vulnerabilities = []
        try:
            root = ET.parse(xml_file).getroot()
        except (ET.ParseError, OSError) as e:
            self.logger.error(f"Error parsing Nmap results: {e}")
            return vulnerabilities

        for host in root.findall('host'):
            address = host.find('address')
            ip = address.get('addr') if address is not None else 'unknown'
            for port in host.findall('.//port'):
                port_num = port.get('portid')
                # Reset per port: a port without <service> must not inherit
                # the previous port's service details.
                service_name = 'unknown'
                version = 'unknown'
                service = port.find('service')
                if service is not None:
                    service_name = service.get('name', 'unknown')
                    version = service.get('version', 'unknown')
                for script in port.findall('.//script'):
                    if 'vuln' in script.get('id', ''):
                        vulnerabilities.append({
                            'host': ip,
                            'port': port_num,
                            'service': service_name,
                            'version': version,
                            'vulnerability': script.get('output', '')
                        })
        return vulnerabilities

    def run_nikto_scan(self, target):
        """Run a Nikto web scan against ``target`` with JSON output.

        Returns:
            The findings parsed from Nikto's JSON output (see
            ``parse_nikto_results``), or an empty list when Nikto exits
            non-zero or exceeds the 30-minute timeout.
        """
        self.logger.info(f"Starting Nikto scan for {target}")
        json_file = f'/tmp/nikto_{target.replace(".", "_")}.json'
        cmd = [
            'nikto', '-h', target, '-Format', 'json',
            '-output', json_file
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800)
        except subprocess.TimeoutExpired:
            self.logger.error(f"Nikto scan timed out for {target}")
            return []
        if result.returncode != 0:
            self.logger.error(f"Nikto scan failed for {target}")
            return []
        return self.parse_nikto_results(json_file)

    def parse_nikto_results(self, json_file):
        """Convert a Nikto JSON report into a list of web findings.

        Reads the top-level ``vulnerabilities`` list of the report.

        Returns:
            A list of dicts with keys ``type`` (always ``'web'``),
            ``severity`` (default ``'medium'``), ``description``, ``uri``
            and ``method`` (default ``'GET'``). An unreadable or malformed
            file yields the findings collected so far.
        """
        vulnerabilities = []
        try:
            with open(json_file, 'r') as f:
                data = json.load(f)
            for vuln in data.get('vulnerabilities', []):
                vulnerabilities.append({
                    'type': 'web',
                    'severity': vuln.get('severity', 'medium'),
                    'description': vuln.get('msg', ''),
                    'uri': vuln.get('uri', ''),
                    'method': vuln.get('method', 'GET')
                })
        except (OSError, ValueError, AttributeError, TypeError) as e:
            self.logger.error(f"Error parsing Nikto results: {e}")
        return vulnerabilities

    def check_ssl_certificates(self, domain):
        """Fetch the TLS certificate of ``domain`` on port 443 and read its validity dates.

        Returns:
            A dict with ``domain``, ``certificate_info`` (the output of
            ``openssl x509 -dates``) and ``valid`` (True when
            ``openssl s_client`` exited with status 0), or ``None`` when the
            certificate dates could not be obtained.
        """
        self.logger.info(f"Checking SSL certificate for {domain}")
        connect_cmd = [
            'openssl', 's_client', '-connect', f'{domain}:443',
            '-servername', domain, '-verify_return_error'
        ]
        dates_cmd = ['openssl', 'x509', '-noout', '-dates']
        try:
            # Empty stdin makes s_client exit right after the handshake.
            handshake = subprocess.run(connect_cmd, input='', capture_output=True,
                                       text=True, timeout=30)
            dates = subprocess.run(dates_cmd, input=handshake.stdout,
                                   capture_output=True, text=True, timeout=30)
        except (OSError, subprocess.SubprocessError) as e:
            self.logger.error(f"SSL check failed for {domain}: {e}")
            return None

        if 'notAfter' not in dates.stdout:
            return None
        return {
            'domain': domain,
            'certificate_info': dates.stdout,
            'valid': handshake.returncode == 0
        }

    def generate_security_report(self, all_vulnerabilities):
        """Summarise the findings, save them as JSON, and return the report dict.

        The report holds the scan timestamp, the total number of findings, a
        count per severity (critical/high/medium/low) and the findings
        themselves. Findings without a ``severity`` key count towards the
        total only. The file is written to
        ``REPORT_DIR/security_report_YYYYMMDD.json``.
        """
        import os  # local import: `os` is not among this script's top-level imports

        now = datetime.now()

        def count(level):
            return sum(1 for v in all_vulnerabilities if v.get('severity') == level)

        report = {
            'scan_date': now.isoformat(),
            'total_vulnerabilities': len(all_vulnerabilities),
            'critical_count': count('critical'),
            'high_count': count('high'),
            'medium_count': count('medium'),
            'low_count': count('low'),
            'vulnerabilities': all_vulnerabilities
        }
        # A missing report directory must not discard a completed scan.
        os.makedirs(self.REPORT_DIR, exist_ok=True)
        report_path = os.path.join(
            self.REPORT_DIR, f'security_report_{now.strftime("%Y%m%d")}.json'
        )
        with open(report_path, 'w') as f:
            json.dump(report, f, indent=2)
        return report

    def send_security_alert(self, report):
        """Post a Slack alert and build an urgent email when critical findings exist.

        Does nothing when the report contains no finding with severity
        "critical". A failed Slack post is logged and does not prevent the
        email report from being assembled.
        """
        critical_vulns = [v for v in report['vulnerabilities'] if v.get('severity') == 'critical']
        if not critical_vulns:
            return

        message = "🚨 CRITICAL SECURITY ALERT\n\n"
        message += f"Found {len(critical_vulns)} critical vulnerabilities:\n"
        # Only the first five findings are listed to keep the alert short.
        message += ''.join(
            f"• {vuln.get('description', 'Unknown vulnerability')}\n"
            for vuln in critical_vulns[:5]
        )
        payload = {
            'text': message,
            'username': 'Security Scanner',
            'icon_emoji': ':warning:'
        }
        try:
            requests.post(self.slack_webhook, json=payload, timeout=self.SLACK_TIMEOUT)
        except requests.RequestException as e:
            self.logger.error(f"Failed to send Slack alert: {e}")

        self.send_email_report(report, urgent=True)

    def send_email_report(self, report, urgent=False):
        """Assemble the HTML email summarising ``report``.

        Args:
            report: Report dict as produced by ``generate_security_report``.
            urgent: Use the "URGENT" subject line when True.

        Returns:
            The assembled ``MIMEMultipart`` message. Delivery is not
            performed here: no SMTP server is configured in this script.
        """
        subject = "🔒 Security Scan Report"
        if urgent:
            subject = "🚨 URGENT: Critical Security Vulnerabilities Detected"
        html_body = f'''
<html>
<body>
<h2>Security Scan Report - {report['scan_date']}</h2>
<h3>Summary</h3>
<ul>
<li>Total Vulnerabilities: {report['total_vulnerabilities']}</li>
<li>Critical: {report['critical_count']}</li>
<li>High: {report['high_count']}</li>
<li>Medium: {report['medium_count']}</li>
<li>Low: {report['low_count']}</li>
</ul>
<h3>Action Required</h3>
<p>Please review and remediate critical and high severity vulnerabilities immediately.</p>
<p>Full report available at: {self.REPORT_DIR}/security_report_{datetime.now().strftime("%Y%m%d")}.json</p>
</body>
</html>
'''
        msg = MIMEMultipart()
        msg['From'] = 'security-scanner@company.com'
        msg['To'] = ', '.join(self.email_recipients)
        msg['Subject'] = subject
        msg.attach(MIMEText(html_body, 'html'))
        # NOTE: configure an SMTP server and send `msg` (for example with
        # smtplib's `send_message`) to actually deliver the report.
        return msg

    def run_scan(self):
        """Scan every configured target, write the report, and send alerts.

        Raises:
            Exception: Any unexpected error is logged and re-raised so the
                cron job exits with a failure.
        """
        self.logger.info("Starting comprehensive security scan")
        all_vulnerabilities = []
        try:
            for target in self.targets:
                if '/' in target:
                    # Network range: Nmap only.
                    all_vulnerabilities.extend(self.run_nmap_scan(target))
                    continue

                # Web host: Nikto scan plus certificate check.
                all_vulnerabilities.extend(self.run_nikto_scan(target))
                ssl_info = self.check_ssl_certificates(target)
                if ssl_info and not ssl_info['valid']:
                    all_vulnerabilities.append({
                        'type': 'ssl',
                        'severity': 'high',
                        'description': f'SSL certificate issue for {target}',
                        'details': ssl_info['certificate_info']
                    })

            report = self.generate_security_report(all_vulnerabilities)
            self.send_security_alert(report)
            self.logger.info(f"Security scan completed. Found {len(all_vulnerabilities)} vulnerabilities")
        except Exception as e:
            self.logger.error(f"Security scan failed: {e}")
            raise
if __name__ == "__main__":
    # Script entry point (run from cron): perform one full scan.
    VulnerabilityScanner().run_scan()
Security Log Analysis
Automated Threat Detection
#!/usr/bin/env python3
# /opt/security/analyze_logs.py
import re
import json
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import geoip2.database
import logging
import requests
class SecurityLogAnalyzer:
    """Scan auth and web-server logs for attack indicators and report them.

    The analysis collects failed and successful SSH password logins,
    matches web access-log lines against the regular expressions in
    ``threat_patterns``, flags brute-force and dictionary-attack patterns,
    checks the offending IPs against AbuseIPDB, writes a JSON threat report,
    and posts a Slack summary when anything noteworthy was found.
    """

    # Regular expressions (matched case-insensitively) per attack category.
    THREAT_PATTERNS = {
        'sql_injection': [
            r"(union.*select|select.*from|insert.*into|drop.*table)",
            r"""('.*or.*'|".*or.*")""",
            r"(exec\(|eval\(|system\()",
        ],
        'xss_attempt': [
            r"<script.*?>.*?</script>",
            r"javascript:",
            r"on(load|error|click)=",
        ],
        'brute_force': [
            r"Failed password for",
            r"authentication failure",
            r"Invalid user",
        ],
        'directory_traversal': [
            # Dots are escaped: an unescaped ".." matches any two characters.
            r"\.\./",
            r"\.\.\\",
            r"/etc/passwd",
        ],
    }

    # sshd also logs "Failed password for invalid user NAME from IP" for
    # accounts that do not exist; the optional group keeps those attempts.
    _FAILED_LOGIN = re.compile(r'Failed password for (?:invalid user )?(\S+) from ([\d.]+)')
    _ACCEPTED_LOGIN = re.compile(r'Accepted password for (\S+) from ([\d.]+)')

    REPORT_DIR = '/var/reports'
    SLACK_WEBHOOK = 'https://hooks.slack.com/services/YOUR/WEBHOOK'
    # Seconds to wait for any outbound HTTP call before giving up.
    HTTP_TIMEOUT = 10

    def __init__(self):
        """Set log locations and threat patterns, open the GeoIP database, configure logging."""
        self.log_files = [
            '/var/log/auth.log',
            '/var/log/nginx/access.log',
            '/var/log/apache2/access.log',
            '/var/log/fail2ban.log'
        ]
        # Per-instance copy so callers can adjust patterns without touching
        # the class-level defaults.
        self.threat_patterns = {
            name: list(patterns) for name, patterns in self.THREAT_PATTERNS.items()
        }
        self.suspicious_ips = set()
        self.geoip_reader = geoip2.database.Reader('/opt/GeoLite2-City.mmdb')
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def analyze_auth_logs(self, log_path='/var/log/auth.log'):
        """Collect failed and successful SSH password logins from an auth log.

        Args:
            log_path: Log file to read.

        Returns:
            A tuple ``(failed_attempts, successful_logins)``:
            ``failed_attempts`` maps source IP to a list of
            ``{'user', 'timestamp'}`` dicts; ``successful_logins`` is a list
            of ``{'user', 'ip', 'timestamp'}`` dicts. Both are empty when
            the file does not exist.
        """
        failed_attempts = defaultdict(list)
        successful_logins = []
        try:
            # errors='replace': attackers can inject non-UTF-8 bytes into
            # logged fields; a decode error must not abort the analysis.
            with open(log_path, 'r', errors='replace') as f:
                for line in f:
                    failed = self._FAILED_LOGIN.search(line)
                    if failed:
                        user, ip = failed.groups()
                        failed_attempts[ip].append({
                            'user': user,
                            'timestamp': self.extract_timestamp(line)
                        })
                        continue
                    accepted = self._ACCEPTED_LOGIN.search(line)
                    if accepted:
                        user, ip = accepted.groups()
                        successful_logins.append({
                            'user': user,
                            'ip': ip,
                            'timestamp': self.extract_timestamp(line)
                        })
        except FileNotFoundError:
            self.logger.warning("Auth log file not found")
        return failed_attempts, successful_logins

    def analyze_web_logs(self, log_files=None):
        """Match web access-log lines against ``threat_patterns``.

        Args:
            log_files: Access logs to read; defaults to the nginx and
                Apache access logs. Missing files are skipped.

        Returns:
            A mapping of attack category to a list of
            ``{'ip', 'log_line', 'timestamp'}`` dicts, with one entry per
            matching line and category. ``timestamp`` is the time of
            detection, not the time of the request. Every offending IP is
            also added to ``self.suspicious_ips``.
        """
        if log_files is None:
            log_files = ['/var/log/nginx/access.log', '/var/log/apache2/access.log']

        # Compile once instead of once per log line.
        compiled = {
            attack_type: [re.compile(pattern, re.IGNORECASE) for pattern in patterns]
            for attack_type, patterns in self.threat_patterns.items()
        }
        attacks = defaultdict(list)
        for log_file in log_files:
            try:
                with open(log_file, 'r', errors='replace') as f:
                    for line in f:
                        for attack_type, patterns in compiled.items():
                            if not any(p.search(line) for p in patterns):
                                continue
                            ip = self.extract_ip_from_log(line)
                            if ip:
                                attacks[attack_type].append({
                                    'ip': ip,
                                    'log_line': line.strip(),
                                    'timestamp': datetime.now().isoformat()
                                })
                                self.suspicious_ips.add(ip)
            except FileNotFoundError:
                continue
        return attacks

    def extract_ip_from_log(self, log_line):
        """Return the first dotted-quad IPv4 address in ``log_line``, or ``None``."""
        match = re.search(r'([\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}\.[\d]{1,3})', log_line)
        return match.group(1) if match else None

    def extract_timestamp(self, log_line):
        """Return the syslog-style timestamp ("Mon D HH:MM:SS") found in ``log_line``.

        Falls back to the current time in ISO format when the line carries
        no such timestamp.
        """
        match = re.search(r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', log_line)
        return match.group(1) if match else datetime.now().isoformat()

    def geolocate_ips(self, ip_list):
        """Look up country, city and coordinates for each IP in the GeoIP database.

        Returns:
            A mapping of IP to ``{'country', 'city', 'latitude',
            'longitude'}``. IPs the database cannot resolve map to
            ``{'country': 'Unknown', 'city': 'Unknown'}``. Coordinates are
            ``None`` when the database has none for the address.
        """
        geo_info = {}
        for ip in ip_list:
            try:
                response = self.geoip_reader.city(ip)
            except Exception:
                # Lookup errors come from the geoip2 library (unknown or
                # malformed address); report the IP as unresolved.
                geo_info[ip] = {'country': 'Unknown', 'city': 'Unknown'}
                continue
            geo_info[ip] = {
                'country': response.country.name,
                'city': response.city.name,
                'latitude': response.location.latitude,
                'longitude': response.location.longitude
            }
        return geo_info

    def check_threat_intelligence(self, ip_list):
        """Query AbuseIPDB for each IP and return those scored above 50.

        The API key is read from the ``ABUSEIPDB_API_KEY`` environment
        variable, falling back to the placeholder value.

        Returns:
            A list of ``{'ip', 'confidence', 'usage_type', 'country'}``
            dicts. IPs whose lookup fails are logged and skipped.
        """
        import os  # local import: `os` is not among this script's top-level imports

        api_key = os.environ.get('ABUSEIPDB_API_KEY', 'YOUR_ABUSEIPDB_API_KEY')
        url = 'https://api.abuseipdb.com/api/v2/check'
        headers = {
            'Key': api_key,
            'Accept': 'application/json'
        }
        malicious_ips = []
        for ip in ip_list:
            params = {
                'ipAddress': ip,
                'maxAgeInDays': 90,
                'verbose': ''
            }
            try:
                response = requests.get(url, headers=headers, params=params,
                                        timeout=self.HTTP_TIMEOUT)
                if response.status_code != 200:
                    continue
                data = response.json()['data']
                # The v2 "check" endpoint reports the score as
                # `abuseConfidenceScore` (0-100).
                score = data['abuseConfidenceScore']
                if score > 50:
                    malicious_ips.append({
                        'ip': ip,
                        'confidence': score,
                        'usage_type': data.get('usageType'),
                        'country': data.get('countryCode')
                    })
            except Exception as e:
                self.logger.warning(f"Threat intelligence check failed for {ip}: {e}")
        return malicious_ips

    def detect_anomalies(self, failed_attempts):
        """Flag source IPs whose failed logins look like an attack.

        An IP with more than 10 failed attempts is reported as
        ``brute_force``; an IP that tried more than 5 distinct user names is
        reported as ``dictionary_attack``. One IP can trigger both.

        Returns:
            A list of anomaly dicts.
        """
        anomalies = []
        for ip, attempts in failed_attempts.items():
            unique_users = {attempt['user'] for attempt in attempts}
            if len(attempts) > 10:
                anomalies.append({
                    'type': 'brute_force',
                    'ip': ip,
                    'attempt_count': len(attempts),
                    'users_targeted': list(unique_users)
                })
            if len(unique_users) > 5:
                anomalies.append({
                    'type': 'dictionary_attack',
                    'ip': ip,
                    'users_targeted': list(unique_users)
                })
        return anomalies

    def generate_threat_report(self, failed_attempts, web_attacks, anomalies, malicious_ips):
        """Combine all findings into one report, save it as JSON, and return it.

        The file is written to ``REPORT_DIR/threat_report_YYYYMMDD_HHMM.json``.
        """
        import os  # local import: `os` is not among this script's top-level imports

        now = datetime.now()
        report = {
            'timestamp': now.isoformat(),
            'summary': {
                'total_failed_attempts': sum(len(attempts) for attempts in failed_attempts.values()),
                'unique_attacking_ips': len(failed_attempts),
                'web_attacks': sum(len(attacks) for attacks in web_attacks.values()),
                'anomalies_detected': len(anomalies),
                'known_malicious_ips': len(malicious_ips)
            },
            'failed_login_attempts': dict(failed_attempts),
            'web_attacks': dict(web_attacks),
            'anomalies': anomalies,
            'malicious_ips': malicious_ips,
            'geolocation': self.geolocate_ips(list(self.suspicious_ips))
        }
        # A missing report directory must not discard a finished analysis.
        os.makedirs(self.REPORT_DIR, exist_ok=True)
        report_file = os.path.join(
            self.REPORT_DIR, f'threat_report_{now.strftime("%Y%m%d_%H%M")}.json'
        )
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)
        return report

    def send_security_alerts(self, report):
        """Post a Slack summary when the report contains high-priority findings.

        An alert line is produced for any detected anomaly, any known
        malicious IP, and more than 100 failed logins in total. Nothing is
        sent when no condition applies; a failed post is logged.
        """
        summary = report['summary']
        alerts = []
        if summary['anomalies_detected'] > 0:
            alerts.append(f"🚨 {summary['anomalies_detected']} security anomalies detected")
        if summary['known_malicious_ips'] > 0:
            alerts.append(f"⚠️ {summary['known_malicious_ips']} known malicious IPs detected")
        if summary['total_failed_attempts'] > 100:
            alerts.append(f"📊 High volume of failed login attempts: {summary['total_failed_attempts']}")
        if not alerts:
            return

        payload = {
            'text': "Security Alert Summary:\n" + "\n".join(alerts),
            'username': 'Security Monitor',
            'icon_emoji': ':shield:'
        }
        try:
            requests.post(self.SLACK_WEBHOOK, json=payload, timeout=self.HTTP_TIMEOUT)
        except requests.RequestException as e:
            self.logger.error(f"Failed to send Slack alert: {e}")

    def run_analysis(self):
        """Run the full pipeline: parse logs, detect, enrich, report, alert.

        Raises:
            Exception: Any unexpected error is logged and re-raised so the
                cron job exits with a failure.
        """
        self.logger.info("Starting security log analysis")
        try:
            failed_attempts, successful_logins = self.analyze_auth_logs()
            web_attacks = self.analyze_web_logs()
            anomalies = self.detect_anomalies(failed_attempts)

            # An IP can appear in both sources; look each one up only once.
            all_suspicious_ips = sorted(self.suspicious_ips | set(failed_attempts))
            malicious_ips = self.check_threat_intelligence(all_suspicious_ips)

            report = self.generate_threat_report(failed_attempts, web_attacks, anomalies, malicious_ips)
            self.send_security_alerts(report)
            self.logger.info("Security log analysis completed")
        except Exception as e:
            self.logger.error(f"Log analysis failed: {e}")
            raise
if __name__ == "__main__":
    # Script entry point (run from cron): perform one full log analysis.
    SecurityLogAnalyzer().run_analysis()
Compliance Monitoring
Automated Compliance Checks
# Compliance monitoring schedule
# Cron fields: minute hour day-of-month month day-of-week command
# Daily PCI DSS compliance check at 6 AM
0 6 * * * /opt/security/pci_compliance_check.py
# Weekly GDPR compliance audit on Monday at 7 AM
0 7 * * 1 /opt/security/gdpr_compliance_audit.py
# Monthly SOX compliance report on 1st at 8 AM
0 8 1 * * /opt/security/sox_compliance_report.py
# File integrity monitoring every 4 hours, on the hour
0 */4 * * * /opt/security/file_integrity_check.sh
Intrusion Detection and Response
Automated Incident Response
#!/usr/bin/env python3
# /opt/security/intrusion_response.py
import subprocess
import json
import time
from datetime import datetime
import logging
import requests
class IntrusionResponseSystem:
    """Automated responses to detected intrusions.

    Provides firewall blocking of IP addresses, quarantining of suspicious
    files, Slack incident alerts, a permission check on critical system
    files, and a JSON-lines incident log that records every response.
    """

    SLACK_WEBHOOK = 'https://hooks.slack.com/services/YOUR/WEBHOOK'
    # Seconds to wait for the Slack webhook before giving up.
    SLACK_TIMEOUT = 10

    def __init__(self):
        """Set the quarantine directory and incident log, and configure logging."""
        self.blocked_ips = set()
        self.quarantine_dir = '/var/quarantine'
        self.incident_log = '/var/log/security-incidents.log'
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def _log_incident(self, incident):
        """Append one incident dict to the incident log as a JSON line."""
        with open(self.incident_log, 'a') as f:
            f.write(json.dumps(incident) + '\n')

    def block_malicious_ip(self, ip_address, reason):
        """Drop all inbound traffic from ``ip_address`` and record the incident.

        The address is validated first, because it usually originates from
        log data. IPv4 addresses are blocked with ``iptables``, IPv6
        addresses with ``ip6tables``. An existing identical rule is not
        duplicated. A fail2ban ban on the ``sshd`` jail is attempted as
        well and skipped when ``fail2ban-client`` is not installed.

        Returns:
            True when the address is blocked (now or already), False when
            the address is invalid or the firewall command failed.
        """
        import ipaddress  # local import, matching this script's use of in-method imports

        try:
            parsed = ipaddress.ip_address(ip_address)
        except ValueError:
            self.logger.error(f"Refusing to block invalid IP address: {ip_address!r}")
            return False
        ip_address = str(parsed)
        if ip_address in self.blocked_ips:
            return True

        self.logger.info(f"Blocking IP {ip_address}: {reason}")
        firewall = 'iptables' if parsed.version == 4 else 'ip6tables'
        rule = ['INPUT', '-s', ip_address, '-j', 'DROP']
        try:
            # `-C` exits 0 when an identical rule exists; without this check
            # every cron run would append another copy of the rule.
            already_present = subprocess.run(
                [firewall, '-C'] + rule, capture_output=True
            ).returncode == 0
            if not already_present:
                subprocess.run([firewall, '-A'] + rule, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            self.logger.error(f"Failed to block IP {ip_address}: {e}")
            return False

        try:
            subprocess.run(['fail2ban-client', 'set', 'sshd', 'banip', ip_address], check=False)
        except OSError:
            # fail2ban is optional; the firewall rule above is already active.
            self.logger.info("fail2ban-client not available; skipping fail2ban ban")

        self.blocked_ips.add(ip_address)
        self._log_incident({
            'timestamp': datetime.now().isoformat(),
            'action': 'ip_blocked',
            'ip': ip_address,
            'reason': reason
        })
        return True

    def quarantine_file(self, file_path, reason):
        """Move ``file_path`` into the quarantine directory and record the incident.

        The quarantined file is named ``<YYYYmmdd_HHMMSS>_<original name>``.

        Returns:
            The quarantine path on success, ``None`` when the move failed
            (the failure is logged).
        """
        import os
        import shutil

        self.logger.info(f"Quarantining file {file_path}: {reason}")
        try:
            os.makedirs(self.quarantine_dir, exist_ok=True)
            quarantine_path = os.path.join(
                self.quarantine_dir,
                f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{os.path.basename(file_path)}"
            )
            shutil.move(file_path, quarantine_path)
            self._log_incident({
                'timestamp': datetime.now().isoformat(),
                'action': 'file_quarantined',
                'original_path': file_path,
                'quarantine_path': quarantine_path,
                'reason': reason
            })
        except (OSError, shutil.Error) as e:
            self.logger.error(f"Failed to quarantine file {file_path}: {e}")
            return None
        return quarantine_path

    def send_incident_alert(self, incident_type, details):
        """Post an incident alert to Slack; a failed post is logged, not raised."""
        message = "🚨 SECURITY INCIDENT DETECTED\n\n"
        message += f"Type: {incident_type}\n"
        message += f"Time: {datetime.now().isoformat()}\n"
        message += f"Details: {details}\n"
        payload = {
            'text': message,
            'username': 'Security Incident Response',
            'icon_emoji': ':rotating_light:'
        }
        try:
            requests.post(self.SLACK_WEBHOOK, json=payload, timeout=self.SLACK_TIMEOUT)
        except requests.RequestException as e:
            self.logger.error(f"Failed to send incident alert: {e}")

    def monitor_system_integrity(self):
        """Stat the critical system files and alert on unexpected ``/etc/shadow`` permissions.

        Every listed file is stat'ed, so a missing or unreadable file is
        logged as an error. Only ``/etc/shadow`` has an expected mode
        (``-rw-r-----``); any other mode triggers an incident alert.
        """
        import os
        import stat

        critical_files = [
            '/etc/passwd',
            '/etc/shadow',
            '/etc/sudoers',
            '/etc/ssh/sshd_config'
        ]
        for file_path in critical_files:
            try:
                file_mode = stat.filemode(os.stat(file_path).st_mode)
            except OSError as e:
                self.logger.error(f"Error checking {file_path}: {e}")
                continue
            if file_path == '/etc/shadow' and file_mode != '-rw-r-----':
                self.send_incident_alert(
                    'File Permission Change',
                    f'{file_path} has unexpected permissions: {file_mode}'
                )

    def analyze_recent_incidents(self, max_age_hours=24):
        """Return the incidents recorded in the incident log within ``max_age_hours``.

        Minimal implementation of the hook called by ``run_response_check``:
        it reads the JSON-lines incident log written by this class and
        keeps the entries whose ``timestamp`` is recent enough. Lines that
        are not valid incident records are logged and skipped. A missing
        log file yields an empty list.
        """
        cutoff = time.time() - max_age_hours * 3600
        recent = []
        try:
            with open(self.incident_log, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        incident = json.loads(line)
                        recorded = datetime.fromisoformat(incident['timestamp']).timestamp()
                    except (ValueError, KeyError, TypeError):
                        self.logger.warning(f"Skipping malformed incident record: {line[:80]}")
                        continue
                    if recorded >= cutoff:
                        recent.append(incident)
        except FileNotFoundError:
            return []
        self.logger.info(f"{len(recent)} incident(s) recorded in the last {max_age_hours} hours")
        return recent

    def run_response_check(self):
        """Run the integrity check and review recent incidents; errors are logged, not raised."""
        self.logger.info("Running intrusion detection and response")
        try:
            self.monitor_system_integrity()
            self.analyze_recent_incidents()
        except Exception as e:
            self.logger.error(f"Response check failed: {e}")
if __name__ == "__main__":
    # Script entry point (run from cron): perform one response check.
    IntrusionResponseSystem().run_response_check()
Certificate Management
Automated Certificate Monitoring
# Certificate management schedule
# Cron fields: minute hour day-of-month month day-of-week command
# Daily SSL certificate expiration check at 9 AM
0 9 * * * /opt/security/check_ssl_expiration.py
# Weekly certificate renewal attempt on Sunday at 10 AM
0 10 * * 0 /opt/security/renew_certificates.sh
# Monthly certificate audit on 1st at 11 AM
0 11 1 * * /opt/security/certificate_audit.py
# Daily TLS configuration check at 12 PM (noon)
0 12 * * * /opt/security/tls_config_check.py
Security Patch Management
Automated Patch Deployment
#!/bin/bash
# /opt/security/security_patch_management.sh
#
# Check for pending security updates with apt or yum, install them, log
# every step to $LOG_FILE, and report the outcome to Slack.
set -e

LOG_FILE="/var/log/security-patches.log"
SLACK_WEBHOOK="https://hooks.slack.com/services/YOUR/WEBHOOK"

# Print a timestamped message and append it to $LOG_FILE.
log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

# Post a plain-text message to the Slack webhook.
# Best effort: a failed notification is logged and must not abort patching.
send_slack_notification() {
    local message="$1"
    # Escape backslashes and double quotes so the payload stays valid JSON.
    message=${message//\\/\\\\}
    message=${message//\"/\\\"}
    curl -fsS --max-time 10 -X POST -H 'Content-type: application/json' \
        --data "{\"text\":\"$message\"}" \
        "$SLACK_WEBHOOK" > /dev/null \
        || log "WARNING: Slack notification failed"
}

# Check for security updates
log "Checking for security updates"
if command -v apt &> /dev/null; then
    # Debian/Ubuntu
    apt-get update
    # grep -c exits 1 when nothing matches; `|| true` keeps `set -e` from
    # aborting the script in that case (the count is then 0).
    SECURITY_UPDATES=$(apt list --upgradable 2>/dev/null | grep -ci security || true)
    if [ "$SECURITY_UPDATES" -gt 0 ]; then
        log "Found $SECURITY_UPDATES security updates"
        # NOTE: `apt-get upgrade` installs every pending upgrade, not only
        # the ones counted above. Existing config files are kept.
        DEBIAN_FRONTEND=noninteractive apt-get -y upgrade \
            -o Dpkg::Options::="--force-confdef" \
            -o Dpkg::Options::="--force-confold"
        send_slack_notification "✅ Installed $SECURITY_UPDATES security updates on $(hostname)"
    else
        log "No security updates available"
    fi
elif command -v yum &> /dev/null; then
    # RHEL/CentOS
    # `yum check-update` exits 100 when updates are available. Capture the
    # status with `||` so `set -e` does not abort the script on that code.
    SECURITY_UPDATES=0
    yum check-update --security || SECURITY_UPDATES=$?
    if [ "$SECURITY_UPDATES" -eq 100 ]; then
        log "Security updates available, installing..."
        yum update -y --security
        send_slack_notification "✅ Installed security updates on $(hostname)"
    elif [ "$SECURITY_UPDATES" -ne 0 ]; then
        log "yum check-update failed with exit code $SECURITY_UPDATES"
    else
        log "No security updates available"
    fi
fi

# Check if reboot is required
if [ -f /var/run/reboot-required ]; then
    log "System reboot required after updates"
    send_slack_notification "⚠️ System $(hostname) requires reboot after security updates"
fi

log "Security patch check completed"
Conclusion
Security automation with cron jobs provides continuous protection against evolving threats. By implementing these comprehensive automation strategies, you can maintain a robust security posture, ensure compliance, and respond quickly to security incidents.
Remember to regularly review and update your security automation scripts, monitor for false positives, and maintain proper access controls for all automated security processes.
Secure Your Infrastructure
Start implementing security automation with our cron expression generator.