From 535a7a55c48d2ce4a0ffff0603c9d5cbbbc81833 Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Sun, 2 Mar 2025 17:06:03 +0400 Subject: [PATCH 1/7] updating some logging statements --- internal/cdc/locking/blob_locker.go | 45 +++++++++++++++++------------ 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/internal/cdc/locking/blob_locker.go b/internal/cdc/locking/blob_locker.go index 627f04d..60d54aa 100644 --- a/internal/cdc/locking/blob_locker.go +++ b/internal/cdc/locking/blob_locker.go @@ -3,13 +3,14 @@ package locking import ( "context" "fmt" - "log" "strings" "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease" + + "github.com/katasec/dstream/internal/logging" ) type BlobLocker struct { @@ -68,7 +69,8 @@ func NewBlobLocker(connectionString, containerName, lockName string) (*BlobLocke // AcquireLock tries to acquire a lock on the blob and stores the lease ID func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, error) { - log.Printf("Attempting to acquire lock for blob %s", bl.lockName) + logger := logging.GetLogger() + logger.Info("Attempting to acquire lock for blob", "lockName", bl.lockName) // Try to acquire lease resp, err := bl.blobLeaseClient.AcquireLease(bl.ctx, int32(bl.lockTTL.Seconds()), nil) @@ -85,10 +87,10 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, // Check if the lock is older than 2 minutes lastModified := props.LastModified lockAge := time.Since(*lastModified) - log.Printf("Lock on %s was last modified at: %v (%.2f minutes ago)", bl.lockName, lastModified.Format(time.RFC3339), lockAge.Minutes()) + logger.Info("Lock was last modified", "lockName", bl.lockName, "lastModified", lastModified.Format(time.RFC3339), "ageMinutes", lockAge.Minutes()) if lockAge > 2*time.Minute { - log.Printf("Lock on %s is older than 2 minutes (last modified: 
%v). Breaking lease...", bl.lockName, *lastModified) + logger.Info("Lock is older than 2 minutes, breaking lease", "lockName", bl.lockName, "lastModified", lastModified.Format(time.RFC3339)) // Break the lease _, err = bl.blobLeaseClient.BreakLease(ctx, nil) @@ -104,44 +106,47 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, if err != nil { return "", fmt.Errorf("failed to acquire lease after breaking for %s: %w", bl.lockName, err) } - log.Printf("Successfully acquired lock after breaking old lease for %s", bl.lockName) + logger.Info("Successfully acquired lock after breaking old lease", "lockName", bl.lockName) return *resp.LeaseID, nil } - log.Printf("Table %s is already locked and the lock is still valid (%.2f minutes old, within 2 minute TTL). Skipping...", bl.lockName, lockAge.Minutes()) + logger.Info("Table is already locked and the lock is still valid", "lockName", bl.lockName, "ageMinutes", lockAge.Minutes(), "ttlMinutes", 2) return "", nil } return "", fmt.Errorf("failed to acquire lock for blob %s: %w", bl.lockName, err) } - log.Printf("Lock acquired for blob %s with Lease ID: %s", bl.lockName, *resp.LeaseID) + logger.Info("Lock acquired for blob", "lockName", bl.lockName, "leaseID", *resp.LeaseID) return *resp.LeaseID, nil } func (bl *BlobLocker) RenewLock(ctx context.Context, lockName string) error { + logger := logging.GetLogger() _, err := bl.blobLeaseClient.RenewLease(bl.ctx, nil) if err != nil { return fmt.Errorf("failed to renew lock for blob %s: %w", lockName, err) } - log.Printf("Lock renewed for blob %s", lockName) + logger.Info("Lock renewed for blob", "lockName", lockName) return nil } // ReleaseLock releases the lock associated with the provided lease ID for the specified blob (lockName) func (bl *BlobLocker) ReleaseLock(tx context.Context, lockName string, leaseID string) error { + logger := logging.GetLogger() _, err := bl.blobLeaseClient.ReleaseLease(bl.ctx, &lease.BlobReleaseOptions{}) if err != nil { 
return fmt.Errorf("failed to release lock for blob %s: %w", bl.lockName, err) } else { - log.Printf("Lock released successfully for blob %s !\n", bl.lockName) + logger.Info("Lock released successfully for blob", "lockName", bl.lockName) } return nil } func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) { - log.Printf("Starting lock renewal for blob %s", lockName) + logger := logging.GetLogger() + logger.Info("Starting lock renewal for blob", "lockName", lockName) go func() { ticker := time.NewTicker(bl.lockTTL / 2) defer ticker.Stop() @@ -150,10 +155,10 @@ func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) { select { case <-ticker.C: if err := bl.RenewLock(bl.ctx, bl.lockName); err != nil { - log.Printf("Failed to renew lock for blob %s: %v", lockName, err) + logger.Error("Failed to renew lock for blob", "lockName", lockName, "error", err) } case <-ctx.Done(): - log.Printf("Stopping lock renewal for blob %s", lockName) + logger.Info("Stopping lock renewal for blob", "lockName", lockName) return } } @@ -183,7 +188,8 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) { if strings.Contains(err.Error(), "BlobNotFound") { continue } - log.Printf("Failed to get properties for blob %s: %v\n", lockName, err) + logger := logging.GetLogger() + logger.Error("Failed to get properties for blob", "lockName", lockName, "error", err) continue } @@ -194,17 +200,18 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) { lockAge := time.Since(*lastModified) if *leaseStatus == "locked" && *leaseState == "leased" { - log.Printf("Table %s is locked (last modified: %v, %.2f minutes ago)", - tableName, - lastModified.Format(time.RFC3339), - lockAge.Minutes()) + logger := logging.GetLogger() + logger.Info("Table is locked", + "tableName", tableName, + "lastModified", lastModified.Format(time.RFC3339), + "ageMinutes", lockAge.Minutes()) // Only consider the table locked if the lock is 
less than 2 minutes old if lockAge <= 2*time.Minute { - log.Printf(" - Lock is still valid (within 2 minute TTL)") + logger.Info("Lock is still valid", "ttlMinutes", 2) lockedTables = append(lockedTables, lockName) } else { - log.Printf(" - Lock is stale (older than 2 minute TTL), will be broken when acquired") + logger.Info("Lock is stale, will be broken when acquired", "ttlMinutes", 2) } } } From 730aa71ba9ca7c248af0f3feae6d33e20365f04d Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Sun, 2 Mar 2025 17:11:51 +0400 Subject: [PATCH 2/7] Changed logger signature --- internal/cdc/locking/blob_locker.go | 36 ++++++++++++++--------------- internal/logging/logger.go | 22 ++++++++++++++++++ 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/internal/cdc/locking/blob_locker.go b/internal/cdc/locking/blob_locker.go index 60d54aa..bec486e 100644 --- a/internal/cdc/locking/blob_locker.go +++ b/internal/cdc/locking/blob_locker.go @@ -70,7 +70,7 @@ func NewBlobLocker(connectionString, containerName, lockName string) (*BlobLocke // AcquireLock tries to acquire a lock on the blob and stores the lease ID func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, error) { logger := logging.GetLogger() - logger.Info("Attempting to acquire lock for blob", "lockName", bl.lockName) + logger.Printf("Attempting to acquire lock for blob %s", bl.lockName) // Try to acquire lease resp, err := bl.blobLeaseClient.AcquireLease(bl.ctx, int32(bl.lockTTL.Seconds()), nil) @@ -87,10 +87,10 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, // Check if the lock is older than 2 minutes lastModified := props.LastModified lockAge := time.Since(*lastModified) - logger.Info("Lock was last modified", "lockName", bl.lockName, "lastModified", lastModified.Format(time.RFC3339), "ageMinutes", lockAge.Minutes()) + logger.Printf("Lock on %s was last modified at: %v (%.2f minutes ago)", bl.lockName, lastModified.Format(time.RFC3339), 
lockAge.Minutes()) if lockAge > 2*time.Minute { - logger.Info("Lock is older than 2 minutes, breaking lease", "lockName", bl.lockName, "lastModified", lastModified.Format(time.RFC3339)) + logger.Printf("Lock on %s is older than 2 minutes (last modified: %v). Breaking lease...", bl.lockName, lastModified.Format(time.RFC3339)) // Break the lease _, err = bl.blobLeaseClient.BreakLease(ctx, nil) @@ -106,17 +106,17 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, if err != nil { return "", fmt.Errorf("failed to acquire lease after breaking for %s: %w", bl.lockName, err) } - logger.Info("Successfully acquired lock after breaking old lease", "lockName", bl.lockName) + logger.Printf("Successfully acquired lock after breaking old lease for %s", bl.lockName) return *resp.LeaseID, nil } - logger.Info("Table is already locked and the lock is still valid", "lockName", bl.lockName, "ageMinutes", lockAge.Minutes(), "ttlMinutes", 2) + logger.Printf("Table %s is already locked and the lock is still valid (%.2f minutes old, within 2 minute TTL). 
Skipping...", bl.lockName, lockAge.Minutes()) return "", nil } return "", fmt.Errorf("failed to acquire lock for blob %s: %w", bl.lockName, err) } - logger.Info("Lock acquired for blob", "lockName", bl.lockName, "leaseID", *resp.LeaseID) + logger.Printf("Lock acquired for blob %s with Lease ID: %s", bl.lockName, *resp.LeaseID) return *resp.LeaseID, nil } @@ -127,7 +127,7 @@ func (bl *BlobLocker) RenewLock(ctx context.Context, lockName string) error { return fmt.Errorf("failed to renew lock for blob %s: %w", lockName, err) } - logger.Info("Lock renewed for blob", "lockName", lockName) + logger.Printf("Lock renewed for blob %s", lockName) return nil } @@ -139,14 +139,14 @@ func (bl *BlobLocker) ReleaseLock(tx context.Context, lockName string, leaseID s if err != nil { return fmt.Errorf("failed to release lock for blob %s: %w", bl.lockName, err) } else { - logger.Info("Lock released successfully for blob", "lockName", bl.lockName) + logger.Printf("Lock released successfully for blob %s", bl.lockName) } return nil } func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) { logger := logging.GetLogger() - logger.Info("Starting lock renewal for blob", "lockName", lockName) + logger.Printf("Starting lock renewal for blob %s", lockName) go func() { ticker := time.NewTicker(bl.lockTTL / 2) defer ticker.Stop() @@ -155,10 +155,10 @@ func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) { select { case <-ticker.C: if err := bl.RenewLock(bl.ctx, bl.lockName); err != nil { - logger.Error("Failed to renew lock for blob", "lockName", lockName, "error", err) + logger.Printf("Failed to renew lock for blob %s: %v", lockName, err) } case <-ctx.Done(): - logger.Info("Stopping lock renewal for blob", "lockName", lockName) + logger.Printf("Stopping lock renewal for blob %s", lockName) return } } @@ -189,7 +189,7 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) { continue } logger := logging.GetLogger() - 
logger.Error("Failed to get properties for blob", "lockName", lockName, "error", err) + logger.Printf("Failed to get properties for blob %s: %v", lockName, err) continue } @@ -201,17 +201,17 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) { if *leaseStatus == "locked" && *leaseState == "leased" { logger := logging.GetLogger() - logger.Info("Table is locked", - "tableName", tableName, - "lastModified", lastModified.Format(time.RFC3339), - "ageMinutes", lockAge.Minutes()) + logger.Printf("Table %s is locked (last modified: %v, %.2f minutes ago)", + tableName, + lastModified.Format(time.RFC3339), + lockAge.Minutes()) // Only consider the table locked if the lock is less than 2 minutes old if lockAge <= 2*time.Minute { - logger.Info("Lock is still valid", "ttlMinutes", 2) + logger.Printf(" - Lock is still valid (within 2 minute TTL)") lockedTables = append(lockedTables, lockName) } else { - logger.Info("Lock is stale, will be broken when acquired", "ttlMinutes", 2) + logger.Printf(" - Lock is stale (older than 2 minute TTL), will be broken when acquired") } } } diff --git a/internal/logging/logger.go b/internal/logging/logger.go index fe595a3..199f313 100644 --- a/internal/logging/logger.go +++ b/internal/logging/logger.go @@ -37,6 +37,11 @@ var ( // Logger interface defines the logging methods type Logger interface { + // Standard log package compatible methods + Printf(format string, v ...any) + Println(v ...any) + + // Structured logging methods Debug(msg string, args ...any) Info(msg string, args ...any) Warn(msg string, args ...any) @@ -121,6 +126,23 @@ func formatMessage(msg string, args ...any) string { return msg } +// Printf provides compatibility with standard log.Printf +func (l *stdLogger) Printf(format string, v ...any) { + if l.logLevel <= LevelInfo { + l.info.Printf("%s[INFO]%s %s", colorGreen, colorReset, fmt.Sprintf(format, v...)) + } +} + +// Println provides compatibility with standard log.Println +func (l *stdLogger) 
Println(v ...any) { + if l.logLevel <= LevelInfo { + message := fmt.Sprintln(v...) + // Remove trailing newline that fmt.Sprintln adds + message = strings.TrimSuffix(message, "\n") + l.info.Printf("%s[INFO]%s %s", colorGreen, colorReset, message) + } +} + // Debug logs a debug message func (l *stdLogger) Debug(msg string, args ...any) { if l.logLevel <= LevelDebug { From 0f65d1ead5f0010a52a05cd2ac8f32796f7c384a Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Sun, 2 Mar 2025 22:22:46 +0400 Subject: [PATCH 3/7] Adding some todos for future --- todos/todos.md | 149 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 todos/todos.md diff --git a/todos/todos.md b/todos/todos.md new file mode 100644 index 0000000..e4b72d3 --- /dev/null +++ b/todos/todos.md @@ -0,0 +1,149 @@ +## Transient vs. Persistent Errors +Current State: The system doesn't clearly distinguish between transient errors (like network timeouts) and persistent errors (like schema mismatches). + +### Design Ideas: + +Implement an error classification system that categorizes errors as transient or persistent +For transient errors, implement exponential backoff retry logic +For persistent errors, fail fast and provide clear error messages for troubleshooting + +## Batch Processing Resilience +Current State: If a batch fails partway through processing, there's potential for partial updates. + +### Design Ideas: + +Implement a two-phase commit pattern where changes are staged before being committed +Track batch processing state in a separate table to allow for resuming after failures +Consider implementing compensating transactions for rollback scenarios + + +## Connection Management +Current State: Database connection failures might not be properly handled in all scenarios. 
+ +### Design Ideas: + +Implement connection pooling with health checks +Add circuit breaker pattern to prevent cascading failures when the database is under stress +Implement automatic reconnection logic with configurable retry policies + +## Error Observability +Current State: Error logging is improved but could benefit from more structured error reporting. + +### Design Ideas: + +Enhance logging with correlation IDs to track errors across components +Implement metrics collection for error rates and types +Create a dashboard for monitoring error patterns and trends +Add alerting for critical error conditions + +## Graceful Degradation +Current State: The system might completely fail if certain components are unavailable. + +### Design Ideas: + +Implement feature flags to disable problematic components without taking down the entire system +Design fallback mechanisms for critical paths +Add configurable timeout settings for all external dependencies + +## Error Recovery Workflows +Current State: Recovery from serious errors often requires manual intervention. + +### Design Ideas: + +Design self-healing mechanisms for common failure scenarios +Implement a recovery workflow system that can be triggered automatically or manually +Create tools for operators to safely replay failed batches + +## Validation and Constraints +Current State: Input validation might not catch all edge cases before they cause errors. + +### Design Ideas: + +Implement schema validation at multiple levels (not just database constraints) +Add pre-condition checks before critical operations +Design better error messages that suggest remediation steps + +## Resource Management +Current State: Resource leaks might occur during error conditions. 
+ +### Design Ideas: + +Ensure proper resource cleanup in all error paths using defer statements +Implement resource usage monitoring and alerting +Add circuit breakers for resource-intensive operations +## Implementation Priority +If you decide to implement these improvements, I would suggest prioritizing: + +Transient vs. Persistent error handling (biggest impact on reliability) +Error observability (helps diagnose issues faster) +Batch processing resilience (prevents data inconsistency) +Would you like to discuss any of these areas in more detail? + + +## CDC Monitoring System: Error Handling Implementation Plan + +### Phase 1: Transient vs. Persistent Error Handling +- Create an ErrorType enum (Transient, Persistent, Unknown) +- Implement a custom error wrapper that includes error type classification +- Add retry logic with exponential backoff for transient errors +- Create helper functions to classify common errors (DB timeouts, connection issues, etc.) +- Update error returns in BatchSizer to use the new error types +- Add maximum retry configuration settings + +### Phase 2: Error Observability +- Enhance logger to include correlation IDs for tracking request flows +- Add structured error fields (component, operation, error_type) +- Implement error metrics collection (error_count, error_type, component) +- Create Prometheus/Grafana dashboards for error monitoring +- Set up alerting for critical error thresholds +- Add context propagation to ensure errors maintain their trace context + +### Phase 3: Batch Processing Resilience +- Create a batch state tracking table in the database +- Implement batch checkpointing to record progress +- Add transaction support for atomic batch operations +- Create a recovery mechanism for failed batches +- Implement idempotent processing to prevent duplicate processing +- Add batch validation before processing starts + +### Phase 4: Connection Management +- Implement connection pooling with health checks +- Add circuit breaker 
pattern for database operations +- Create connection retry policies with configurable parameters +- Add connection timeout configurations +- Implement graceful connection closing during shutdown + +### Phase 5: Graceful Degradation +- Add feature flags for component-level disabling +- Implement fallback mechanisms for critical paths +- Create configurable timeout settings for all external dependencies +- Add partial success handling for batch operations +- Implement priority-based processing during degraded operations + +### Phase 6: Error Recovery Workflows +- Design self-healing mechanisms for common failures +- Create an admin API for manual recovery operations +- Implement a workflow system for complex recovery scenarios +- Add tools for operators to safely replay failed batches +- Create documentation for common recovery procedures + +### Phase 7: Validation and Constraints +- Implement schema validation at application level +- Add pre-condition checks before critical operations +- Enhance error messages with remediation suggestions +- Create validation helpers for common data types +- Add boundary checks for all configurable values + +### Phase 8: Resource Management +- Audit code for proper resource cleanup in error paths +- Implement resource usage monitoring +- Add circuit breakers for resource-intensive operations +- Create resource pools for expensive resources +- Add graceful shutdown procedures to release resources + +### Immediate Next Steps (Highest Priority) +- Create the error type classification system +- Implement retry logic for transient errors +- Enhance logging with correlation IDs and structured error fields +- Add batch state tracking and checkpointing +- Implement connection pooling with health checks \ No newline at end of file From 25224581d3e1623b22d975e5dff2a8b1f85c29ba Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Sun, 2 Mar 2025 22:26:15 +0400 Subject: [PATCH 4/7] Adding some todos for future --- todos/todos.md | 9 --------- 1 file 
changed, 9 deletions(-) diff --git a/todos/todos.md b/todos/todos.md index e4b72d3..182814e 100644 --- a/todos/todos.md +++ b/todos/todos.md @@ -7,15 +7,6 @@ Implement an error classification system that categorizes errors as transient or For transient errors, implement exponential backoff retry logic For persistent errors, fail fast and provide clear error messages for troubleshooting -## Batch Processing Resilience -Current State: If a batch fails partway through processing, there's potential for partial updates. - -### Design Ideas: - -Implement a two-phase commit pattern where changes are staged before being committed -Track batch processing state in a separate table to allow for resuming after failures -Consider implementing compensating transactions for rollback scenarios - ## Connection Management Current State: Database connection failures might not be properly handled in all scenarios. From d64cd489812ebb423ed624a370b367c2a2a5d87d Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Sun, 2 Mar 2025 22:54:55 +0400 Subject: [PATCH 5/7] updating some logging statements --- internal/ingester/ingester.go | 30 ++++++++++++++---------- internal/monitoring/monitoring.go | 39 +++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 12 deletions(-) create mode 100644 internal/monitoring/monitoring.go diff --git a/internal/ingester/ingester.go b/internal/ingester/ingester.go index 8a807f8..9e5052f 100644 --- a/internal/ingester/ingester.go +++ b/internal/ingester/ingester.go @@ -7,8 +7,10 @@ import ( "os/signal" "sync" "syscall" + "time" "github.com/katasec/dstream/internal/logging" + "github.com/katasec/dstream/internal/monitoring" "github.com/katasec/dstream/internal/cdc/locking" "github.com/katasec/dstream/internal/cdc/orchestrator" @@ -33,7 +35,8 @@ type Ingester struct { dbConn *sql.DB lockerFactory *locking.LockerFactory // cancelFunc context.CancelFunc - wg *sync.WaitGroup + wg *sync.WaitGroup + monitor *monitoring.Monitor } // NewIngester initializes the 
ingester, loads the configuration, and creates the locker factory. @@ -58,7 +61,7 @@ func NewIngester() *Ingester { } // Initialize LeaseDBManager - //leaseDB := locking.NewLeaseDBManager(dbConn) + monitor := monitoring.NewMonitor(time.Second * 60) // Initialize LockerFactory with config and LeaseDBManager configType := config.Ingester.Locks.Type @@ -72,6 +75,7 @@ func NewIngester() *Ingester { dbConn: dbConn, lockerFactory: lockerFactory, wg: &sync.WaitGroup{}, + monitor: monitor, } } @@ -84,14 +88,16 @@ func NewIngester() *Ingester { // 5. Sets up signal handling for graceful shutdown // // The goroutine used to start the orchestrator is critical to the design as it: -// - Allows the main thread to proceed to signal handling -// - Enables non-blocking orchestration of the monitoring process -// - Maintains the ability to propagate cancellation signals to all monitoring activities +// - Allows the main thread to proceed to signal handling +// - Enables non-blocking orchestration of the monitoring process +// - Maintains the ability to propagate cancellation signals to all monitoring activities // // The method returns only after a shutdown signal is received and processed. func (i *Ingester) Start() error { defer i.dbConn.Close() + go i.monitor.Start() + // Create a new context with cancellation ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -141,9 +147,9 @@ func (i *Ingester) Start() error { // 4. 
Returns only the tables that are available for monitoring // // This function plays an important role in the distributed architecture by: -// - Ensuring tables are not monitored by multiple instances simultaneously -// - Allowing the system to scale horizontally across multiple instances -// - Providing automatic work distribution among available instances +// - Ensuring tables are not monitored by multiple instances simultaneously +// - Allowing the system to scale horizontally across multiple instances +// - Providing automatic work distribution among available instances func (i *Ingester) getTablesToMonitor() []config.ResolvedTableConfig { // Get list of table names from config @@ -196,10 +202,10 @@ func (i *Ingester) getTablesToMonitor() []config.ResolvedTableConfig { // 4. Ensures all distributed locks are properly released // // This function is a critical part of the Ingester's architecture as it: -// - Provides a clean shutdown mechanism for the entire system -// - Ensures resources are properly released (particularly distributed locks) -// - Prevents resource leaks and allows other instances to take over monitoring -// - Completes the lifecycle management responsibility of the Ingester +// - Provides a clean shutdown mechanism for the entire system +// - Ensures resources are properly released (particularly distributed locks) +// - Prevents resource leaks and allows other instances to take over monitoring +// - Completes the lifecycle management responsibility of the Ingester func (i *Ingester) handleShutdown(cancel context.CancelFunc, tableService *orchestrator.TableMonitoringOrchestrator) { // Capture SIGINT and SIGTERM signals signalChan := make(chan os.Signal, 1) diff --git a/internal/monitoring/monitoring.go b/internal/monitoring/monitoring.go new file mode 100644 index 0000000..7b9e97e --- /dev/null +++ b/internal/monitoring/monitoring.go @@ -0,0 +1,39 @@ +package monitoring + +import ( + "runtime" + "time" + + 
"github.com/katasec/dstream/internal/logging" +) + +var log = logging.GetLogger() + +// Monitor holds the configuration for memory logging +type Monitor struct { + interval time.Duration +} + +// NewMonitor creates a new Monitoring instance and starts logging +func NewMonitor(interval time.Duration) *Monitor { + m := &Monitor{ + interval: interval, + } + + // Start memory logging in a separate goroutine + go m.Start() + + return m +} + +// Start logs memory usage and the number of goroutines periodically +func (m *Monitor) Start() { + for { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + log.Info("Number of Goroutines:", "Num", runtime.NumGoroutine()) + log.Info("Total Memory Allocated: ", "Alloc", memStats.Alloc/1024/1024, "MB") + log.Info("Total Memory System: ", "Sys", memStats.Sys/1024/1024, "MB") + time.Sleep(m.interval) + } +} From a56237d151e949720b459e697046d6d36839d3f1 Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Sun, 2 Mar 2025 23:00:40 +0400 Subject: [PATCH 6/7] Adding some monitoring debug statements --- dstream.hcl | 4 ++-- internal/monitoring/monitoring.go | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/dstream.hcl b/dstream.hcl index 70ed245..6f69011 100644 --- a/dstream.hcl +++ b/dstream.hcl @@ -33,8 +33,8 @@ ingester { tables_overrides { overrides { table_name = "Persons" - poll_interval = "1s" - max_poll_interval = "5s" + poll_interval = "10s" + max_poll_interval = "300s" } } } diff --git a/internal/monitoring/monitoring.go b/internal/monitoring/monitoring.go index 7b9e97e..ee0f282 100644 --- a/internal/monitoring/monitoring.go +++ b/internal/monitoring/monitoring.go @@ -20,9 +20,6 @@ func NewMonitor(interval time.Duration) *Monitor { interval: interval, } - // Start memory logging in a separate goroutine - go m.Start() - return m } @@ -32,8 +29,8 @@ func (m *Monitor) Start() { var memStats runtime.MemStats runtime.ReadMemStats(&memStats) log.Info("Number of Goroutines:", "Num", 
runtime.NumGoroutine()) - log.Info("Total Memory Allocated: ", "Alloc", memStats.Alloc/1024/1024, "MB") - log.Info("Total Memory System: ", "Sys", memStats.Sys/1024/1024, "MB") + log.Info("Total Memory Allocated: ", "Alloc in MB", memStats.Alloc/1024/1024) + log.Info("Total Memory System: ", "Sys in MB", memStats.Sys/1024/1024) time.Sleep(m.interval) } } From 4fa4564c8603712d3d2e68b5cf65efad75dc071f Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Mon, 3 Mar 2025 22:19:04 +0400 Subject: [PATCH 7/7] Updating docs --- todos/todos.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/todos/todos.md b/todos/todos.md index 182814e..96a6d54 100644 --- a/todos/todos.md +++ b/todos/todos.md @@ -137,4 +137,5 @@ Would you like to discuss any of these areas in more detail? - Implement retry logic for transient errors - Enhance logging with correlation IDs and structured error fields - Add batch state tracking and checkpointing -- Implement connection pooling with health checks \ No newline at end of file +- Implement connection pooling with health checks +