Security Automation with Cron Jobs: Complete Guide

Master security automation with cron jobs. Comprehensive guide covering vulnerability scanning, log analysis, compliance monitoring, and threat detection.

20 min read
Security Automation with Cron Jobs: Complete Guide

Master security automation with cron jobs. Comprehensive guide covering vulnerability scanning, log analysis, compliance monitoring, intrusion detection, and automated security response.

What You'll Learn

  • Automated vulnerability scanning and assessment
  • Security log analysis and threat detection
  • Compliance monitoring and reporting
  • Intrusion detection and response automation
  • Certificate management and SSL monitoring
  • Security patch management automation

Vulnerability Scanning Automation

Automated Security Scans

# Vulnerability scanning schedule
# Daily network scan at 2 AM
0 2 * * * /opt/security/network_scan.sh

# Weekly full vulnerability assessment on Sunday at 3 AM
0 3 * * 0 /opt/security/full_vuln_scan.py

# Hourly web application security scan
0 * * * * /opt/security/web_app_scan.sh

# Monthly compliance scan on 1st at 4 AM
0 4 1 * * /opt/security/compliance_scan.py

Comprehensive Vulnerability Scanner

#!/usr/bin/env python3
# /opt/security/full_vuln_scan.py

import subprocess
import json
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
import logging
import requests
import xml.etree.ElementTree as ET

