Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dstream.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ ingester {
tables_overrides {
overrides {
table_name = "Persons"
poll_interval = "1s"
max_poll_interval = "5s"
poll_interval = "10s"
max_poll_interval = "300s"
}
}
}
Expand Down
39 changes: 23 additions & 16 deletions internal/cdc/locking/blob_locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package locking
import (
"context"
"fmt"
"log"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease"

"github.com/katasec/dstream/internal/logging"
)

type BlobLocker struct {
Expand Down Expand Up @@ -68,7 +69,8 @@ func NewBlobLocker(connectionString, containerName, lockName string) (*BlobLocke

// AcquireLock tries to acquire a lock on the blob and stores the lease ID
func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string, error) {
log.Printf("Attempting to acquire lock for blob %s", bl.lockName)
logger := logging.GetLogger()
logger.Printf("Attempting to acquire lock for blob %s", bl.lockName)

// Try to acquire lease
resp, err := bl.blobLeaseClient.AcquireLease(bl.ctx, int32(bl.lockTTL.Seconds()), nil)
Expand All @@ -85,10 +87,10 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string,
// Check if the lock is older than 2 minutes
lastModified := props.LastModified
lockAge := time.Since(*lastModified)
log.Printf("Lock on %s was last modified at: %v (%.2f minutes ago)", bl.lockName, lastModified.Format(time.RFC3339), lockAge.Minutes())
logger.Printf("Lock on %s was last modified at: %v (%.2f minutes ago)", bl.lockName, lastModified.Format(time.RFC3339), lockAge.Minutes())

if lockAge > 2*time.Minute {
log.Printf("Lock on %s is older than 2 minutes (last modified: %v). Breaking lease...", bl.lockName, *lastModified)
logger.Printf("Lock on %s is older than 2 minutes (last modified: %v). Breaking lease...", bl.lockName, lastModified.Format(time.RFC3339))

// Break the lease
_, err = bl.blobLeaseClient.BreakLease(ctx, nil)
Expand All @@ -104,44 +106,47 @@ func (bl *BlobLocker) AcquireLock(ctx context.Context, lockName string) (string,
if err != nil {
return "", fmt.Errorf("failed to acquire lease after breaking for %s: %w", bl.lockName, err)
}
log.Printf("Successfully acquired lock after breaking old lease for %s", bl.lockName)
logger.Printf("Successfully acquired lock after breaking old lease for %s", bl.lockName)
return *resp.LeaseID, nil
}

log.Printf("Table %s is already locked and the lock is still valid (%.2f minutes old, within 2 minute TTL). Skipping...", bl.lockName, lockAge.Minutes())
logger.Printf("Table %s is already locked and the lock is still valid (%.2f minutes old, within 2 minute TTL). Skipping...", bl.lockName, lockAge.Minutes())
return "", nil
}
return "", fmt.Errorf("failed to acquire lock for blob %s: %w", bl.lockName, err)
}

log.Printf("Lock acquired for blob %s with Lease ID: %s", bl.lockName, *resp.LeaseID)
logger.Printf("Lock acquired for blob %s with Lease ID: %s", bl.lockName, *resp.LeaseID)
return *resp.LeaseID, nil
}

func (bl *BlobLocker) RenewLock(ctx context.Context, lockName string) error {
logger := logging.GetLogger()
_, err := bl.blobLeaseClient.RenewLease(bl.ctx, nil)
if err != nil {
return fmt.Errorf("failed to renew lock for blob %s: %w", lockName, err)
}

log.Printf("Lock renewed for blob %s", lockName)
logger.Printf("Lock renewed for blob %s", lockName)
return nil
}

// ReleaseLock releases the lock associated with the provided lease ID for the specified blob (lockName)
func (bl *BlobLocker) ReleaseLock(tx context.Context, lockName string, leaseID string) error {

logger := logging.GetLogger()
_, err := bl.blobLeaseClient.ReleaseLease(bl.ctx, &lease.BlobReleaseOptions{})
if err != nil {
return fmt.Errorf("failed to release lock for blob %s: %w", bl.lockName, err)
} else {
log.Printf("Lock released successfully for blob %s !\n", bl.lockName)
logger.Printf("Lock released successfully for blob %s", bl.lockName)
}
return nil
}

