diff --git a/cdc/checkpoint_manager.go b/cdc/checkpoint_manager.go index ca9b616..1ef33b5 100644 --- a/cdc/checkpoint_manager.go +++ b/cdc/checkpoint_manager.go @@ -8,35 +8,46 @@ import ( "log" ) +// Default checkpoint table name +const defaultCheckpointTableName = "cdc_offsets" + // CheckpointManager manages checkpoint (LSN) persistence type CheckpointManager struct { - dbConn *sql.DB - tableName string + dbConn *sql.DB + tableName string + checkpointTable string } // NewCheckpointManager initializes a new CheckpointManager -func NewCheckpointManager(dbConn *sql.DB, tableName string) *CheckpointManager { +func NewCheckpointManager(dbConn *sql.DB, tableName string, checkpointTableName ...string) *CheckpointManager { + // Use provided checkpoint table name if supplied; otherwise, use default + cpTable := defaultCheckpointTableName + if len(checkpointTableName) > 0 && checkpointTableName[0] != "" { + cpTable = checkpointTableName[0] + } + return &CheckpointManager{ - dbConn: dbConn, - tableName: tableName, + dbConn: dbConn, + tableName: tableName, + checkpointTable: cpTable, } } // InitializeCheckpointTable creates the checkpoint table if it does not exist func (c *CheckpointManager) InitializeCheckpointTable() error { - query := ` - IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'cdc_offsets') + query := fmt.Sprintf(` + IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = '%s') BEGIN - CREATE TABLE cdc_offsets ( + CREATE TABLE %s ( table_name NVARCHAR(255) PRIMARY KEY, last_lsn VARBINARY(10), updated_at DATETIME DEFAULT GETDATE() ); - END` + END`, c.checkpointTable, c.checkpointTable) _, err := c.dbConn.Exec(query) if err != nil { - return fmt.Errorf("failed to create cdc_offsets table: %w", err) + return fmt.Errorf("failed to create %s table: %w", c.checkpointTable, err) } log.Println("Initialized checkpoints table.") @@ -46,7 +57,7 @@ func (c *CheckpointManager) InitializeCheckpointTable() error { // LoadLastLSN retrieves the last known LSN for the specified table func (c *CheckpointManager) LoadLastLSN(defaultStartLSN string) ([]byte, error) { var lastLSN []byte - query := "SELECT last_lsn FROM cdc_offsets WHERE table_name = @tableName" + query := fmt.Sprintf("SELECT last_lsn FROM %s WHERE table_name = @tableName", c.checkpointTable) err := c.dbConn.QueryRow(query, sql.Named("tableName", c.tableName)).Scan(&lastLSN) if err == sql.ErrNoRows { startLSNBytes, _ := hex.DecodeString(defaultStartLSN) @@ -61,15 +72,15 @@ func (c *CheckpointManager) LoadLastLSN(defaultStartLSN string) ([]byte, error) // SaveLastLSN updates the last known LSN for the specified table func (c *CheckpointManager) SaveLastLSN(newLSN []byte) error { - upsertQuery := ` - MERGE INTO cdc_offsets AS target + upsertQuery := fmt.Sprintf(` + MERGE INTO %s AS target USING (VALUES (@tableName, @lastLSN, GETDATE())) AS source (table_name, last_lsn, updated_at) ON target.table_name = source.table_name WHEN MATCHED THEN UPDATE SET last_lsn = source.last_lsn, updated_at = source.updated_at WHEN NOT MATCHED THEN INSERT (table_name, last_lsn, updated_at) - VALUES (source.table_name, source.last_lsn, source.updated_at);` + VALUES (source.table_name, source.last_lsn, source.updated_at);`, c.checkpointTable) _, err := c.dbConn.Exec(upsertQuery, sql.Named("tableName", c.tableName), sql.Named("lastLSN", newLSN)) if err != nil { diff --git a/cdc/publishers/servicebus_publisher.go b/cdc/publishers/servicebus_publisher.go index 954f0d4..ff48b2f 100644 --- a/cdc/publishers/servicebus_publisher.go +++ b/cdc/publishers/servicebus_publisher.go @@ -3,115 +3,107 @@ package publishers import ( "context" "encoding/json" + "fmt" "log" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) -// ServiceBusPublisher is an asynchronous publisher for sending messages to Azure Service Bus type ServiceBusPublisher struct { - client *azservicebus.Client - tableName string - messages chan map[string]interface{} - batchSize int - batchTimeout time.Duration + client *azservicebus.Client + topicName string + batchSize int + batchQueue chan map[string]interface{} } -// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string and topic/queue name -func NewServiceBusPublisher(connectionString, tableName string) (*ServiceBusPublisher, error) { +// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string and topic name +func NewServiceBusPublisher(connectionString, topicName string) (*ServiceBusPublisher, error) { client, err := azservicebus.NewClientFromConnectionString(connectionString, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create Service Bus client: %w", err) } - // Initialize the publisher with a buffered channel and batch settings publisher := &ServiceBusPublisher{ - client: client, - tableName: tableName, - messages: make(chan map[string]interface{}, 100), // Buffer size of 100; adjust as needed - batchSize: 10, // Max number of messages per batch - batchTimeout: 10 * time.Second, // Max wait time for batching + client: client, + topicName: topicName, + batchSize: 10, // Batch size for messages to send + batchQueue: make(chan map[string]interface{}, 100), // Buffered channel } - // Start the background goroutine to process messages go publisher.processMessages() return publisher, nil } -// PublishChange queues a message to be sent to Azure Service Bus -func (s *ServiceBusPublisher) PublishChange(data map[string]interface{}) { - select { - case s.messages <- data: - // Successfully queued message - log.Println("Queued message for asynchronous publishing.") - default: - // Channel is full; log or handle overflow - log.Println("Warning: message queue is full; dropping message") - } +// PublishChange sends the change data to the batchQueue for processing +func (s *ServiceBusPublisher) PublishChange(change map[string]interface{}) { + log.Println("Queuing message for Service Bus publishing...") + // Print message to console + prettyPrintJSON(change) + + // Send the message to the channel to be processed in batches + s.batchQueue <- change } -// processMessages runs as a background goroutine to send messages in batches +// processMessages reads from batchQueue and sends messages in batches func (s *ServiceBusPublisher) processMessages() { - batch := make([]*azservicebus.Message, 0, s.batchSize) - timer := time.NewTimer(s.batchTimeout) - defer timer.Stop() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + var batch []map[string]interface{} for { select { - case data := <-s.messages: - // Convert data to JSON - jsonData, err := json.MarshalIndent(data, "", " ") - if err != nil { - log.Printf("Error formatting JSON data: %v", err) - continue - } - - // Log the message to console as it’s being added to the batch - log.Printf("Console Output - Message:\n%s", string(jsonData)) - - // Add message to batch - message := &azservicebus.Message{Body: jsonData} - batch = append(batch, message) - - // Send batch if it reaches the batch size limit + case change := <-s.batchQueue: + batch = append(batch, change) if len(batch) >= s.batchSize { s.sendBatch(batch) - batch = batch[:0] // Reset batch - timer.Reset(s.batchTimeout) + batch = nil } - - case <-timer.C: - // Send any remaining messages in the batch if timeout expires + case <-ticker.C: if len(batch) > 0 { s.sendBatch(batch) - batch = batch[:0] // Reset batch + batch = nil } - // Reset the timer - timer.Reset(s.batchTimeout) } } } -// sendBatch sends a batch of messages to the Service Bus -func (s *ServiceBusPublisher) sendBatch(batch []*azservicebus.Message) { - sender, err := s.client.NewSender(s.tableName, nil) +// sendBatch sends a batch of messages to the Service Bus topic +func (s *ServiceBusPublisher) sendBatch(batch []map[string]interface{}) { + sender, err := s.client.NewSender(s.topicName, nil) if err != nil { log.Printf("Failed to create Service Bus sender: %v", err) return } defer sender.Close(context.Background()) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + for _, change := range batch { + jsonData, err := json.Marshal(change) + if err != nil { + log.Printf("Error formatting JSON data: %v", err) + continue + } + + message := &azservicebus.Message{Body: jsonData} + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - // Send each message in the batch - for _, message := range batch { if err := sender.SendMessage(ctx, message, nil); err != nil { log.Printf("Failed to send message to Service Bus: %v", err) - } else { - log.Println("Sent message to Service Bus.") + continue } + log.Printf("Sent message to Service Bus: %s", jsonData) + } +} + +// prettyPrintJSON prints JSON in an indented format to the console +func prettyPrintJSON(data map[string]interface{}) { + jsonData, err := json.MarshalIndent(data, "", " ") + if err != nil { + log.Printf("Error printing JSON: %v", err) + return } + fmt.Println("Message Data:\n", string(jsonData)) } diff --git a/cdc/sql_server_monitor.go b/cdc/sql_server_monitor.go index 7a21ae0..1bf6d26 100644 --- a/cdc/sql_server_monitor.go +++ b/cdc/sql_server_monitor.go @@ -155,18 +155,11 @@ func (monitor *SQLServerMonitor) fetchCDCChanges(lastLSN []byte) ([]map[string]i case 1: operationType = "Delete" default: - // Skip any unknown operation types - continue - } - - // Capture relevant data including the table name, operation ID, and operation type - data := map[string]interface{}{ - "TableName": monitor.tableName, - "LSN": hex.EncodeToString(lsn), - "OperationID": operation, // Original operation ID - "OperationType": operationType, // Descriptive operation type + continue // Skip any unknown operation types } + // Organize metadata and data separately in the output + data := make(map[string]interface{}) for i, colName := range monitor.columns { if colValue, ok := columnData[i+2].(*sql.NullString); ok && colValue.Valid { data[colName] = colValue.String @@ -175,7 +168,17 @@ func (monitor *SQLServerMonitor) fetchCDCChanges(lastLSN []byte) ([]map[string]i } } - changes = append(changes, data) + change := map[string]interface{}{ + "metadata": map[string]interface{}{ + "TableName": monitor.tableName, + "LSN": hex.EncodeToString(lsn), + "OperationID": operation, + "OperationType": operationType, + }, + "data": data, + } + + changes = append(changes, change) latestLSN = lsn }