class VulnerabilityScanner:
    def __init__(self):
        self.targets = [
            '192.168.1.0/24',
            'web.company.com',
            'api.company.com'
        ]
        self.slack_webhook = 'https://hooks.slack.com/services/YOUR/WEBHOOK'
        self.email_recipients = ['security@company.com', 'devops@company.com']
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/security-scan.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    def run_nmap_scan(self, target):
        """Run comprehensive Nmap scan"""
        self.logger.info(f"Starting Nmap scan for {target}")
        
        cmd = [
            'nmap', '-sS', '-sV', '-O', '--script=vuln',
            '-oX', f'/tmp/nmap_{target.replace("/", "_")}.xml',
            target
        ]
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
            if result.returncode == 0:
                return self.parse_nmap_results(f'/tmp/nmap_{target.replace("/", "_")}.xml')
            else:
                self.logger.error(f"Nmap scan failed for {target}: {result.stderr}")
                return []
        except subprocess.TimeoutExpired:
            self.logger.error(f"Nmap scan timed out for {target}")
            return []
            
    def parse_nmap_results(self, xml_file):
        """Parse Nmap XML results"""
        vulnerabilities = []
        
        try:
            tree = ET.parse(xml_file)
            root = tree.getroot()
            
            for host in root.findall('host'):
                ip = host.find('address').get('addr')
                
                for port in host.findall('.//port'):
                    port_num = port.get('portid')
                    service = port.find('service')
                    
                    if service is not None:
                        service_name = service.get('name', 'unknown')
                        version = service.get('version', 'unknown')
                        
                        # Check for script results (vulnerabilities)
                        for script in port.findall('.//script'):
                            if 'vuln' in script.get('id', ''):
                                vulnerabilities.append({
                                    'host': ip,
                                    'port': port_num,
                                    'service': service_name,
                                    'version': version,
                                    'vulnerability': script.get('output', '')
                                })
                                
        except Exception as e:
            self.logger.error(f"Error parsing Nmap results: {e}")
            
        return vulnerabilities
        
    def run_nikto_scan(self, target):
        """Run Nikto web vulnerability scan"""
        self.logger.info(f"Starting Nikto scan for {target}")
        
        cmd = [
            'nikto', '-h', target, '-Format', 'json',
            '-output', f'/tmp/nikto_{target.replace(".", "_")}.json'
        ]
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800)
            if result.returncode == 0:
                return self.parse_nikto_results(f'/tmp/nikto_{target.replace(".", "_")}.json')
            else:
                self.logger.error(f"Nikto scan failed for {target}")
                return []
        except subprocess.TimeoutExpired:
            self.logger.error(f"Nikto scan timed out for {target}")
            return []
            
    def parse_nikto_results(self, json_file):
        """Parse Nikto JSON results"""
        vulnerabilities = []
        
        try:
            with open(json_file, 'r') as f:
                data = json.load(f)
                
            for vuln in data.get('vulnerabilities', []):
                vulnerabilities.append({
                    'type': 'web',
                    'severity': vuln.get('severity', 'medium'),
                    'description': vuln.get('msg', ''),
                    'uri': vuln.get('uri', ''),
                    'method': vuln.get('method', 'GET')
                })
                
        except Exception as e:
            self.logger.error(f"Error parsing Nikto results: {e}")
            
        return vulnerabilities
        
    def check_ssl_certificates(self, domain):
        """Check SSL certificate validity"""
        self.logger.info(f"Checking SSL certificate for {domain}")
        
        cmd = [
            'openssl', 's_client', '-connect', f'{domain}:443',
            '-servername', domain, '-verify_return_error'
        ]
        
        try:
            result = subprocess.run(cmd, input='', capture_output=True, text=True, timeout=30)
            
            # Parse certificate expiration
            cert_cmd = [
                'openssl', 'x509', '-noout', '-dates'
            ]
            
            cert_result = subprocess.run(cert_cmd, input=result.stdout, 
                                       capture_output=True, text=True)
            
            if 'notAfter' in cert_result.stdout:
                return {
                    'domain': domain,
                    'certificate_info': cert_result.stdout,
                    'valid': result.returncode == 0
                }
                
        except Exception as e:
            self.logger.error(f"SSL check failed for {domain}: {e}")
            
        return None
        
    def generate_security_report(self, all_vulnerabilities):
        """Generate comprehensive security report"""
        report = {
            'scan_date': datetime.now().isoformat(),
            'total_vulnerabilities': len(all_vulnerabilities),
            'critical_count': len([v for v in all_vulnerabilities if v.get('severity') == 'critical']),
            'high_count': len([v for v in all_vulnerabilities if v.get('severity') == 'high']),
            'medium_count': len([v for v in all_vulnerabilities if v.get('severity') == 'medium']),
            'low_count': len([v for v in all_vulnerabilities if v.get('severity') == 'low']),
            'vulnerabilities': all_vulnerabilities
        }
        
        # Save report
        with open(f'/var/reports/security_report_{datetime.now().strftime("%Y%m%d")}.json', 'w') as f:
            json.dump(report, f, indent=2)
            
        return report
        
    def send_security_alert(self, report):
        """Send security alert if critical vulnerabilities found"""
        critical_vulns = [v for v in report['vulnerabilities'] if v.get('severity') == 'critical']
        
        if critical_vulns:
            message = f"🚨 CRITICAL SECURITY ALERT\n\n"
            message += f"Found {len(critical_vulns)} critical vulnerabilities:\n"
            
            for vuln in critical_vulns[:5]:  # Show first 5
                message += f"• {vuln.get('description', 'Unknown vulnerability')}\n"
                
            # Send to Slack
            payload = {
                'text': message,
                'username': 'Security Scanner',
                'icon_emoji': ':warning:'
            }
            
            requests.post(self.slack_webhook, json=payload)
            
            # Send email
            self.send_email_report(report, urgent=True)
            
    def send_email_report(self, report, urgent=False):
        """Send email security report"""
        subject = "🔒 Security Scan Report"
        if urgent:
            subject = "🚨 URGENT: Critical Security Vulnerabilities Detected"
            
        html_body = f'''
        <html>
        <body>
            <h2>Security Scan Report - {report['scan_date']}</h2>
            
            <h3>Summary</h3>
            <ul>
                <li>Total Vulnerabilities: {report['total_vulnerabilities']}</li>
                <li>Critical: {report['critical_count']}</li>
                <li>High: {report['high_count']}</li>
                <li>Medium: {report['medium_count']}</li>
                <li>Low: {report['low_count']}</li>
            </ul>
            
            <h3>Action Required</h3>
            <p>Please review and remediate critical and high severity vulnerabilities immediately.</p>
            
            <p>Full report available at: /var/reports/security_report_{datetime.now().strftime("%Y%m%d")}.json</p>
        </body>
        </html>
        '''
        
        msg = MIMEMultipart()
        msg['From'] = 'security-scanner@company.com'
        msg['To'] = ', '.join(self.email_recipients)
        msg['Subject'] = subject
        
        msg.attach(MIMEText(html_body, 'html'))
        
        # Send email (configure SMTP server)
        # smtp_server.send_message(msg)
        
    def run_scan(self):
        """Run comprehensive security scan"""
        self.logger.info("Starting comprehensive security scan")
        
        all_vulnerabilities = []
        
        try:
            # Network vulnerability scans
            for target in self.targets:
                if '/' in target:  # Network range
                    vulns = self.run_nmap_scan(target)
                    all_vulnerabilities.extend(vulns)
                else:  # Web target
                    # Web vulnerability scan
                    web_vulns = self.run_nikto_scan(target)
                    all_vulnerabilities.extend(web_vulns)
                    
                    # SSL certificate check
                    ssl_info = self.check_ssl_certificates(target)
                    if ssl_info and not ssl_info['valid']:
                        all_vulnerabilities.append({
                            'type': 'ssl',
                            'severity': 'high',
                            'description': f'SSL certificate issue for {target}',
                            'details': ssl_info['certificate_info']
                        })
                        
            # Generate report
            report = self.generate_security_report(all_vulnerabilities)
            
            # Send alerts if needed
            self.send_security_alert(report)
            
            self.logger.info(f"Security scan completed. Found {len(all_vulnerabilities)} vulnerabilities")
            
        except Exception as e:
            self.logger.error(f"Security scan failed: {e}")
            raise

