MetaMock Logging System Documentation

Last Updated: December 4, 2024
Status: Production-ready with job-specific CloudWatch routing

Overview

The MetaMock logging system provides centralized, context-aware logging with environment-based routing. It supports both development console logging and production CloudWatch integration with job-specific log streams for better debugging and monitoring.

Core Architecture

Application Code → Logger → Context Formatter → Routing Handler → Output
     ↓                ↓             ↓               ↓            ↓
Set Context    Auto-inject    Add Context    Route by Job    CloudWatch/Console

Key Features

1. Context Variables (Thread-Safe)

  • User Context: user_id for request attribution
  • Job Context: job_id, task_name, task_run_id for workflow tracking
  • Uses Python contextvars for async/thread safety

2. Automatic Context Injection

All log messages automatically include relevant context:

[INFO] [user_id=123 job_id=bulk-abc123 task=bulk_workflow] Starting image generation

3. Environment-Based Routing

  • Development: Console output with full context
  • Production + CloudWatch: Dual routing (CloudWatch + console warnings)

4. Job-Specific Log Streams

When job_id is set, logs are automatically routed to dedicated CloudWatch streams:

CloudWatch Log Group: metamock-app
├── celery-worker/container123/2024-12-04    # Default logs
├── backend/container456/2024-12-04          # Default logs  
└── jobs/bulk-abc123                         # Job-specific logs
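The stream-selection rule above can be sketched as a small helper. The function name and signature are illustrative only, not the actual implementation:

```python
from datetime import date
from typing import Optional

def select_log_stream(service: str, hostname: str,
                      job_id: Optional[str] = None,
                      day: Optional[date] = None) -> str:
    """Return the CloudWatch stream name for the current log record.

    Job-scoped logs go to a dedicated jobs/{job_id} stream; everything
    else goes to the per-worker, per-day default stream.
    """
    if job_id:
        return f"jobs/{job_id}"
    day = day or date.today()
    return f"{service}/{hostname}/{day.isoformat()}"
```

For example, select_log_stream("celery-worker", "container123", job_id="bulk-abc123") yields "jobs/bulk-abc123", while omitting job_id falls back to the default per-worker stream.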

Context Management API

Setting Context

from backend.utils.logger import set_user_context, set_job_context, get_logger

# Set user context (typically in middleware)
set_user_context(user_id=123)

# Set job context (automatically creates CloudWatch stream)
set_job_context(
    job_id="bulk-abc123",
    task_name="bulk_workflow", 
    task_run_id="run-456"
)

# Get logger and log normally
logger = get_logger(__name__)
logger.info("Processing user request")
# Output: [INFO] [user_id=123 job_id=bulk-abc123 task=bulk_workflow run_id=run-456] Processing user request

Clearing Context

from backend.utils.logger import clear_user_context, clear_job_context, clear_all_context

# Clear specific context
clear_user_context()
clear_job_context()

# Clear everything
clear_all_context()

Retrieving Context

from backend.utils.logger import get_user_context, get_job_context, get_all_context

user_ctx = get_user_context()        # {'user_id': 123}
job_ctx = get_job_context()          # {'job_id': 'abc123', 'task_name': 'bulk_workflow', 'task_run_id': 'run-456'}
all_ctx = get_all_context()          # Combined dictionary

Configuration

Environment Variables

Variable                Default        Description
ENVIRONMENT             development    Environment mode (development/production)
USE_CLOUDWATCH          false          Enable CloudWatch logging in production
LOG_LEVEL               INFO           Logging level (DEBUG, INFO, WARNING, ERROR)
CLOUDWATCH_LOG_GROUP    metamock-app   CloudWatch log group name
AWS_ACCESS_KEY_ID       -              AWS access key for CloudWatch
AWS_SECRET_ACCESS_KEY   -              AWS secret key for CloudWatch
AWS_DEFAULT_REGION      ap-south-1     AWS region for CloudWatch
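Reading these variables might look like the following. The variable names and defaults mirror the table; the helper itself is a hypothetical sketch, not the module's real loader:

```python
import os

def load_logging_config() -> dict:
    """Collect logging settings from the environment, using the defaults above."""
    return {
        "environment": os.getenv("ENVIRONMENT", "development"),
        "use_cloudwatch": os.getenv("USE_CLOUDWATCH", "false").lower() == "true",
        "log_level": os.getenv("LOG_LEVEL", "INFO").upper(),
        "log_group": os.getenv("CLOUDWATCH_LOG_GROUP", "metamock-app"),
        "region": os.getenv("AWS_DEFAULT_REGION", "ap-south-1"),
    }
```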

Logging Behavior by Environment

Development Mode

  • Output: Console only (stdout)
  • Format: [LEVEL] [context] message
  • Level: Everything at or above LOG_LEVEL

Production Mode (CloudWatch Disabled)

  • Output: Console only (stdout)
  • Format: [LEVEL] [context] message
  • Level: Everything at or above LOG_LEVEL

Production Mode (CloudWatch Enabled)

  • Output: Dual routing
  • CloudWatch: All log levels, job-specific streams
  • Console: WARNING and ERROR only, for immediate visibility
  • Format: CloudWatch adds timestamps automatically
  • Streams:
      • Default: {service}/{hostname}/{date}
      • Job-specific: jobs/{job_id}
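The console-level rule can be illustrated with the standard logging module. This is a minimal sketch of the behavior described above, not the actual handler-setup code:

```python
import logging
import sys

def make_console_handler(environment: str, use_cloudwatch: bool,
                         stream=sys.stdout) -> logging.Handler:
    """Console shows everything in development, but only WARNING and
    above when CloudWatch carries the full stream in production."""
    handler = logging.StreamHandler(stream)
    if environment == "production" and use_cloudwatch:
        handler.setLevel(logging.WARNING)
    else:
        handler.setLevel(logging.NOTSET)  # defer to the logger's own level
    return handler
```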

CloudWatch Integration

Log Stream Strategy

Default Streams (per worker, per day):

celery-worker/abc123def456/2024-12-04
backend/xyz789uvw012/2024-12-04

Job-Specific Streams (when job_id is set):

jobs/bulk-generation-uuid-123
jobs/image-upscale-uuid-456
jobs/user-export-uuid-789

Benefits of Job-Specific Streams

  1. Isolation: Each job's logs are completely separate
  2. Debugging: Easy to trace entire workflow for specific job
  3. Monitoring: Set CloudWatch alarms on specific job streams
  4. Cleanup: Auto-expire job logs independently from system logs

Dynamic Handler Creation

When set_job_context(job_id="abc123") is called:

  1. Check: Is CloudWatch enabled and in production?
  2. Cache: Does handler already exist for this job_id?
  3. Create: New CloudWatch handler with stream jobs/abc123
  4. Route: All subsequent logs go to job-specific stream

Handler Caching

  • Global Cache: _job_handlers: Dict[str, logging.Handler]
  • Lifecycle: Handlers persist until application restart
  • Memory: Efficient reuse for repeated job operations
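The create-or-reuse logic in steps 1-4 and the cache above can be sketched as follows. The `_job_handlers` name matches the cache described above, while the factory argument is a stand-in for the real CloudWatch handler constructor:

```python
import logging
from typing import Callable, Dict

# Global cache: one handler per job_id, reused across steps
_job_handlers: Dict[str, logging.Handler] = {}

def get_or_create_job_handler(job_id: str,
                              factory: Callable[[str], logging.Handler]) -> logging.Handler:
    """Return the cached handler for job_id, creating it on first use.

    The factory receives the stream name (jobs/{job_id}) and should
    return a configured handler, e.g. a watchtower CloudWatchLogHandler.
    """
    if job_id not in _job_handlers:
        _job_handlers[job_id] = factory(f"jobs/{job_id}")
    return _job_handlers[job_id]
```

Because the cache is keyed by job_id, repeated set_job_context calls for the same job reuse one handler instead of opening a new stream each time.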

Usage Patterns

1. API Request Logging

# In middleware or route handler
from backend.utils.logger import set_user_context, get_logger

def process_request(request):
    # Set context from authenticated user
    set_user_context(user_id=request.user.id)

    logger = get_logger(__name__)
    logger.info("Processing API request")
    # Output: [INFO] [user_id=123] Processing API request

2. Bulk Generation Workflow

# In Celery task
import uuid

from backend.utils.logger import set_user_context, set_job_context, get_logger

@celery_app.task
def start_bulk_generation(preset_id, user_id):
    job_id = str(uuid.uuid4())

    # Set context for entire workflow
    set_user_context(user_id=user_id)
    set_job_context(job_id=job_id, task_name="bulk_workflow")

    logger = get_logger(__name__)
    logger.info(f"Starting bulk generation for preset {preset_id}")
    # Output: [INFO] [user_id=123 job_id=abc123 task=bulk_workflow] Starting bulk generation for preset xyz

    # All subsequent logs in this workflow automatically include context
    process_images()  # Logs will include job context
    upload_to_s3()    # Logs will include job context

3. Step-by-Step Workflow Tracking

from backend.utils.logger import set_job_context, get_logger

def process_workflow_step(job_id, step_name):
    # Update task context for current step
    set_job_context(job_id=job_id, task_name=step_name, task_run_id=generate_task_run_id())

    logger = get_logger(__name__)
    logger.info(f"Starting step: {step_name}")
    # Output: [INFO] [job_id=abc123 task=validate_preset run_id=def456] Starting step: validate_preset

    try:
        # Step implementation
        logger.info("Step completed successfully")
    except Exception as e:
        logger.error(f"Step failed: {e}")

Implementation Details

Context Variables (Thread-Safe)

from contextvars import ContextVar

# Global context variables
user_id_context: ContextVar[Optional[int]] = ContextVar('user_id', default=None)
job_id_context: ContextVar[Optional[str]] = ContextVar('job_id', default=None)
task_name_context: ContextVar[Optional[str]] = ContextVar('task_name', default=None)
task_run_id_context: ContextVar[Optional[str]] = ContextVar('task_run_id', default=None)

Benefits:

  • Async-Safe: Context preserved across await boundaries
  • Thread-Safe: Different threads have separate context
  • Request-Scoped: Context automatically isolated per request/task
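These guarantees can be seen in a short asyncio demo; `user_id_context` here mirrors the variable declared above:

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

user_id_context: ContextVar[Optional[int]] = ContextVar("user_id", default=None)

async def handle(user_id: int) -> Optional[int]:
    # Each task runs in its own copy of the context, so concurrent
    # requests never see each other's user_id.
    user_id_context.set(user_id)
    await asyncio.sleep(0)  # yield to the other task mid-request
    return user_id_context.get()

async def main():
    # Two "requests" interleave, yet each reads back its own value
    return await asyncio.gather(handle(1), handle(2))

results = asyncio.run(main())
print(results)  # each task sees only its own user_id: [1, 2]
```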

Custom Formatter

class ContextFormatter(logging.Formatter):
    def format(self, record):
        # Automatically inject context into every log message
        context_parts = []

        if user_id := user_id_context.get(None):
            context_parts.append(f"user_id={user_id}")
        if job_id := job_id_context.get(None):
            context_parts.append(f"job_id={job_id}")
        # ... more context fields

        record.context_info = f"[{' '.join(context_parts)}] " if context_parts else ""
        return super().format(record)
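Wiring the formatter into a handler might look like this self-contained sketch, which re-declares a minimal version of the formatter and one context variable so it runs on its own:

```python
import logging
from contextvars import ContextVar
from io import StringIO
from typing import Optional

user_id_context: ContextVar[Optional[int]] = ContextVar("user_id", default=None)

class ContextFormatter(logging.Formatter):
    def format(self, record):
        # Inject the current context into the record before formatting
        user_id = user_id_context.get()
        record.context_info = f"[user_id={user_id}] " if user_id else ""
        return super().format(record)

buf = StringIO()  # capture output instead of stdout for the demo
handler = logging.StreamHandler(buf)
handler.setFormatter(ContextFormatter("[%(levelname)s] %(context_info)s%(message)s"))

logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

user_id_context.set(123)
logger.info("Processing user request")
print(buf.getvalue().strip())  # [INFO] [user_id=123] Processing user request
```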

Dynamic Job Routing

class JobRoutingHandler(logging.Handler):
    def __init__(self, default_handler: logging.Handler):
        super().__init__()
        self.default_handler = default_handler  # per-worker stream handler

    def emit(self, record):
        job_id = job_id_context.get(None)

        if job_id and job_id in _job_handlers:
            # Route to job-specific CloudWatch stream
            _job_handlers[job_id].emit(record)
        else:
            # Use default handler (per-worker stream)
            self.default_handler.emit(record)

Monitoring and Debugging

CloudWatch Queries

Find all logs for specific job:

fields @timestamp, @message
| filter @logStream like /jobs\/bulk-abc123/
| sort @timestamp desc

Find errors across all bulk jobs:

fields @timestamp, @message
| filter @logStream like /jobs\/bulk-/ and @message like /ERROR/
| sort @timestamp desc

Monitor job completion rates:

fields @timestamp, @message
| filter @message like /Starting bulk generation/ or @message like /Bulk generation completed/
| stats count() by bin(5m)

Development Debugging

Console Output Example:

[INFO] [user_id=123] User authenticated
[INFO] [user_id=123 job_id=bulk-abc123 task=bulk_workflow] Starting bulk generation
[DEBUG] [user_id=123 job_id=bulk-abc123 task=validate_preset run_id=def456] Validating preset configuration
[INFO] [user_id=123 job_id=bulk-abc123 task=generate_prompts run_id=ghi789] Generated 5 unique prompts
[ERROR] [user_id=123 job_id=bulk-abc123 task=generate_images run_id=jkl012] Image generation failed: API timeout

Log Stream Management

Stream Naming Convention:

  • Default: {service}/{hostname}/{date}
      • celery-worker/abc123def456/2024-12-04
      • backend/xyz789uvw012/2024-12-04
  • Job-Specific: jobs/{job_id}
      • jobs/bulk-generation-uuid-123
      • jobs/image-upscale-uuid-456

Automatic Cleanup:

  • CloudWatch retention policies apply to all streams in a log group
  • Job streams can have a different retention period than default streams by living in a separate log group (CloudWatch retention is configured per log group, not per stream)
  • No manual cleanup required

Error Handling and Fallbacks

CloudWatch Failures

If CloudWatch setup fails:

  1. Graceful Degradation: Falls back to console logging
  2. Error Logging: A warning is logged about the CloudWatch failure
  3. No Service Interruption: The application continues normally

Missing Dependencies

If watchtower is not installed:

  1. Import Fallback: Catches the ImportError
  2. Console Logging: Uses the standard console handler
  3. Development Mode: No impact on the development workflow

AWS Credential Issues

If AWS credentials are invalid:

  1. Client Creation Fails: _get_cloudwatch_client() returns None
  2. Handler Creation Skipped: No CloudWatch handlers are created
  3. Console Fallback: All logs go to the console
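The fallback chain in the three scenarios above follows a common pattern, sketched below. `make_client` is a stand-in for the real boto3/watchtower setup, and the NullHandler is a placeholder for the actual CloudWatch handler:

```python
import logging
import sys
from typing import Callable

def build_log_handler(make_client: Callable[[], object]) -> logging.Handler:
    """Try to set up CloudWatch; on any failure, fall back to console."""
    try:
        client = make_client()          # may raise ImportError, auth errors, ...
        if client is None:              # invalid credentials: factory returns None
            raise RuntimeError("no CloudWatch client")
        # Real code would build a watchtower handler around `client` here.
        return logging.NullHandler()    # placeholder for the CloudWatch handler
    except Exception as exc:
        # Graceful degradation: warn once, keep the application running
        logging.getLogger(__name__).warning(
            "CloudWatch unavailable (%s); falling back to console", exc)
        return logging.StreamHandler(sys.stdout)
```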

Best Practices

1. Context Lifecycle Management

# ✅ Good: Set context early in request/task
def handle_request():
    set_user_context(user_id=current_user.id)
    # ... rest of request processing

# ✅ Good: Clear context when switching users/jobs
def switch_context():
    clear_all_context()
    set_user_context(user_id=new_user.id)

# ❌ Avoid: Forgetting to clear context between operations

2. Logger Instance Management

# ✅ Good: Get logger once per module
logger = get_logger(__name__)

class MyService:
    def method1(self):
        logger.info("Method 1 called")  # Reuse module logger

    def method2(self):
        logger.info("Method 2 called")  # Reuse module logger

# ❌ Avoid: Creating new loggers repeatedly
def bad_logging():
    logger = get_logger(__name__)  # Don't do this in every function
    logger.info("Message")

3. Context Information

# ✅ Good: Meaningful context
set_job_context(
    job_id="bulk-gen-uuid-123",
    task_name="bulk_generation", 
    task_run_id="step-5-uuid"
)

# ✅ Good: Descriptive task names
set_job_context(job_id=job_id, task_name="validate_preset")
set_job_context(job_id=job_id, task_name="generate_images")

# ❌ Avoid: Generic or unclear context
set_job_context(job_id="123", task_name="task")

4. Log Levels

# ✅ Good: Appropriate log levels
logger.debug("Detailed processing information")  # Development debugging
logger.info("User started bulk generation")      # Important events
logger.warning("Retrying failed image generation") # Recoverable issues
logger.error("Failed to upload to S3")           # Serious errors

# ❌ Avoid: Inappropriate levels
logger.error("User logged in")  # Not an error
logger.info("Variable x = 5")   # Too verbose for info

Troubleshooting

Issue: Logs not appearing in CloudWatch

Diagnosis:

  1. Check environment variables: ENVIRONMENT=production, USE_CLOUDWATCH=true
  2. Verify AWS credentials are valid
  3. Confirm the watchtower package is installed
  4. Check that the CloudWatch log group exists

Debug Commands:

from backend.utils.logger import _get_cloudwatch_client
client, log_group = _get_cloudwatch_client()
print(f"Client: {client}, Group: {log_group}")

Issue: Job-specific streams not created

Diagnosis:

  1. Verify set_job_context(job_id="...") is called
  2. Check whether the app is running in production with CloudWatch enabled
  3. Look for handler creation logs in stderr

Debug Commands:

from backend.utils.logger import _job_handlers
print(f"Active handlers: {list(_job_handlers.keys())}")

Issue: Context not appearing in logs

Diagnosis:

  1. Verify context is set before logging
  2. Check whether the context variables are being isolated per thread or async task
  3. Confirm the formatter is applied to the handlers

Debug Commands:

from backend.utils.logger import get_all_context
print(f"Current context: {get_all_context()}")

Issue: Too many log streams

Cause: Creating new handlers for every job operation

Solution: Handler caching prevents this, but verify job_id consistency

Prevention:

# ✅ Good: Consistent job_id
job_id = str(uuid.uuid4())
set_job_context(job_id=job_id, task_name="step1")
set_job_context(job_id=job_id, task_name="step2")  # Same handler reused

# ❌ Avoid: New job_id for each step
set_job_context(job_id=str(uuid.uuid4()), task_name="step1")  # New handler
set_job_context(job_id=str(uuid.uuid4()), task_name="step2")  # New handler

Future Enhancements

Planned Features

  1. Log Aggregation: Cross-service log correlation
  2. Metrics Integration: Automatic CloudWatch metrics from logs
  3. Alerting: Smart alerts based on error patterns
  4. Performance Monitoring: Request/job duration tracking
  5. Log Sampling: Reduce log volume in high-traffic scenarios

Integration Opportunities

  1. APM Tools: DataDog, New Relic integration
  2. Error Tracking: Sentry integration with context
  3. Distributed Tracing: OpenTelemetry support
  4. Security Monitoring: Audit trail enhancement

This logging system provides the foundation for comprehensive observability across the MetaMock platform, with particular strength in tracking complex multi-step workflows like bulk generation.