diff --git a/dstream.hcl b/dstream.hcl index 70ed245..6f69011 100644 --- a/dstream.hcl +++ b/dstream.hcl @@ -33,8 +33,8 @@ ingester { tables_overrides { overrides { table_name = "Persons" - poll_interval = "1s" - max_poll_interval = "5s" + poll_interval = "10s" + max_poll_interval = "300s" } } } diff --git a/internal/cdc/locking/blob_locker.go b/internal/cdc/locking/blob_locker.go index 627f04d..bec486e 100644 --- a/internal/cdc/locking/blob_locker.go +++ b/internal/cdc/locking/blob_locker.go @@ -3,13 +3,14 @@ package locking import ( "context" "fmt" - "log" "strings" "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease" + + "github.com/katasec/dstream/internal/logging" ) type BlobLocker struct { @@ -68,7 +69,8 @@ func NewBlobLocker(connectionString, containerName, lockName string) (*BlobLocke // AcquireLock tries to acquire a lock on the blob and stores the lease ID func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, error) { - log.Printf("Attempting to acquire lock for blob %s", bl.lockName) + logger := logging.GetLogger() + logger.Printf("Attempting to acquire lock for blob %s", bl.lockName) // Try to acquire lease resp, err := bl.blobLeaseClient.AcquireLease(bl.ctx, int32(bl.lockTTL.Seconds()), nil) @@ -85,10 +87,10 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, // Check if the lock is older than 2 minutes lastModified := props.LastModified lockAge := time.Since(*lastModified) - log.Printf("Lock on %s was last modified at: %v (%.2f minutes ago)", bl.lockName, lastModified.Format(time.RFC3339), lockAge.Minutes()) + logger.Printf("Lock on %s was last modified at: %v (%.2f minutes ago)", bl.lockName, lastModified.Format(time.RFC3339), lockAge.Minutes()) if lockAge > 2*time.Minute { - log.Printf("Lock on %s is older than 2 minutes (last 
modified: %v). Breaking lease...", bl.lockName, *lastModified) + logger.Printf("Lock on %s is older than 2 minutes (last modified: %v). Breaking lease...", bl.lockName, lastModified.Format(time.RFC3339)) // Break the lease _, err = bl.blobLeaseClient.BreakLease(ctx, nil) @@ -104,44 +106,47 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, if err != nil { return "", fmt.Errorf("failed to acquire lease after breaking for %s: %w", bl.lockName, err) } - log.Printf("Successfully acquired lock after breaking old lease for %s", bl.lockName) + logger.Printf("Successfully acquired lock after breaking old lease for %s", bl.lockName) return *resp.LeaseID, nil } - log.Printf("Table %s is already locked and the lock is still valid (%.2f minutes old, within 2 minute TTL). Skipping...", bl.lockName, lockAge.Minutes()) + logger.Printf("Table %s is already locked and the lock is still valid (%.2f minutes old, within 2 minute TTL). Skipping...", bl.lockName, lockAge.Minutes()) return "", nil } return "", fmt.Errorf("failed to acquire lock for blob %s: %w", bl.lockName, err) } - log.Printf("Lock acquired for blob %s with Lease ID: %s", bl.lockName, *resp.LeaseID) + logger.Printf("Lock acquired for blob %s with Lease ID: %s", bl.lockName, *resp.LeaseID) return *resp.LeaseID, nil } func (bl *BlobLocker) RenewLock(ctx context.Context, lockName string) error { + logger := logging.GetLogger() _, err := bl.blobLeaseClient.RenewLease(bl.ctx, nil) if err != nil { return fmt.Errorf("failed to renew lock for blob %s: %w", lockName, err) } - log.Printf("Lock renewed for blob %s", lockName) + logger.Printf("Lock renewed for blob %s", lockName) return nil } // ReleaseLock releases the lock associated with the provided lease ID for the specified blob (lockName) func (bl *BlobLocker) ReleaseLock(tx context.Context, lockName string, leaseID string) error { + logger := logging.GetLogger() _, err := bl.blobLeaseClient.ReleaseLease(bl.ctx, &lease.BlobReleaseOptions{}) 
if err != nil { return fmt.Errorf("failed to release lock for blob %s: %w", bl.lockName, err) } else { - log.Printf("Lock released successfully for blob %s !\n", bl.lockName) + logger.Printf("Lock released successfully for blob %s", bl.lockName) } return nil } func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) { - log.Printf("Starting lock renewal for blob %s", lockName) + logger := logging.GetLogger() + logger.Printf("Starting lock renewal for blob %s", lockName) go func() { ticker := time.NewTicker(bl.lockTTL / 2) defer ticker.Stop() @@ -150,10 +155,10 @@ func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) { select { case <-ticker.C: if err := bl.RenewLock(bl.ctx, bl.lockName); err != nil { - log.Printf("Failed to renew lock for blob %s: %v", lockName, err) + logger.Printf("Failed to renew lock for blob %s: %v", lockName, err) } case <-ctx.Done(): - log.Printf("Stopping lock renewal for blob %s", lockName) + logger.Printf("Stopping lock renewal for blob %s", lockName) return } } @@ -183,7 +188,8 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) { if strings.Contains(err.Error(), "BlobNotFound") { continue } - log.Printf("Failed to get properties for blob %s: %v\n", lockName, err) + logger := logging.GetLogger() + logger.Printf("Failed to get properties for blob %s: %v", lockName, err) continue } @@ -194,17 +200,18 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) { lockAge := time.Since(*lastModified) if *leaseStatus == "locked" && *leaseState == "leased" { - log.Printf("Table %s is locked (last modified: %v, %.2f minutes ago)", + logger := logging.GetLogger() + logger.Printf("Table %s is locked (last modified: %v, %.2f minutes ago)", tableName, lastModified.Format(time.RFC3339), lockAge.Minutes()) // Only consider the table locked if the lock is less than 2 minutes old if lockAge <= 2*time.Minute { - log.Printf(" - Lock is still valid (within 2 minute 
TTL)") + logger.Printf(" - Lock is still valid (within 2 minute TTL)") lockedTables = append(lockedTables, lockName) } else { - log.Printf(" - Lock is stale (older than 2 minute TTL), will be broken when acquired") + logger.Printf(" - Lock is stale (older than 2 minute TTL), will be broken when acquired") } } } diff --git a/internal/ingester/ingester.go b/internal/ingester/ingester.go index 8a807f8..9e5052f 100644 --- a/internal/ingester/ingester.go +++ b/internal/ingester/ingester.go @@ -7,8 +7,10 @@ import ( "os/signal" "sync" "syscall" + "time" "github.com/katasec/dstream/internal/logging" + "github.com/katasec/dstream/internal/monitoring" "github.com/katasec/dstream/internal/cdc/locking" "github.com/katasec/dstream/internal/cdc/orchestrator" @@ -33,7 +35,8 @@ type Ingester struct { dbConn *sql.DB lockerFactory *locking.LockerFactory // cancelFunc context.CancelFunc - wg *sync.WaitGroup + wg *sync.WaitGroup + monitor *monitoring.Monitor } // NewIngester initializes the ingester, loads the configuration, and creates the locker factory. @@ -58,7 +61,7 @@ func NewIngester() *Ingester { } // Initialize LeaseDBManager - //leaseDB := locking.NewLeaseDBManager(dbConn) + monitor := monitoring.NewMonitor(time.Second * 60) // Initialize LockerFactory with config and LeaseDBManager configType := config.Ingester.Locks.Type @@ -72,6 +75,7 @@ func NewIngester() *Ingester { dbConn: dbConn, lockerFactory: lockerFactory, wg: &sync.WaitGroup{}, + monitor: monitor, } } @@ -84,14 +88,16 @@ func NewIngester() *Ingester { // 5. 
Sets up signal handling for graceful shutdown // // The goroutine used to start the orchestrator is critical to the design as it: -// - Allows the main thread to proceed to signal handling -// - Enables non-blocking orchestration of the monitoring process -// - Maintains the ability to propagate cancellation signals to all monitoring activities +// - Allows the main thread to proceed to signal handling +// - Enables non-blocking orchestration of the monitoring process +// - Maintains the ability to propagate cancellation signals to all monitoring activities // // The method returns only after a shutdown signal is received and processed. func (i *Ingester) Start() error { defer i.dbConn.Close() + go i.monitor.Start() + // Create a new context with cancellation ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -141,9 +147,9 @@ func (i *Ingester) Start() error { // 4. Returns only the tables that are available for monitoring // // This function plays an important role in the distributed architecture by: -// - Ensuring tables are not monitored by multiple instances simultaneously -// - Allowing the system to scale horizontally across multiple instances -// - Providing automatic work distribution among available instances +// - Ensuring tables are not monitored by multiple instances simultaneously +// - Allowing the system to scale horizontally across multiple instances +// - Providing automatic work distribution among available instances func (i *Ingester) getTablesToMonitor() []config.ResolvedTableConfig { // Get list of table names from config @@ -196,10 +202,10 @@ func (i *Ingester) getTablesToMonitor() []config.ResolvedTableConfig { // 4. 
Ensures all distributed locks are properly released // // This function is a critical part of the Ingester's architecture as it: -// - Provides a clean shutdown mechanism for the entire system -// - Ensures resources are properly released (particularly distributed locks) -// - Prevents resource leaks and allows other instances to take over monitoring -// - Completes the lifecycle management responsibility of the Ingester +// - Provides a clean shutdown mechanism for the entire system +// - Ensures resources are properly released (particularly distributed locks) +// - Prevents resource leaks and allows other instances to take over monitoring +// - Completes the lifecycle management responsibility of the Ingester func (i *Ingester) handleShutdown(cancel context.CancelFunc, tableService *orchestrator.TableMonitoringOrchestrator) { // Capture SIGINT and SIGTERM signals signalChan := make(chan os.Signal, 1) diff --git a/internal/logging/logger.go b/internal/logging/logger.go index fe595a3..199f313 100644 --- a/internal/logging/logger.go +++ b/internal/logging/logger.go @@ -37,6 +37,11 @@ var ( // Logger interface defines the logging methods type Logger interface { + // Standard log package compatible methods + Printf(format string, v ...any) + Println(v ...any) + + // Structured logging methods Debug(msg string, args ...any) Info(msg string, args ...any) Warn(msg string, args ...any) @@ -121,6 +126,23 @@ func formatMessage(msg string, args ...any) string { return msg } +// Printf provides compatibility with standard log.Printf +func (l *stdLogger) Printf(format string, v ...any) { + if l.logLevel <= LevelInfo { + l.info.Printf("%s[INFO]%s %s", colorGreen, colorReset, fmt.Sprintf(format, v...)) + } +} + +// Println provides compatibility with standard log.Println +func (l *stdLogger) Println(v ...any) { + if l.logLevel <= LevelInfo { + message := fmt.Sprintln(v...) 
+	// Remove trailing newline that fmt.Sprintln adds + message = strings.TrimSuffix(message, "\n") + l.info.Printf("%s[INFO]%s %s", colorGreen, colorReset, message) + } +} + // Debug logs a debug message func (l *stdLogger) Debug(msg string, args ...any) { if l.logLevel <= LevelDebug { diff --git a/internal/monitoring/monitoring.go b/internal/monitoring/monitoring.go new file mode 100644 index 0000000..ee0f282 --- /dev/null +++ b/internal/monitoring/monitoring.go @@ -0,0 +1,36 @@ +package monitoring + +import ( + "runtime" + "time" + + "github.com/katasec/dstream/internal/logging" +) + +var log = logging.GetLogger() + +// Monitor holds the configuration for memory logging +type Monitor struct { + interval time.Duration +} + +// NewMonitor creates a new Monitor with the given logging interval; logging begins only when Start is called +func NewMonitor(interval time.Duration) *Monitor { + m := &Monitor{ + interval: interval, + } + + return m +} + +// Start logs memory usage and the number of goroutines periodically +func (m *Monitor) Start() { + for { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + log.Info("Number of Goroutines:", "Num", runtime.NumGoroutine()) + log.Info("Total Memory Allocated: ", "Alloc in MB", memStats.Alloc/1024/1024) + log.Info("Total Memory System: ", "Sys in MB", memStats.Sys/1024/1024) + time.Sleep(m.interval) + } +} diff --git a/todos/todos.md b/todos/todos.md new file mode 100644 index 0000000..96a6d54 --- /dev/null +++ b/todos/todos.md @@ -0,0 +1,141 @@ +## Transient vs. Persistent Errors +Current State: The system doesn't clearly distinguish between transient errors (like network timeouts) and persistent errors (like schema mismatches). 
+ +### Design Ideas: + +Implement an error classification system that categorizes errors as transient or persistent +For transient errors, implement exponential backoff retry logic +For persistent errors, fail fast and provide clear error messages for troubleshooting + + +## Connection Management +Current State: Database connection failures might not be properly handled in all scenarios. + +### Design Ideas: + +Implement connection pooling with health checks +Add circuit breaker pattern to prevent cascading failures when the database is under stress +Implement automatic reconnection logic with configurable retry policies + +## Error Observability +Current State: Error logging is improved but could benefit from more structured error reporting. + +### Design Ideas: + +Enhance logging with correlation IDs to track errors across components +Implement metrics collection for error rates and types +Create a dashboard for monitoring error patterns and trends +Add alerting for critical error conditions + +## Graceful Degradation +Current State: The system might completely fail if certain components are unavailable. + +### Design Ideas: + +Implement feature flags to disable problematic components without taking down the entire system +Design fallback mechanisms for critical paths +Add configurable timeout settings for all external dependencies + +## Error Recovery Workflows +Current State: Recovery from serious errors often requires manual intervention. + +### Design Ideas: + +Design self-healing mechanisms for common failure scenarios +Implement a recovery workflow system that can be triggered automatically or manually +Create tools for operators to safely replay failed batches + +## Validation and Constraints +Current State: Input validation might not catch all edge cases before they cause errors. 
+ +### Design Ideas: + +Implement schema validation at multiple levels (not just database constraints) +Add pre-condition checks before critical operations +Design better error messages that suggest remediation steps + +## Resource Management +Current State: Resource leaks might occur during error conditions. + +### Design Ideas: + +Ensure proper resource cleanup in all error paths using defer statements +Implement resource usage monitoring and alerting +Add circuit breakers for resource-intensive operations +## Implementation Priority +If you decide to implement these improvements, I would suggest prioritizing: + +Transient vs. Persistent error handling (biggest impact on reliability) +Error observability (helps diagnose issues faster) +Batch processing resilience (prevents data inconsistency) +Would you like to discuss any of these areas in more detail? + + +## CDC Monitoring System: Error Handling Implementation Plan + +### Phase 1: Transient vs. Persistent Error Handling +- Create an ErrorType enum (Transient, Persistent, Unknown) +- Implement a custom error wrapper that includes error type classification +- Add retry logic with exponential backoff for transient errors +- Create helper functions to classify common errors (DB timeouts, connection issues, etc.) 
+- Update error returns in BatchSizer to use the new error types +- Add maximum retry configuration settings + +### Phase 2: Error Observability +- Enhance logger to include correlation IDs for tracking request flows +- Add structured error fields (component, operation, error_type) +- Implement error metrics collection (error_count, error_type, component) +- Create Prometheus/Grafana dashboards for error monitoring +- Set up alerting for critical error thresholds +- Add context propagation to ensure errors maintain their trace context + +### Phase 3: Batch Processing Resilience +- Create a batch state tracking table in the database +- Implement batch checkpointing to record progress +- Add transaction support for atomic batch operations +- Create a recovery mechanism for failed batches +- Implement idempotent processing to prevent duplicate processing +- Add batch validation before processing starts + +### Phase 4: Connection Management +- Implement connection pooling with health checks +- Add circuit breaker pattern for database operations +- Create connection retry policies with configurable parameters +- Add connection timeout configurations +- Implement graceful connection closing during shutdown + +### Phase 5: Graceful Degradation +- Add feature flags for component-level disabling +- Implement fallback mechanisms for critical paths +- Create configurable timeout settings for all external dependencies +- Add partial success handling for batch operations +- Implement priority-based processing during degraded operations + +### Phase 6: Error Recovery Workflows +- Design self-healing mechanisms for common failures +- Create an admin API for manual recovery operations +- Implement a workflow system for complex recovery scenarios +- Add tools for operators to safely replay failed batches +- Create documentation for common recovery procedures + +### Phase 7: Validation and Constraints +- Implement schema validation at application level +- Add pre-condition checks 
before critical operations +- Enhance error messages with remediation suggestions +- Create validation helpers for common data types +- Add boundary checks for all configurable values + +### Phase 8: Resource Management +- Audit code for proper resource cleanup in error paths +- Implement resource usage monitoring +- Add circuit breakers for resource-intensive operations +- Create resource pools for expensive resources +- Add graceful shutdown procedures to release resources + +### Immediate Next Steps (Highest Priority) +- Create the error type classification system +- Implement retry logic for transient errors +- Enhance logging with correlation IDs and structured error fields +- Add batch state tracking and checkpointing +- Implement connection pooling with health checks +