if __name__ == "__main__":
    scanner = VulnerabilityScanner()
    scanner.run_scan()

Security Log Analysis

Automated Threat Detection

#!/usr/bin/env python3
# /opt/security/analyze_logs.py

import re
import json
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import geoip2.database
import logging
import requests

class SecurityLogAnalyzer:
    def __init__(self):
        self.log_files = [
            '/var/log/auth.log',
            '/var/log/nginx/access.log',
            '/var/log/apache2/access.log',
            '/var/log/fail2ban.log'
        ]
        
        self.threat_patterns = {
            'sql_injection': [
                r"(union.*select|select.*from|insert.*into|drop.*table)",
                r"('.*or.*'|".*or.*")",
                r"(exec(|eval(|system()"
            ],
            'xss_attempt': [
                r"<script.*?>.*?</script>",
                r"javascript:",
                r"on(load|error|click)="
            ],
            'brute_force': [
                r"Failed password for",
                r"authentication failure",
                r"Invalid user"
            ],
            'directory_traversal': [
                r"../",
                r"..\\",
                r"/etc/passwd"
            ]
        }
        
        self.suspicious_ips = set()
        self.geoip_reader = geoip2.database.Reader('/opt/GeoLite2-City.mmdb')
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
    def analyze_auth_logs(self):
        """Analyze authentication logs for suspicious activity"""
        failed_attempts = defaultdict(list)
        successful_logins = []
        
        try:
            with open('/var/log/auth.log', 'r') as f:
                for line in f:
                    # Failed SSH attempts
                    if 'Failed password' in line:
                        match = re.search(r'Failed password for (\w+) from ([\d.]+)', line)
                        if match:
                            user, ip = match.groups()
                            timestamp = self.extract_timestamp(line)
                            failed_attempts[ip].append({
                                'user': user,
                                'timestamp': timestamp
                            })
                            
                    # Successful logins
                    elif 'Accepted password' in line:
                        match = re.search(r'Accepted password for (\w+) from ([\d.]+)', line)
                        if match:
                            user, ip = match.groups()
                            timestamp = self.extract_timestamp(line)
                            successful_logins.append({
                                'user': user,
                                'ip': ip,
                                'timestamp': timestamp
                            })
                            
        except FileNotFoundError:
            self.logger.warning("Auth log file not found")
            
        return failed_attempts, successful_logins
        
    def analyze_web_logs(self):
        """Analyze web server logs for attacks"""
        attacks = defaultdict(list)
        
        for log_file in ['/var/log/nginx/access.log', '/var/log/apache2/access.log']:
            try:
                with open(log_file, 'r') as f:
                    for line in f:
                        for attack_type, patterns in self.threat_patterns.items():
                            for pattern in patterns:
                                if re.search(pattern, line, re.IGNORECASE):
                                    ip = self.extract_ip_from_log(line)
                                    if ip:
                                        attacks[attack_type].append({
                                            'ip': ip,
                                            'log_line': line.strip(),
                                            'timestamp': datetime.now().isoformat()
                                        })
                                        self.suspicious_ips.add(ip)
                                        
            except FileNotFoundError:
                continue
                
        return attacks
        
    def extract_ip_from_log(self, log_line):
        """Extract IP address from log line"""
        ip_pattern = r'([\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}\.[\d]{1,3})'
        match = re.search(ip_pattern, log_line)
        return match.group(1) if match else None
        
    def extract_timestamp(self, log_line):
        """Extract timestamp from log line"""
        # Simple timestamp extraction - adjust based on log format
        timestamp_pattern = r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})'
        match = re.search(timestamp_pattern, log_line)
        return match.group(1) if match else datetime.now().isoformat()
        
    def geolocate_ips(self, ip_list):
        """Get geolocation information for IP addresses"""
        geo_info = {}
        
        for ip in ip_list:
            try:
                response = self.geoip_reader.city(ip)
                geo_info[ip] = {
                    'country': response.country.name,
                    'city': response.city.name,
                    'latitude': float(response.location.latitude),
                    'longitude': float(response.location.longitude)
                }
            except Exception:
                geo_info[ip] = {'country': 'Unknown', 'city': 'Unknown'}
                
        return geo_info
        
    def check_threat_intelligence(self, ip_list):
        """Check IPs against threat intelligence feeds"""
        malicious_ips = []
        
        # Example: Check against AbuseIPDB (requires API key)
        api_key = 'YOUR_ABUSEIPDB_API_KEY'
        
        for ip in ip_list:
            try:
                url = 'https://api.abuseipdb.com/api/v2/check'
                headers = {
                    'Key': api_key,
                    'Accept': 'application/json'
                }
                params = {
                    'ipAddress': ip,
                    'maxAgeInDays': 90,
                    'verbose': ''
                }
                
                response = requests.get(url, headers=headers, params=params)
                if response.status_code == 200:
                    data = response.json()
                    if data['data']['abuseConfidencePercentage'] > 50:
                        malicious_ips.append({
                            'ip': ip,
                            'confidence': data['data']['abuseConfidencePercentage'],
                            'usage_type': data['data']['usageType'],
                            'country': data['data']['countryCode']
                        })
                        
            except Exception as e:
                self.logger.warning(f"Threat intelligence check failed for {ip}: {e}")
                
        return malicious_ips
        
    def detect_anomalies(self, failed_attempts):
        """Detect anomalous login patterns"""
        anomalies = []
        
        for ip, attempts in failed_attempts.items():
            # Brute force detection
            if len(attempts) > 10:  # More than 10 failed attempts
                anomalies.append({
                    'type': 'brute_force',
                    'ip': ip,
                    'attempt_count': len(attempts),
                    'users_targeted': list(set([a['user'] for a in attempts]))
                })
                
            # Dictionary attack detection
            unique_users = set([a['user'] for a in attempts])
            if len(unique_users) > 5:  # Targeting multiple users
                anomalies.append({
                    'type': 'dictionary_attack',
                    'ip': ip,
                    'users_targeted': list(unique_users)
                })
                
        return anomalies
        
    def generate_threat_report(self, failed_attempts, web_attacks, anomalies, malicious_ips):
        """Generate comprehensive threat report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'summary': {
                'total_failed_attempts': sum(len(attempts) for attempts in failed_attempts.values()),
                'unique_attacking_ips': len(failed_attempts),
                'web_attacks': sum(len(attacks) for attacks in web_attacks.values()),
                'anomalies_detected': len(anomalies),
                'known_malicious_ips': len(malicious_ips)
            },
            'failed_login_attempts': dict(failed_attempts),
            'web_attacks': dict(web_attacks),
            'anomalies': anomalies,
            'malicious_ips': malicious_ips,
            'geolocation': self.geolocate_ips(list(self.suspicious_ips))
        }
        
        # Save report
        report_file = f'/var/reports/threat_report_{datetime.now().strftime("%Y%m%d_%H%M")}.json'
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)
            
        return report
        
    def send_security_alerts(self, report):
        """Send security alerts for high-priority threats"""
        alerts = []
        
        # High-priority conditions
        if report['summary']['anomalies_detected'] > 0:
            alerts.append(f"🚨 {report['summary']['anomalies_detected']} security anomalies detected")
            
        if report['summary']['known_malicious_ips'] > 0:
            alerts.append(f"⚠️ {report['summary']['known_malicious_ips']} known malicious IPs detected")
            
        if report['summary']['total_failed_attempts'] > 100:
            alerts.append(f"📊 High volume of failed login attempts: {report['summary']['total_failed_attempts']}")
            
        if alerts:
            message = "Security Alert Summary:\n" + "\n".join(alerts)
            
            # Send to Slack
            payload = {
                'text': message,
                'username': 'Security Monitor',
                'icon_emoji': ':shield:'
            }
            
            requests.post('https://hooks.slack.com/services/YOUR/WEBHOOK', json=payload)
            
    def run_analysis(self):
        """Run complete log analysis"""
        self.logger.info("Starting security log analysis")
        
        try:
            # Analyze authentication logs
            failed_attempts, successful_logins = self.analyze_auth_logs()
            
            # Analyze web server logs
            web_attacks = self.analyze_web_logs()
            
            # Detect anomalies
            anomalies = self.detect_anomalies(failed_attempts)
            
            # Check threat intelligence
            all_suspicious_ips = list(self.suspicious_ips) + list(failed_attempts.keys())
            malicious_ips = self.check_threat_intelligence(all_suspicious_ips)
            
            # Generate report
            report = self.generate_threat_report(failed_attempts, web_attacks, anomalies, malicious_ips)
            
            # Send alerts
            self.send_security_alerts(report)
            
            self.logger.info("Security log analysis completed")
            
        except Exception as e:
            self.logger.error(f"Log analysis failed: {e}")
            raise

if __name__ == "__main__":
    analyzer = SecurityLogAnalyzer()
    analyzer.run_analysis()

Compliance Monitoring

Automated Compliance Checks

# Compliance monitoring schedule
# Daily PCI DSS compliance check at 6 AM
0 6 * * * /opt/security/pci_compliance_check.py

# Weekly GDPR compliance audit on Monday at 7 AM
0 7 * * 1 /opt/security/gdpr_compliance_audit.py

# Monthly SOX compliance report on 1st at 8 AM
0 8 1 * * /opt/security/sox_compliance_report.py

# Daily file integrity monitoring
0 */4 * * * /opt/security/file_integrity_check.sh

Intrusion Detection and Response

Automated Incident Response

#!/usr/bin/env python3
# /opt/security/intrusion_response.py

import subprocess
import json
import time
from datetime import datetime
import logging
import requests

class IntrusionResponseSystem:
    def __init__(self):
        self.blocked_ips = set()
        self.quarantine_dir = '/var/quarantine'
        self.incident_log = '/var/log/security-incidents.log'
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
    def block_malicious_ip(self, ip_address, reason):
        """Block IP address using iptables"""
        self.logger.info(f"Blocking IP {ip_address}: {reason}")
        
        try:
            # Add to iptables
            cmd = ['iptables', '-A', 'INPUT', '-s', ip_address, '-j', 'DROP']
            subprocess.run(cmd, check=True)
            
            # Add to fail2ban if available
            fail2ban_cmd = ['fail2ban-client', 'set', 'sshd', 'banip', ip_address]
            subprocess.run(fail2ban_cmd, check=False)
            
            self.blocked_ips.add(ip_address)
            
            # Log incident
            incident = {
                'timestamp': datetime.now().isoformat(),
                'action': 'ip_blocked',
                'ip': ip_address,
                'reason': reason
            }
            
            with open(self.incident_log, 'a') as f:
                f.write(json.dumps(incident) + '\n')
                
        except subprocess.CalledProcessError as e:
            self.logger.error(f"Failed to block IP {ip_address}: {e}")
            
    def quarantine_file(self, file_path, reason):
        """Quarantine suspicious file"""
        self.logger.info(f"Quarantining file {file_path}: {reason}")
        
        try:
            import shutil
            import os
            
            # Create quarantine directory if it doesn't exist
            os.makedirs(self.quarantine_dir, exist_ok=True)
            
            # Move file to quarantine
            quarantine_path = os.path.join(
                self.quarantine_dir, 
                f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{os.path.basename(file_path)}"
            )
            
            shutil.move(file_path, quarantine_path)
            
            # Log incident
            incident = {
                'timestamp': datetime.now().isoformat(),
                'action': 'file_quarantined',
                'original_path': file_path,
                'quarantine_path': quarantine_path,
                'reason': reason
            }
            
            with open(self.incident_log, 'a') as f:
                f.write(json.dumps(incident) + '\n')
                
        except Exception as e:
            self.logger.error(f"Failed to quarantine file {file_path}: {e}")
            
    def send_incident_alert(self, incident_type, details):
        """Send immediate incident alert"""
        message = f"🚨 SECURITY INCIDENT DETECTED\n\n"
        message += f"Type: {incident_type}\n"
        message += f"Time: {datetime.now().isoformat()}\n"
        message += f"Details: {details}\n"
        
        # Send to Slack
        payload = {
            'text': message,
            'username': 'Security Incident Response',
            'icon_emoji': ':rotating_light:'
        }
        
        requests.post('https://hooks.slack.com/services/YOUR/WEBHOOK', json=payload)
        
    def monitor_system_integrity(self):
        """Monitor system file integrity"""
        critical_files = [
            '/etc/passwd',
            '/etc/shadow',
            '/etc/sudoers',
            '/etc/ssh/sshd_config'
        ]
        
        for file_path in critical_files:
            try:
                # Check file permissions
                import stat
                import os
                
                file_stat = os.stat(file_path)
                file_mode = stat.filemode(file_stat.st_mode)
                
                # Alert on unexpected permissions
                if file_path == '/etc/shadow' and file_mode != '-rw-r-----':
                    self.send_incident_alert(
                        'File Permission Change',
                        f'{file_path} has unexpected permissions: {file_mode}'
                    )
                    
            except Exception as e:
                self.logger.error(f"Error checking {file_path}: {e}")
                
    def run_response_check(self):
        """Run automated response checks"""
        self.logger.info("Running intrusion detection and response")
        
        try:
            # Monitor system integrity
            self.monitor_system_integrity()
            
            # Check for new incidents in logs
            self.analyze_recent_incidents()
            
        except Exception as e:
            self.logger.error(f"Response check failed: {e}")

if __name__ == "__main__":
    response_system = IntrusionResponseSystem()
    response_system.run_response_check()

Certificate Management

Automated Certificate Monitoring

# Certificate management schedule
# Daily SSL certificate expiration check at 9 AM
0 9 * * * /opt/security/check_ssl_expiration.py

# Weekly certificate renewal attempt on Sunday at 10 AM
0 10 * * 0 /opt/security/renew_certificates.sh

# Monthly certificate audit on 1st at 11 AM
0 11 1 * * /opt/security/certificate_audit.py

# Daily TLS configuration check
0 12 * * * /opt/security/tls_config_check.py

Security Patch Management

Automated Patch Deployment

#!/bin/bash
# /opt/security/security_patch_management.sh

set -e

LOG_FILE="/var/log/security-patches.log"
SLACK_WEBHOOK="https://hooks.slack.com/services/YOUR/WEBHOOK"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOG_FILE
}

send_slack_notification() {
    local message="$1"
    curl -X POST -H 'Content-type: application/json' \
        --data "{\"text\":\"$message\"}" \
        $SLACK_WEBHOOK
}

# Check for security updates
log "Checking for security updates"

if command -v apt &> /dev/null; then
    # Debian/Ubuntu
    apt update
    SECURITY_UPDATES=$(apt list --upgradable 2>/dev/null | grep -i security | wc -l)
    
    if [ $SECURITY_UPDATES -gt 0 ]; then
        log "Found $SECURITY_UPDATES security updates"
        
        # Install security updates
        DEBIAN_FRONTEND=noninteractive apt-get -y upgrade \
            -o Dpkg::Options::="--force-confdef" \
            -o Dpkg::Options::="--force-confold"
            
        send_slack_notification "✅ Installed $SECURITY_UPDATES security updates on $(hostname)"
    else
        log "No security updates available"
    fi
    
elif command -v yum &> /dev/null; then
    # RHEL/CentOS
    yum check-update --security
    SECURITY_UPDATES=$?
    
    if [ $SECURITY_UPDATES -eq 100 ]; then
        log "Security updates available, installing..."
        yum update -y --security
        send_slack_notification "✅ Installed security updates on $(hostname)"
    else
        log "No security updates available"
    fi
fi

# Check if reboot is required
if [ -f /var/run/reboot-required ]; then
    log "System reboot required after updates"
    send_slack_notification "⚠️ System $(hostname) requires reboot after security updates"
fi

log "Security patch check completed"

Conclusion

Security automation with cron jobs provides continuous protection against evolving threats. By implementing these comprehensive automation strategies, you can maintain robust security posture, ensure compliance, and respond quickly to security incidents.

Remember to regularly review and update your security automation scripts, monitor for false positives, and maintain proper access controls for all automated security processes.

Secure Your Infrastructure

Start implementing security automation with our cron expression generator.

Ready to Create Your Cron Job?

Now that you understand the concepts, try our cron expression generator to create your own cron jobs!

Try Cron Generator