MetaMock Logging System Documentation
Last Updated: December 4, 2024
Status: Production-ready with job-specific CloudWatch routing
Overview
The MetaMock logging system provides centralized, context-aware logging with environment-based routing. It supports both development console logging and production CloudWatch integration with job-specific log streams for better debugging and monitoring.
Core Architecture
Application Code → Logger → Context Formatter → Routing Handler → Output
       ↓             ↓              ↓                  ↓              ↓
  Set Context   Auto-inject    Add Context       Route by Job   CloudWatch/Console
Key Features
1. Context Variables (Thread-Safe)
- User Context: `user_id` for request attribution
- Job Context: `job_id`, `task_name`, `task_run_id` for workflow tracking
- Uses Python `contextvars` for async/thread safety
2. Automatic Context Injection
All log messages automatically include relevant context:
[INFO] [user_id=123 job_id=bulk-abc123 task=bulk_workflow] Starting image generation
3. Environment-Based Routing
- Development: Console output with full context
- Production + CloudWatch: Dual routing (CloudWatch + console warnings)
4. Job-Specific Log Streams
When job_id is set, logs are automatically routed to dedicated CloudWatch streams:
CloudWatch Log Group: metamock-app
├── celery-worker/container123/2024-12-04 # Default logs
├── backend/container456/2024-12-04 # Default logs
└── jobs/bulk-abc123 # Job-specific logs
Context Management API
Setting Context
from backend.utils.logger import set_user_context, set_job_context, get_logger
# Set user context (typically in middleware)
set_user_context(user_id=123)
# Set job context (automatically creates CloudWatch stream)
set_job_context(
    job_id="bulk-abc123",
    task_name="bulk_workflow",
    task_run_id="run-456"
)
# Get logger and log normally
logger = get_logger(__name__)
logger.info("Processing user request")
# Output: [INFO] [user_id=123 job_id=bulk-abc123 task=bulk_workflow run_id=run-456] Processing user request
Clearing Context
from backend.utils.logger import clear_user_context, clear_job_context, clear_all_context
# Clear specific context
clear_user_context()
clear_job_context()
# Clear everything
clear_all_context()
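In Celery, clearing can be automated so context never leaks between tasks that share a worker process. A sketch using Celery's task_postrun signal (the signal wiring is an assumption about usage, not existing MetaMock code):

from celery.signals import task_postrun

from backend.utils.logger import clear_all_context

@task_postrun.connect
def _clear_logging_context(**kwargs):
    # Runs after every task, preventing context bleed into the next task
    clear_all_context()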
Retrieving Context
from backend.utils.logger import get_user_context, get_job_context, get_all_context
user_ctx = get_user_context() # {'user_id': 123}
job_ctx = get_job_context() # {'job_id': 'abc123', 'task_name': 'bulk_workflow', 'task_run_id': 'run-456'}
all_ctx = get_all_context() # Combined dictionary
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| `ENVIRONMENT` | `development` | Environment mode (development/production) |
| `USE_CLOUDWATCH` | `false` | Enable CloudWatch logging in production |
| `LOG_LEVEL` | `INFO` | Logging level (DEBUG, INFO, WARNING, ERROR) |
| `CLOUDWATCH_LOG_GROUP` | `metamock-app` | CloudWatch log group name |
| `AWS_ACCESS_KEY_ID` | - | AWS credentials for CloudWatch |
| `AWS_SECRET_ACCESS_KEY` | - | AWS credentials for CloudWatch |
| `AWS_DEFAULT_REGION` | `ap-south-1` | AWS region for CloudWatch |
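In code, these defaults might be read once at import time. A minimal sketch of the configuration loading, assuming simple os.getenv parsing (the module-level constant names are illustrative, not the verbatim implementation):

import os

# Hypothetical configuration loading that mirrors the table above
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
USE_CLOUDWATCH = os.getenv("USE_CLOUDWATCH", "false").lower() == "true"
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
CLOUDWATCH_LOG_GROUP = os.getenv("CLOUDWATCH_LOG_GROUP", "metamock-app")
AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "ap-south-1")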
Logging Behavior by Environment
Development Mode
- Output: Console only (`stdout`)
- Format: `[LEVEL] [context] message`
- Level: All levels from `LOG_LEVEL`
Production Mode (CloudWatch Disabled)
- Output: Console only (`stdout`)
- Format: `[LEVEL] [context] message`
- Level: All levels from `LOG_LEVEL`
Production Mode (CloudWatch Enabled)
- Output: Dual routing
- CloudWatch: All log levels, job-specific streams
- Console: WARNING and ERROR only for immediate visibility
- Format: CloudWatch adds timestamps automatically
- Streams:
  - Default: `{service}/{hostname}/{date}`
  - Job-specific: `jobs/{job_id}`
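The dual routing above can be expressed with standard-library handlers. A minimal sketch, assuming a ready-made CloudWatch handler is passed in (the function name configure_production_logging is illustrative):

import logging

def configure_production_logging(cloudwatch_handler: logging.Handler, log_level: str = "INFO") -> None:
    root = logging.getLogger()
    root.setLevel(log_level)
    # CloudWatch receives everything at LOG_LEVEL and above
    root.addHandler(cloudwatch_handler)
    # Console only surfaces WARNING and ERROR for immediate visibility
    console = logging.StreamHandler()
    console.setLevel(logging.WARNING)
    root.addHandler(console)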
CloudWatch Integration
Log Stream Strategy
Default Streams (per worker, per day):
celery-worker/abc123def456/2024-12-04
backend/xyz789uvw012/2024-12-04
Job-Specific Streams (when job_id is set):
jobs/bulk-generation-uuid-123
jobs/image-upscale-uuid-456
jobs/user-export-uuid-789
Benefits of Job-Specific Streams
- Isolation: Each job's logs are completely separate
- Debugging: Easy to trace entire workflow for specific job
- Monitoring: Set CloudWatch alarms on specific job streams
- Cleanup: Job logs expire automatically under the log group's retention policy (see Log Stream Management below)
Dynamic Handler Creation
When set_job_context(job_id="abc123") is called:
- Check: Is CloudWatch enabled and is the app running in production?
- Cache: Does a handler already exist for this `job_id`?
- Create: If not, create a new CloudWatch handler with stream `jobs/abc123`
- Route: All subsequent logs go to the job-specific stream

(A sketch of this create-or-reuse flow follows the Handler Caching notes below.)
Handler Caching
- Global Cache: `_job_handlers: Dict[str, logging.Handler]`
- Lifecycle: Handlers persist until application restart
- Memory: Efficient reuse for repeated job operations
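A minimal sketch of the create-or-reuse flow, assuming watchtower 3.x keyword arguments; the helper name _get_or_create_job_handler is an assumption, not the verbatim implementation:

import logging
from typing import Dict

import watchtower

_job_handlers: Dict[str, logging.Handler] = {}

def _get_or_create_job_handler(job_id: str) -> logging.Handler:
    # Reuse the cached handler so every step of a job shares one stream
    if job_id not in _job_handlers:
        _job_handlers[job_id] = watchtower.CloudWatchLogHandler(
            log_group_name="metamock-app",
            log_stream_name=f"jobs/{job_id}",
        )
    return _job_handlers[job_id]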
Usage Patterns
1. API Request Logging
# In middleware or route handler
from backend.utils.logger import set_user_context, get_logger
def process_request(request):
    # Set context from authenticated user
    set_user_context(user_id=request.user.id)
    logger = get_logger(__name__)
    logger.info("Processing API request")
    # Output: [INFO] [user_id=123] Processing API request
2. Bulk Generation Workflow
# In Celery task
import uuid

from backend.utils.logger import set_user_context, set_job_context, get_logger

@celery_app.task
def start_bulk_generation(preset_id, user_id):
    job_id = str(uuid.uuid4())
    # Set context for entire workflow
    set_user_context(user_id=user_id)
    set_job_context(job_id=job_id, task_name="bulk_workflow")
    logger = get_logger(__name__)
    logger.info(f"Starting bulk generation for preset {preset_id}")
    # Output: [INFO] [user_id=123 job_id=abc123 task=bulk_workflow] Starting bulk generation for preset xyz

    # All subsequent logs in this workflow automatically include context
    process_images()  # Logs will include job context
    upload_to_s3()    # Logs will include job context
3. Step-by-Step Workflow Tracking
from backend.utils.logger import set_job_context, get_logger
def process_workflow_step(job_id, step_name):
    # Update task context for current step
    set_job_context(job_id=job_id, task_name=step_name, task_run_id=generate_task_run_id())
    logger = get_logger(__name__)
    logger.info(f"Starting step: {step_name}")
    # Output: [INFO] [job_id=abc123 task=validate_preset run_id=def456] Starting step: validate_preset
    try:
        # Step implementation
        logger.info("Step completed successfully")
    except Exception as e:
        logger.error(f"Step failed: {e}")
Implementation Details
Context Variables (Thread-Safe)
from contextvars import ContextVar
from typing import Optional

# Global context variables
user_id_context: ContextVar[Optional[int]] = ContextVar('user_id', default=None)
job_id_context: ContextVar[Optional[str]] = ContextVar('job_id', default=None)
task_name_context: ContextVar[Optional[str]] = ContextVar('task_name', default=None)
task_run_id_context: ContextVar[Optional[str]] = ContextVar('task_run_id', default=None)
Benefits:
- Async-Safe: Context preserved across await boundaries
- Thread-Safe: Different threads have separate context
- Request-Scoped: Context automatically isolated per request/task
Custom Formatter
class ContextFormatter(logging.Formatter):
    def format(self, record):
        # Automatically inject context into every log message
        context_parts = []
        if user_id := user_id_context.get(None):
            context_parts.append(f"user_id={user_id}")
        if job_id := job_id_context.get(None):
            context_parts.append(f"job_id={job_id}")
        # ... more context fields
        record.context_info = f"[{' '.join(context_parts)}] " if context_parts else ""
        return super().format(record)
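For the injected context to show up, the handler's format string must reference context_info. A minimal hookup that reproduces the [LEVEL] [context] message format shown above:

handler = logging.StreamHandler()
handler.setFormatter(ContextFormatter("[%(levelname)s] %(context_info)s%(message)s"))
logging.getLogger().addHandler(handler)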
Dynamic Job Routing
class JobRoutingHandler(logging.Handler):
    def __init__(self, default_handler: logging.Handler):
        # Default per-worker handler is supplied at construction time
        super().__init__()
        self.default_handler = default_handler

    def emit(self, record):
        job_id = job_id_context.get(None)
        if job_id and job_id in _job_handlers:
            # Route to job-specific CloudWatch stream
            _job_handlers[job_id].emit(record)
        else:
            # Use default handler (per-worker stream)
            self.default_handler.emit(record)
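With this design, a single routing handler sits on the root logger and delegates per record. A sketch of the wiring (the StreamHandler here stands in for the real per-worker CloudWatch handler):

# Hypothetical wiring: one routing handler delegates to either the
# per-worker default stream or a cached job-specific stream
default_handler = logging.StreamHandler()
logging.getLogger().addHandler(JobRoutingHandler(default_handler))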
Monitoring and Debugging
CloudWatch Queries
Find all logs for specific job:
fields @timestamp, @message
| filter @logStream like /jobs\/bulk-abc123/
| sort @timestamp desc
Find errors across all bulk jobs:
fields @timestamp, @message
| filter @logStream like /jobs\/bulk-/ and @message like /ERROR/
| sort @timestamp desc
Monitor job completion rates:
fields @timestamp, @message
| filter @message like /Starting bulk generation/ or @message like /Bulk generation completed/
| stats count() by bin(5m)
Development Debugging
Console Output Example:
[INFO] [user_id=123] User authenticated
[INFO] [user_id=123 job_id=bulk-abc123 task=bulk_workflow] Starting bulk generation
[DEBUG] [user_id=123 job_id=bulk-abc123 task=validate_preset run_id=def456] Validating preset configuration
[INFO] [user_id=123 job_id=bulk-abc123 task=generate_prompts run_id=ghi789] Generated 5 unique prompts
[ERROR] [user_id=123 job_id=bulk-abc123 task=generate_images run_id=jkl012] Image generation failed: API timeout
Log Stream Management
Stream Naming Convention:
- Default: {service}/{hostname}/{date}
- celery-worker/abc123def456/2024-12-04
- backend/xyz789uvw012/2024-12-04
- Job-Specific: jobs/{job_id}
- jobs/bulk-generation-uuid-123
- jobs/image-upscale-uuid-456
Automatic Cleanup:
- CloudWatch retention policies expire log events automatically; no manual cleanup is required
- Retention is set at the log-group level, so all streams in metamock-app (job-specific streams included) share one retention setting
- Giving job logs a different retention than system logs would require routing them to a separate log group
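Retention itself is a one-time, per-group setting in the AWS API. A sketch using boto3 (the 30-day value is illustrative):

import boto3

logs = boto3.client("logs")
# Applies to every stream in the group, job-specific streams included
logs.put_retention_policy(logGroupName="metamock-app", retentionInDays=30)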
Error Handling and Fallbacks
CloudWatch Failures
If CloudWatch setup fails:
1. Graceful Degradation: Falls back to console logging
2. Error Logging: A warning is logged about the CloudWatch failure
3. No Service Interruption: The application continues normally
Missing Dependencies
If watchtower is not installed:
1. Import Fallback: Catches ImportError
2. Console Logging: Uses standard console handler
3. Development Mode: No impact on development workflow
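A minimal sketch of that import fallback (the WATCHTOWER_AVAILABLE flag name is an assumption):

try:
    import watchtower
    WATCHTOWER_AVAILABLE = True
except ImportError:
    # CloudWatch handlers are skipped; console logging still works
    watchtower = None
    WATCHTOWER_AVAILABLE = False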
AWS Credential Issues
If AWS credentials are invalid:
1. Client Creation Fails: Returns None from _get_cloudwatch_client()
2. Handler Creation Skipped: No CloudWatch handlers created
3. Console Fallback: All logs go to console
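A sketch of what that credential check might look like; the validation call and the (None, None) fallback shape are assumptions based on how the function is used in Troubleshooting below:

import os

import boto3
from botocore.exceptions import BotoCoreError, ClientError

def _get_cloudwatch_client():
    try:
        client = boto3.client("logs", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-south-1"))
        client.describe_log_groups(limit=1)  # cheap call to validate credentials
        return client, os.getenv("CLOUDWATCH_LOG_GROUP", "metamock-app")
    except (BotoCoreError, ClientError):
        # Invalid or missing credentials: no CloudWatch handlers will be created
        return None, None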
Best Practices
1. Context Lifecycle Management
# ✅ Good: Set context early in request/task
def handle_request():
    set_user_context(user_id=current_user.id)
    # ... rest of request processing

# ✅ Good: Clear context when switching users/jobs
def switch_context():
    clear_all_context()
    set_user_context(user_id=new_user.id)

# ❌ Avoid: Forgetting to clear context between operations
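One way to make the clear step impossible to forget is a context manager that pairs set and clear; this job_logging_context helper is hypothetical, not part of the current API:

from contextlib import contextmanager

@contextmanager
def job_logging_context(job_id, task_name):
    # Hypothetical helper: guarantees cleanup even if the step raises
    set_job_context(job_id=job_id, task_name=task_name)
    try:
        yield
    finally:
        clear_job_context()

# Usage: context is cleared automatically at the end of the block
with job_logging_context("bulk-abc123", "validate_preset"):
    get_logger(__name__).info("Validating preset")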
2. Logger Instance Management
# ✅ Good: Get logger once per module
logger = get_logger(__name__)
class MyService:
    def method1(self):
        logger.info("Method 1 called")  # Reuse module logger

    def method2(self):
        logger.info("Method 2 called")  # Reuse module logger

# ❌ Avoid: Creating new loggers repeatedly
def bad_logging():
    logger = get_logger(__name__)  # Don't do this in every function
    logger.info("Message")
3. Context Information
# ✅ Good: Meaningful context
set_job_context(
    job_id="bulk-gen-uuid-123",
    task_name="bulk_generation",
    task_run_id="step-5-uuid"
)
# ✅ Good: Descriptive task names
set_job_context(job_id=job_id, task_name="validate_preset")
set_job_context(job_id=job_id, task_name="generate_images")
# ❌ Avoid: Generic or unclear context
set_job_context(job_id="123", task_name="task")
4. Log Levels
# ✅ Good: Appropriate log levels
logger.debug("Detailed processing information") # Development debugging
logger.info("User started bulk generation") # Important events
logger.warning("Retrying failed image generation") # Recoverable issues
logger.error("Failed to upload to S3") # Serious errors
# ❌ Avoid: Inappropriate levels
logger.error("User logged in") # Not an error
logger.info("Variable x = 5") # Too verbose for info
Troubleshooting
Issue: Logs not appearing in CloudWatch
Diagnosis:
1. Check environment variables: ENVIRONMENT=production, USE_CLOUDWATCH=true
2. Verify AWS credentials are valid
3. Confirm watchtower package is installed
4. Check CloudWatch log group exists
Debug Commands:
from backend.utils.logger import _get_cloudwatch_client
client, log_group = _get_cloudwatch_client()
print(f"Client: {client}, Group: {log_group}")
Issue: Job-specific streams not created
Diagnosis:
1. Verify set_job_context(job_id="...") is called
2. Check if running in production with CloudWatch enabled
3. Look for handler creation logs in stderr
Debug Commands:
from backend.utils.logger import _job_handlers
print(f"Active handlers: {list(_job_handlers.keys())}")
Issue: Context not appearing in logs
Diagnosis:
1. Verify context is set before logging
2. Check whether context variables are isolated per thread/async task
3. Confirm the formatter is applied to the handlers
Debug Commands:
from backend.utils.logger import get_all_context
print(f"Current context: {get_all_context()}")
Issue: Too many log streams
Cause: Creating new handlers for every job operation
Solution: Handler caching prevents this, but verify job_id consistency
Prevention:
# ✅ Good: Consistent job_id
job_id = str(uuid.uuid4())
set_job_context(job_id=job_id, task_name="step1")
set_job_context(job_id=job_id, task_name="step2") # Same handler reused
# ❌ Avoid: New job_id for each step
set_job_context(job_id=str(uuid.uuid4()), task_name="step1") # New handler
set_job_context(job_id=str(uuid.uuid4()), task_name="step2") # New handler
Future Enhancements
Planned Features
- Log Aggregation: Cross-service log correlation
- Metrics Integration: Automatic CloudWatch metrics from logs
- Alerting: Smart alerts based on error patterns
- Performance Monitoring: Request/job duration tracking
- Log Sampling: Reduce log volume in high-traffic scenarios
Integration Opportunities
- APM Tools: DataDog, New Relic integration
- Error Tracking: Sentry integration with context
- Distributed Tracing: OpenTelemetry support
- Security Monitoring: Audit trail enhancement
This logging system provides the foundation for comprehensive observability across the MetaMock platform, with particular strength in tracking complex multi-step workflows like bulk generation.