func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) {
log.Printf("Starting lock renewal for blob %s", lockName)
logger := logging.GetLogger()
logger.Printf("Starting lock renewal for blob %s", lockName)
go func() {
ticker := time.NewTicker(bl.lockTTL / 2)
defer ticker.Stop()
Expand All @@ -150,10 +155,10 @@ func (bl *BlobLocker) StartLockRenewal(ctx context.Context, lockName string) {
select {
case <-ticker.C:
if err := bl.RenewLock(bl.ctx, bl.lockName); err != nil {
log.Printf("Failed to renew lock for blob %s: %v", lockName, err)
logger.Printf("Failed to renew lock for blob %s: %v", lockName, err)
}
case <-ctx.Done():
log.Printf("Stopping lock renewal for blob %s", lockName)
logger.Printf("Stopping lock renewal for blob %s", lockName)
return
}
}
Expand Down Expand Up @@ -183,7 +188,8 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) {
if strings.Contains(err.Error(), "BlobNotFound") {
continue
}
log.Printf("Failed to get properties for blob %s: %v\n", lockName, err)
logger := logging.GetLogger()
logger.Printf("Failed to get properties for blob %s: %v", lockName, err)
continue
}

Expand All @@ -194,17 +200,18 @@ func (bl *BlobLocker) GetLockedTables(tableNames []string) ([]string, error) {
lockAge := time.Since(*lastModified)

if *leaseStatus == "locked" && *leaseState == "leased" {
log.Printf("Table %s is locked (last modified: %v, %.2f minutes ago)",
logger := logging.GetLogger()
logger.Printf("Table %s is locked (last modified: %v, %.2f minutes ago)",
tableName,
lastModified.Format(time.RFC3339),
lockAge.Minutes())

// Only consider the table locked if the lock is less than 2 minutes old
if lockAge <= 2*time.Minute {
log.Printf(" - Lock is still valid (within 2 minute TTL)")
logger.Printf(" - Lock is still valid (within 2 minute TTL)")
lockedTables = append(lockedTables, lockName)
} else {
log.Printf(" - Lock is stale (older than 2 minute TTL), will be broken when acquired")
logger.Printf(" - Lock is stale (older than 2 minute TTL), will be broken when acquired")
}
}
}
Expand Down
30 changes: 18 additions & 12 deletions internal/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"os/signal"
"sync"
"syscall"
"time"

"github.com/katasec/dstream/internal/logging"
"github.com/katasec/dstream/internal/monitoring"

"github.com/katasec/dstream/internal/cdc/locking"
"github.com/katasec/dstream/internal/cdc/orchestrator"
Expand All @@ -33,7 +35,8 @@ type Ingester struct {
dbConn *sql.DB
lockerFactory *locking.LockerFactory
// cancelFunc context.CancelFunc
wg *sync.WaitGroup
wg *sync.WaitGroup
monitor *monitoring.Monitor
}

// NewIngester initializes the ingester, loads the configuration, and creates the locker factory.
Expand All @@ -58,7 +61,7 @@ func NewIngester() *Ingester {
}

// Initialize LeaseDBManager
//leaseDB := locking.NewLeaseDBManager(dbConn)
monitor := monitoring.NewMonitor(time.Second * 60)

// Initialize LockerFactory with config and LeaseDBManager
configType := config.Ingester.Locks.Type
Expand All @@ -72,6 +75,7 @@ func NewIngester() *Ingester {
dbConn: dbConn,
lockerFactory: lockerFactory,
wg: &sync.WaitGroup{},
monitor: monitor,
}
}

Expand All @@ -84,14 +88,16 @@ func NewIngester() *Ingester {
// 5. Sets up signal handling for graceful shutdown
//
// The goroutine used to start the orchestrator is critical to the design as it:
// - Allows the main thread to proceed to signal handling
// - Enables non-blocking orchestration of the monitoring process
// - Maintains the ability to propagate cancellation signals to all monitoring activities
// - Allows the main thread to proceed to signal handling
// - Enables non-blocking orchestration of the monitoring process
// - Maintains the ability to propagate cancellation signals to all monitoring activities
//
// The method returns only after a shutdown signal is received and processed.
func (i *Ingester) Start() error {
defer i.dbConn.Close()

go i.monitor.Start()

// Create a new context with cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -141,9 +147,9 @@ func (i *Ingester) Start() error {
// 4. Returns only the tables that are available for monitoring
//
// This function plays an important role in the distributed architecture by:
// - Ensuring tables are not monitored by multiple instances simultaneously
// - Allowing the system to scale horizontally across multiple instances
// - Providing automatic work distribution among available instances
// - Ensuring tables are not monitored by multiple instances simultaneously
// - Allowing the system to scale horizontally across multiple instances
// - Providing automatic work distribution among available instances
func (i *Ingester) getTablesToMonitor() []config.ResolvedTableConfig {

// Get list of table names from config
Expand Down Expand Up @@ -196,10 +202,10 @@ func (i *Ingester) getTablesToMonitor() []config.ResolvedTableConfig {
// 4. Ensures all distributed locks are properly released
//
// This function is a critical part of the Ingester's architecture as it:
// - Provides a clean shutdown mechanism for the entire system
// - Ensures resources are properly released (particularly distributed locks)
// - Prevents resource leaks and allows other instances to take over monitoring
// - Completes the lifecycle management responsibility of the Ingester
// - Provides a clean shutdown mechanism for the entire system
// - Ensures resources are properly released (particularly distributed locks)
// - Prevents resource leaks and allows other instances to take over monitoring
// - Completes the lifecycle management responsibility of the Ingester
func (i *Ingester) handleShutdown(cancel context.CancelFunc, tableService *orchestrator.TableMonitoringOrchestrator) {
// Capture SIGINT and SIGTERM signals
signalChan := make(chan os.Signal, 1)
Expand Down
22 changes: 22 additions & 0 deletions internal/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ var (

// Logger interface defines the logging methods
type Logger interface {
// Standard log package compatible methods
Printf(format string, v ...any)
Println(v ...any)

// Structured logging methods
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warn(msg string, args ...any)
Expand Down Expand Up @@ -121,6 +126,23 @@ func formatMessage(msg string, args ...any) string {
return msg
}

// Printf provides compatibility with the standard library's log.Printf.
// The message is emitted at INFO level and is suppressed entirely when the
// logger's configured level is above LevelInfo.
func (l *stdLogger) Printf(format string, v ...any) {
	if l.logLevel > LevelInfo {
		return
	}
	l.info.Printf("%s[INFO]%s %s", colorGreen, colorReset, fmt.Sprintf(format, v...))
}

// Println provides compatibility with the standard library's log.Println.
// The trailing newline that fmt.Sprintln appends is stripped because the
// underlying logger terminates each entry itself. Output is suppressed when
// the logger's configured level is above LevelInfo.
func (l *stdLogger) Println(v ...any) {
	if l.logLevel > LevelInfo {
		return
	}
	msg := strings.TrimSuffix(fmt.Sprintln(v...), "\n")
	l.info.Printf("%s[INFO]%s %s", colorGreen, colorReset, msg)
}

// Debug logs a debug message
func (l *stdLogger) Debug(msg string, args ...any) {
if l.logLevel <= LevelDebug {
Expand Down
36 changes: 36 additions & 0 deletions internal/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package monitoring

import (
"runtime"
"time"

"github.com/katasec/dstream/internal/logging"
)

var log = logging.GetLogger()

// Monitor holds the configuration for memory logging
type Monitor struct {
	interval time.Duration // pause between successive log snapshots in Start
}

// NewMonitor returns a Monitor that, once Start is called, logs memory and
// goroutine statistics every interval. (It does not begin logging by itself;
// the caller must invoke Start.)
func NewMonitor(interval time.Duration) *Monitor {
	return &Monitor{interval: interval}
}

// Start logs the current goroutine count and memory usage, then repeats
// every m.interval. It never returns, so callers are expected to run it in
// its own goroutine (e.g. `go monitor.Start()`).
//
// NOTE(review): there is no cancellation mechanism — this loop lives for the
// remainder of the process. Consider a context-aware variant if the monitor
// ever needs to stop before shutdown.
func (m *Monitor) Start() {
	for {
		var memStats runtime.MemStats
		runtime.ReadMemStats(&memStats)

		// Message text carries no trailing colon/space: the structured
		// logger renders the key/value pairs itself, so punctuation in the
		// message only produces doubled separators in the output.
		log.Info("Number of Goroutines", "Num", runtime.NumGoroutine())
		log.Info("Total Memory Allocated", "Alloc in MB", memStats.Alloc/1024/1024)
		log.Info("Total Memory System", "Sys in MB", memStats.Sys/1024/1024)

		time.Sleep(m.interval)
	}
}
Loading