Master production logging and observability to track execution times and build robust audit trails for your ML pipelines. Ensure your models remain debuggable.
Previously in this course, we explored Tracking Performance Degradation in Production ML Pipelines to identify when models fail silently. While that lesson focused on metrics, this lesson adds the "how-to" of system-level visibility: implementing comprehensive logging and observability to ensure every inference request is traceable and every pipeline bottleneck is visible.
In production, silence is not golden—it’s a liability. If a model starts returning unexpected results or latency spikes, you need structured logs to reconstruct the state of the world at that exact moment.
Observability in MLOps isn't just about printing statements to the console. It is the practice of emitting high-cardinality, structured data that allows you to ask arbitrary questions about your system's internal state. For ML pipelines, this breaks down into three pillars: structured logging, execution-time tracking, and audit trails.
Avoid print() statements. They lack timestamps, severity levels, and structured context. Instead, use Python’s logging library configured to output JSON. This allows tools like Datadog, ELK, or CloudWatch to parse your logs automatically.
```python
import json
import logging
import time
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record):
        log_record = {
            # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        }
        # Add extra context if provided
        if hasattr(record, "extra_data"):
            log_record.update(record.extra_data)
        # default=str keeps logging from crashing on non-serializable values
        return json.dumps(log_record, default=str)


# Configure a structured JSON logger
def get_logger(name="ml_pipeline"):
    logger = logging.getLogger(name)
    # Attach the handler only once so repeated calls do not duplicate output
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


logger = get_logger()
```
To identify bottlenecks, we need a decorator that wraps our pipeline steps. This ensures we don't pollute our business logic with timing code.
```python
import functools

# Reuses `time` and `logger` from the logging setup above


def track_time(func):
    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        logger.info(
            f"Execution of {func.__name__} completed",
            extra={"extra_data": {"duration_seconds": round(duration, 4)}},
        )
        return result

    return wrapper


@track_time
def run_inference(input_data):
    # Simulate inference logic
    time.sleep(0.12)
    return {"prediction": 0.85}
```
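Once timing records are flowing, the bottleneck question becomes a matter of reading them back. The sketch below is one possible way to do that, assuming log lines shaped like the ones the JSON formatter emits and the "Execution of ... completed" message written by the decorator. The sample lines, `summarize_durations`, and `slowest_steps` are hypothetical names introduced only for illustration; in practice a log aggregator would normally run this query for you.

```python
import json
import re
from collections import defaultdict

# Hypothetical sample lines, shaped like the lesson's JSON log output.
SAMPLE_LOG_LINES = [
    '{"level": "INFO", "message": "Execution of load_features completed", "duration_seconds": 0.03}',
    '{"level": "INFO", "message": "Execution of run_inference completed", "duration_seconds": 0.12}',
    '{"level": "INFO", "message": "Execution of run_inference completed", "duration_seconds": 0.14}',
    "a stray non-JSON line that should be skipped",
]

# Matches the message format used by the timing decorator.
STEP_PATTERN = re.compile(r"Execution of (\w+) completed")


def summarize_durations(lines):
    """Group duration_seconds by step name and compute count, mean, and max."""
    durations = defaultdict(list)
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON log line
        if not isinstance(record, dict):
            continue
        match = STEP_PATTERN.search(record.get("message", ""))
        if match and "duration_seconds" in record:
            durations[match.group(1)].append(float(record["duration_seconds"]))
    return {
        step: {
            "count": len(values),
            "mean": sum(values) / len(values),
            "max": max(values),
        }
        for step, values in durations.items()
    }


def slowest_steps(summary):
    """Return step names ordered from slowest to fastest by mean duration."""
    return sorted(summary, key=lambda step: summary[step]["mean"], reverse=True)
```

Calling `slowest_steps(summarize_durations(SAMPLE_LOG_LINES))` ranks the steps by mean duration, which is usually the first thing to check when latency spikes.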
An audit trail is your "black box" flight recorder. In a production environment, you should never just return a prediction to the user; you must log the request, the prediction, and the metadata (model ID, feature version) to a persistent store or a dedicated log stream.
For our project, we will append a logging step to our prediction function:
```python
from datetime import datetime, timezone

# Reuses `logger` and `run_inference` from the previous steps


def log_prediction(input_features, prediction, model_version):
    audit_log = {
        "event": "prediction_audit",
        "input_features": input_features,
        "prediction": prediction,
        "model_version": model_version,
        # Aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # In production, send this to a database or a structured log aggregator
    logger.info("Prediction generated", extra={"extra_data": audit_log})


# Example usage
features = {"age": 30, "income": 50000}
pred = run_inference(features)
log_prediction(features, pred, model_version="v1.2.0")
```
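To make each request traceable across log lines, it can help to attach a unique identifier to the audit record. The sketch below is one way to do this, not part of the lesson's project code: `build_audit_record` and the `request_id` field are assumptions introduced here, and the ID is generated with the standard library's `uuid.uuid4()` unless the caller already has one (for example, from an upstream gateway).

```python
import json
import uuid
from datetime import datetime, timezone


def build_audit_record(input_features, prediction, model_version, request_id=None):
    """Build a JSON-serializable audit record with a correlation ID.

    `request_id` is a hypothetical field: pass an existing ID to reuse it,
    or leave it as None to have a random UUID generated.
    """
    return {
        "event": "prediction_audit",
        "request_id": request_id or str(uuid.uuid4()),
        "input_features": input_features,
        "prediction": prediction,
        "model_version": model_version,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


# Example usage: the record can be logged or written to a persistent store
record = build_audit_record(
    {"age": 30, "income": 50000}, {"prediction": 0.85}, model_version="v1.2.0"
)
print(json.dumps(record))
```

Logging the same `request_id` in the timing and error records lets you pull every event for a single inference request with one filter.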
Modify your existing inference script to include a try-except block within the logging decorator. If the model fails, log the error with the severity ERROR and capture the input_features that caused the crash. This is the first step in Monitoring Data Drift: A Practical Guide for ML Engineers, as you'll eventually need to analyze these failures to detect if they correlate with specific data segments.
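One possible shape for the exercise is sketched below. It assumes the input features arrive as the first positional argument (or as a keyword named `input_data`), which matches `run_inference` in this lesson but may not hold for your own functions. Because the lesson's JSON formatter only emits the fields it is given, the error type and message are passed explicitly through `extra_data` rather than relying on the traceback.

```python
import functools
import logging
import time

logger = logging.getLogger("ml_pipeline")


def track_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            duration = time.perf_counter() - start
            # Assumption: the features are the first positional argument
            # or the `input_data` keyword argument.
            features = args[0] if args else kwargs.get("input_data")
            logger.error(
                f"Execution of {func.__name__} failed",
                extra={"extra_data": {
                    "duration_seconds": round(duration, 4),
                    "input_features": features,
                    "error_type": type(exc).__name__,
                    "error_message": str(exc),
                }},
            )
            raise  # re-raise so callers still see the failure
        duration = time.perf_counter() - start
        logger.info(
            f"Execution of {func.__name__} completed",
            extra={"extra_data": {"duration_seconds": round(duration, 4)}},
        )
        return result

    return wrapper


@track_time
def run_inference(input_data):
    # Simulated failure mode: a required feature is missing
    if input_data.get("age") is None:
        raise ValueError("feature 'age' is missing")
    return {"prediction": 0.85}
```

Re-raising after logging is a deliberate choice: the decorator records the failure but does not hide it from the caller, so upstream error handling keeps working.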
Log to stdout and let a sidecar process (like Fluentd or Vector) handle the shipping.

Effective logging and observability are what separate a "notebook model" from a reliable production service. By standardizing your logs into JSON, wrapping execution steps with timing decorators, and maintaining a strict audit trail, you ensure that when the system fails (and it will), you have the data required to perform a post-mortem.
Up next: Automated Retraining Triggers. We will take these logs and turn them into actionable signals that force your pipeline to retrain when performance dips below a threshold.