Data Science & Analytics Cron Jobs: Complete Automation Guide
Master data science automation with cron jobs. This comprehensive guide covers data pipeline orchestration, machine learning model training, ETL processes, report generation, and analytics workflow automation.
What You'll Learn
- Automated data pipeline orchestration
- Machine learning model training and deployment
- ETL/ELT process automation
- Automated report generation and distribution
- Data quality monitoring and validation
- Analytics dashboard updates and maintenance
Data Pipeline Automation
ETL Pipeline Scheduling
# Data pipeline cron schedule
# Daily ETL at 2 AM
0 2 * * * /opt/data-pipelines/daily_etl.py
# Hourly incremental data sync
0 * * * * /opt/data-pipelines/incremental_sync.py
# Weekly data quality checks on Sunday at 3 AM
0 3 * * 0 /opt/data-pipelines/data_quality_check.py
# Monthly data warehouse optimization
0 4 1 * * /opt/data-pipelines/optimize_warehouse.py
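Because cron runs jobs with a minimal environment, scripts invoked directly like this need a shebang line and execute permission (chmod +x). A common variation, shown below purely as an illustration, is to call the interpreter explicitly and redirect output to a log file; the interpreter path and log path here are assumptions to adjust for your system.
# Illustrative variant: explicit interpreter plus output capture for easier debugging
0 2 * * * /usr/bin/python3 /opt/data-pipelines/daily_etl.py >> /var/log/daily_etl.log 2>&1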
Comprehensive ETL Pipeline Script
#!/usr/bin/env python3
# /opt/data-pipelines/daily_etl.py

import logging
import pandas as pd
from datetime import datetime, timedelta
import boto3
import json
import sys
from sqlalchemy import create_engine
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/data-pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class DataPipeline:
    def __init__(self):
        self.s3_client = boto3.client('s3')
        self.db_engine = create_engine('postgresql://user:pass@localhost/warehouse')
        self.slack_webhook = "https://hooks.slack.com/services/YOUR/WEBHOOK"

    def extract_data(self):
        """Extract data from multiple sources"""
        logger.info("Starting data extraction")
        try:
            # Extract from API
            api_data = self.extract_from_api()
            # Extract from database
            db_data = self.extract_from_database()
            # Extract from S3 files
            s3_data = self.extract_from_s3()
            return {
                'api_data': api_data,
                'db_data': db_data,
                's3_data': s3_data
            }
        except Exception as e:
            logger.error(f"Data extraction failed: {e}")
            self.send_alert(f"ETL extraction failed: {e}")
            raise

    def extract_from_api(self):
        """Extract data from REST API"""
        logger.info("Extracting data from API")
        # Example: Extract from analytics API
        response = requests.get(
            'https://api.analytics.com/v1/events',
            headers={'Authorization': 'Bearer YOUR_TOKEN'},
            params={
                'start_date': (datetime.now() - timedelta(days=1)).isoformat(),
                'end_date': datetime.now().isoformat()
            }
        )
        if response.status_code == 200:
            return pd.DataFrame(response.json()['data'])
        else:
            raise Exception(f"API request failed: {response.status_code}")

    def extract_from_database(self):
        """Extract data from operational database"""
        logger.info("Extracting data from database")
        query = '''
            SELECT
                user_id,
                event_type,
                timestamp,
                properties
            FROM events
            WHERE DATE(timestamp) = CURRENT_DATE - INTERVAL '1 day'
        '''
        return pd.read_sql(query, self.db_engine)

    def extract_from_s3(self):
        """Extract data from S3"""
        logger.info("Extracting data from S3")
        # S3 extraction logic here...
        return pd.DataFrame()  # placeholder

    def transform_data(self, raw_data):
        """Transform and clean raw data"""
        logger.info("Transforming data")
        # Transformation logic here...
        return raw_data  # placeholder

    def load_data(self, transformed_data):
        """Load data into data warehouse"""
        logger.info("Loading data into warehouse")
        # Loading logic here...

    def send_alert(self, message):
        """Send alert to Slack"""
        payload = {
            'text': f'🚨 Data Pipeline Alert: {message}',
            'username': 'Data Pipeline Bot'
        }
        requests.post(self.slack_webhook, json=payload)

    def run(self):
        """Run the complete ETL pipeline"""
        start_time = datetime.now()
        logger.info("Starting ETL pipeline")
        try:
            # Extract
            raw_data = self.extract_data()
            # Transform
            transformed_data = self.transform_data(raw_data)
            # Load
            self.load_data(transformed_data)
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            logger.info(f"ETL pipeline completed successfully in {duration:.2f} seconds")
        except Exception as e:
            logger.error(f"ETL pipeline failed: {e}")
            sys.exit(1)


if __name__ == "__main__":
    pipeline = DataPipeline()
    pipeline.run()
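If a run occasionally takes longer than its schedule interval (the hourly incremental sync is the most likely candidate), overlapping executions can duplicate or corrupt data. One common safeguard, sketched below using flock from util-linux with an assumed lock file path, is to let cron simply skip a run while the previous one still holds the lock:
# Sketch: skip the hourly sync if the previous run still holds the lock
0 * * * * /usr/bin/flock -n /tmp/incremental_sync.lock /opt/data-pipelines/incremental_sync.py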
Machine Learning Model Automation
Model Training and Deployment
# ML model automation schedule
# Daily model retraining at 4 AM
0 4 * * * /opt/ml-pipelines/retrain_models.py
# Model performance monitoring every 6 hours
0 */6 * * * /opt/ml-pipelines/monitor_model_performance.py
# Weekly model validation on Sunday at 5 AM
0 5 * * 0 /opt/ml-pipelines/validate_models.py
# Monthly model comparison and selection
0 6 1 * * /opt/ml-pipelines/model_selection.py
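The retraining job referenced above is not shown in this guide, so here is a minimal sketch of what /opt/ml-pipelines/retrain_models.py might contain. Everything in it is an assumption for illustration: the connection string, the ml_features table and its label column, the choice of a scikit-learn RandomForestClassifier, and the /opt/ml-models output path.
#!/usr/bin/env python3
# /opt/ml-pipelines/retrain_models.py (hypothetical sketch, not the original script)
import logging
from datetime import datetime

import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def retrain():
    # Placeholder connection string and feature table
    engine = create_engine('postgresql://user:pass@localhost/warehouse')
    df = pd.read_sql('SELECT * FROM ml_features', engine)

    # Assumed layout: every column is a numeric feature except the 'label' target
    X = df.drop(columns=['label'])
    y = df['label']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = RandomForestClassifier(n_estimators=200, random_state=42)
    model.fit(X_train, y_train)

    accuracy = accuracy_score(y_test, model.predict(X_test))
    logger.info(f"Retrained model accuracy: {accuracy:.3f}")

    # Version the artifact by date so earlier models stay available for rollback
    joblib.dump(model, f"/opt/ml-models/model_{datetime.now():%Y%m%d}.joblib")


if __name__ == "__main__":
    retrain()
A real retraining pipeline would also compare the fresh model against the one currently in production before promoting it, which is what the weekly validation and monthly model selection jobs above are intended to cover.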
Automated Report Generation
Business Intelligence Automation
# Report generation schedule
# Daily executive dashboard at 7 AM
0 7 * * * /opt/reports/generate_executive_dashboard.py
# Weekly performance report every Monday at 8 AM
0 8 * * 1 /opt/reports/weekly_performance_report.py
# Monthly analytics report on 1st at 9 AM
0 9 1 * * /opt/reports/monthly_analytics_report.py
# Hourly real-time metrics update
0 * * * * /opt/reports/update_realtime_metrics.py
Automated Report Generation Script
#!/usr/bin/env python3
# /opt/reports/generate_executive_dashboard.py

import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
import boto3
import logging
from sqlalchemy import create_engine


class ReportGenerator:
    def __init__(self):
        self.db_engine = create_engine('postgresql://user:pass@localhost/analytics')
        self.s3_client = boto3.client('s3')
        self.report_bucket = 'analytics-reports'
        # Email configuration
        self.smtp_server = 'smtp.company.com'
        self.smtp_port = 587
        self.email_user = 'reports@company.com'
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def extract_kpi_data(self):
        """Extract KPI data for dashboard"""
        self.logger.info("Extracting KPI data")
        # Revenue metrics
        revenue_query = '''
            SELECT
                DATE(created_at) as date,
                SUM(amount) as total_revenue,
                COUNT(DISTINCT user_id) as active_users
            FROM sales
            WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
            GROUP BY 1
            ORDER BY 1;
        '''
        return pd.read_sql(revenue_query, self.db_engine)

    def generate_report_html(self, kpi_data):
        """Generate HTML report from data"""
        self.logger.info("Generating report HTML")
        fig = make_subplots(rows=2, cols=1, subplot_titles=("Daily Revenue", "Active Users"))
        fig.add_trace(go.Scatter(x=kpi_data['date'], y=kpi_data['total_revenue'], mode='lines', name='Revenue'), row=1, col=1)
        fig.add_trace(go.Scatter(x=kpi_data['date'], y=kpi_data['active_users'], mode='lines', name='Active Users'), row=2, col=1)
        return fig.to_html(full_html=False, include_plotlyjs='cdn')

    def send_report_email(self, html_content):
        """Send report via email"""
        self.logger.info("Sending report email")
        msg = MIMEMultipart()
        msg['Subject'] = f"Executive Dashboard - {datetime.now().strftime('%Y-%m-%d')}"
        msg['From'] = self.email_user
        msg['To'] = "executives@company.com"
        msg.attach(MIMEText(html_content, 'html'))
        # SMTP logic here...

    def run(self):
        """Generate and distribute the report"""
        self.logger.info("Starting report generation")
        try:
            data = self.extract_kpi_data()
            html = self.generate_report_html(data)
            self.send_report_email(html)
            self.logger.info("Report sent successfully")
        except Exception as e:
            self.logger.error(f"Report generation failed: {e}")


if __name__ == "__main__":
    generator = ReportGenerator()
    generator.run()
Data Quality and Validation
Automate data quality checks to ensure data integrity and reliability.
# Data quality monitoring schedule
# Daily data validation at 5 AM
0 5 * * * /opt/data-quality/validate_data.py
# Anomaly detection every hour
0 * * * * /opt/data-quality/detect_anomalies.py
# Data profiling weekly on Monday at 6 AM
0 6 * * 1 /opt/data-quality/profile_data.py
# Schema change monitoring every 15 minutes
*/15 * * * * /opt/data-quality/monitor_schema_changes.py
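As a rough idea of what the daily validation script could do, the sketch below tests two simple rules against an assumed events table: a day-over-day drop in row volume of more than 50%, and a null rate above 5% on user_id. The table, columns, and thresholds are placeholders, not part of the schedule above.
#!/usr/bin/env python3
# /opt/data-quality/validate_data.py (hypothetical sketch with placeholder thresholds)
import logging
import sys

import pandas as pd
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def validate():
    engine = create_engine('postgresql://user:pass@localhost/warehouse')
    failures = []

    # Check 1: yesterday's row count should not drop more than 50% versus the day before
    counts = pd.read_sql('''
        SELECT DATE(timestamp) AS event_day, COUNT(*) AS row_count
        FROM events
        WHERE timestamp >= CURRENT_DATE - INTERVAL '2 days'
          AND timestamp < CURRENT_DATE
        GROUP BY 1
        ORDER BY 1
    ''', engine)
    if len(counts) == 2 and counts['row_count'].iloc[1] < 0.5 * counts['row_count'].iloc[0]:
        failures.append("Row volume dropped by more than 50% day over day")

    # Check 2: null rate on a key column should stay below 5%
    nulls = pd.read_sql('''
        SELECT AVG(CASE WHEN user_id IS NULL THEN 1.0 ELSE 0.0 END) AS null_rate
        FROM events
        WHERE DATE(timestamp) = CURRENT_DATE - INTERVAL '1 day'
    ''', engine)
    null_rate = nulls['null_rate'].iloc[0]
    if null_rate is not None and null_rate > 0.05:
        failures.append(f"user_id null rate is {null_rate:.1%}, above the 5% threshold")

    if failures:
        for failure in failures:
            logger.error(failure)
        sys.exit(1)  # non-zero exit so a wrapper script or monitor can flag the failure
    logger.info("All data quality checks passed")


if __name__ == "__main__":
    validate()
Logging failures to stderr and exiting non-zero keeps the problem visible to cron's mail handling (if MAILTO is configured) and to any wrapper that checks exit codes.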
A/B Testing and Experimentation
Automate the analysis of A/B tests and experiments.
# A/B testing automation schedule
# Daily experiment results analysis at 8 AM
0 8 * * * /opt/experiments/analyze_ab_tests.py
# Weekly experiment summary report on Monday at 9 AM
0 9 * * 1 /opt/experiments/summarize_experiments.py
# Real-time significance monitoring every 30 minutes
*/30 * * * * /opt/experiments/check_significance.py
# Automated rollout of winning variants daily at 10 AM
0 10 * * * /opt/experiments/rollout_winner.py
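The significance check can be as simple as a two-proportion z-test per experiment. The sketch below assumes an experiment_assignments table with one row per user carrying the variant and a converted flag; the schema, connection string, and 0.05 threshold are illustrative assumptions, not part of the original scripts.
#!/usr/bin/env python3
# /opt/experiments/check_significance.py (hypothetical sketch: two-proportion z-test)
import logging
from math import sqrt

import pandas as pd
from scipy.stats import norm
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def check_significance(alpha=0.05):
    engine = create_engine('postgresql://user:pass@localhost/analytics')
    # Placeholder query: one row per experiment variant with user and conversion counts
    results = pd.read_sql('''
        SELECT experiment_id, variant, COUNT(*) AS users,
               SUM(CASE WHEN converted THEN 1 ELSE 0 END) AS conversions
        FROM experiment_assignments
        GROUP BY 1, 2
    ''', engine)

    for experiment_id, group in results.groupby('experiment_id'):
        if len(group) != 2:
            continue  # this sketch only handles simple two-variant A/B tests
        (n_a, c_a), (n_b, c_b) = group[['users', 'conversions']].itertuples(index=False)

        p_a, p_b = c_a / n_a, c_b / n_b
        p_pool = (c_a + c_b) / (n_a + n_b)
        se = sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
        if se == 0:
            continue
        z = (p_b - p_a) / se
        p_value = 2 * (1 - norm.cdf(abs(z)))  # two-sided test

        if p_value < alpha:
            logger.info(f"Experiment {experiment_id}: significant (p={p_value:.4f}, lift={p_b - p_a:+.4f})")
        else:
            logger.info(f"Experiment {experiment_id}: not yet significant (p={p_value:.4f})")


if __name__ == "__main__":
    check_significance()
Because this check runs every 30 minutes, repeatedly peeking at p-values inflates the false-positive rate, so a production setup would pair it with a fixed test horizon or a sequential testing method before the daily rollout job acts on the result.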
Conclusion
Automating data science workflows with cron jobs is essential for building scalable and reliable data products.
Next Steps
Ready to implement your own data science automation? Explore our interactive cron job generator to build and test your schedules.