Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions cdc/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,46 @@ import (
"log"
)

// defaultCheckpointTableName is the checkpoint table used when the caller
// does not supply an explicit name.
const defaultCheckpointTableName = "cdc_offsets"

// CheckpointManager persists and retrieves the last processed LSN
// (checkpoint) for a single monitored table.
type CheckpointManager struct {
	dbConn          *sql.DB // connection used for all checkpoint queries
	tableName       string  // monitored table this manager tracks
	checkpointTable string  // table that stores the checkpoints themselves
}

// NewCheckpointManager initializes a new CheckpointManager.
//
// checkpointTableName is an optional variadic parameter kept for backward
// compatibility with existing two-argument callers: if a non-empty first
// value is supplied it overrides the default checkpoint table name
// ("cdc_offsets"); extra values are ignored.
//
// NOTE(review): checkpointTable is interpolated into SQL text elsewhere in
// this file, so it must come from trusted configuration, never from user
// input.
func NewCheckpointManager(dbConn *sql.DB, tableName string, checkpointTableName ...string) *CheckpointManager {
	// Use the provided checkpoint table name if supplied; otherwise, the default.
	cpTable := defaultCheckpointTableName
	if len(checkpointTableName) > 0 && checkpointTableName[0] != "" {
		cpTable = checkpointTableName[0]
	}

	return &CheckpointManager{
		dbConn:          dbConn,
		tableName:       tableName,
		checkpointTable: cpTable,
	}
}

// InitializeCheckpointTable creates the checkpoint table if it does not exist
func (c *CheckpointManager) InitializeCheckpointTable() error {
query := `
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'cdc_offsets')
query := fmt.Sprintf(`
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = '%s')
BEGIN
CREATE TABLE cdc_offsets (
CREATE TABLE %s (
table_name NVARCHAR(255) PRIMARY KEY,
last_lsn VARBINARY(10),
updated_at DATETIME DEFAULT GETDATE()
);
END`
END`, c.checkpointTable, c.checkpointTable)

_, err := c.dbConn.Exec(query)
if err != nil {
return fmt.Errorf("failed to create cdc_offsets table: %w", err)
return fmt.Errorf("failed to create %s table: %w", c.checkpointTable, err)
}

log.Println("Initialized checkpoints table.")
Expand All @@ -46,7 +57,7 @@ func (c *CheckpointManager) InitializeCheckpointTable() error {
// LoadLastLSN retrieves the last known LSN for the specified table
func (c *CheckpointManager) LoadLastLSN(defaultStartLSN string) ([]byte, error) {
var lastLSN []byte
query := "SELECT last_lsn FROM cdc_offsets WHERE table_name = @tableName"
query := fmt.Sprintf("SELECT last_lsn FROM %s WHERE table_name = @tableName", c.checkpointTable)
err := c.dbConn.QueryRow(query, sql.Named("tableName", c.tableName)).Scan(&lastLSN)
if err == sql.ErrNoRows {
startLSNBytes, _ := hex.DecodeString(defaultStartLSN)
Expand All @@ -61,15 +72,15 @@ func (c *CheckpointManager) LoadLastLSN(defaultStartLSN string) ([]byte, error)

// SaveLastLSN updates the last known LSN for the specified table
func (c *CheckpointManager) SaveLastLSN(newLSN []byte) error {
upsertQuery := `
MERGE INTO cdc_offsets AS target
upsertQuery := fmt.Sprintf(`
MERGE INTO %s AS target
USING (VALUES (@tableName, @lastLSN, GETDATE())) AS source (table_name, last_lsn, updated_at)
ON target.table_name = source.table_name
WHEN MATCHED THEN
UPDATE SET last_lsn = source.last_lsn, updated_at = source.updated_at
WHEN NOT MATCHED THEN
INSERT (table_name, last_lsn, updated_at)
VALUES (source.table_name, source.last_lsn, source.updated_at);`
VALUES (source.table_name, source.last_lsn, source.updated_at);`, c.checkpointTable)

_, err := c.dbConn.Exec(upsertQuery, sql.Named("tableName", c.tableName), sql.Named("lastLSN", newLSN))
if err != nil {
Expand Down
118 changes: 55 additions & 63 deletions cdc/publishers/servicebus_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,115 +3,107 @@ package publishers
import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// ServiceBusPublisher is an asynchronous publisher for sending messages to Azure Service Bus
type ServiceBusPublisher struct {
client *azservicebus.Client
tableName string
messages chan map[string]interface{}
batchSize int
batchTimeout time.Duration
client *azservicebus.Client
topicName string
batchSize int
batchQueue chan map[string]interface{}
}

// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string and topic/queue name
func NewServiceBusPublisher(connectionString, tableName string) (*ServiceBusPublisher, error) {
// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string and topic name
func NewServiceBusPublisher(connectionString, topicName string) (*ServiceBusPublisher, error) {
client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create Service Bus client: %w", err)
}

// Initialize the publisher with a buffered channel and batch settings
publisher := &ServiceBusPublisher{
client: client,
tableName: tableName,
messages: make(chan map[string]interface{}, 100), // Buffer size of 100; adjust as needed
batchSize: 10, // Max number of messages per batch
batchTimeout: 10 * time.Second, // Max wait time for batching
client: client,
topicName: topicName,
batchSize: 10, // Batch size for messages to send
batchQueue: make(chan map[string]interface{}, 100), // Buffered channel
}

// Start the background goroutine to process messages
go publisher.processMessages()

return publisher, nil
}

// PublishChange queues a message to be sent to Azure Service Bus
func (s *ServiceBusPublisher) PublishChange(data map[string]interface{}) {
select {
case s.messages <- data:
// Successfully queued message
log.Println("Queued message for asynchronous publishing.")
default:
// Channel is full; log or handle overflow
log.Println("Warning: message queue is full; dropping message")
}
// PublishChange sends the change data to the batchQueue for processing
func (s *ServiceBusPublisher) PublishChange(change map[string]interface{}) {
log.Println("Queuing message for Service Bus publishing...")
// Print message to console
prettyPrintJSON(change)

// Send the message to the channel to be processed in batches
s.batchQueue <- change
}

// processMessages is the background goroutine that drains batchQueue.
// It flushes a batch as soon as batchSize messages have accumulated, and
// also every 5 seconds so a slow trickle of changes is not delayed
// indefinitely.
//
// NOTE(review): there is no stop signal, so this loop never exits.
func (s *ServiceBusPublisher) processMessages() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// Pre-size once and reuse the backing array between flushes instead of
	// reallocating (sendBatch is called synchronously, so truncating with
	// batch[:0] after it returns is safe).
	batch := make([]map[string]interface{}, 0, s.batchSize)

	for {
		select {
		case change := <-s.batchQueue:
			batch = append(batch, change)
			// Flush as soon as the batch is full.
			if len(batch) >= s.batchSize {
				s.sendBatch(batch)
				batch = batch[:0]
			}

		case <-ticker.C:
			// Periodic flush of any partially filled batch.
			if len(batch) > 0 {
				s.sendBatch(batch)
				batch = batch[:0]
			}
		}
	}
}

// sendBatch sends a batch of messages to the Service Bus
func (s *ServiceBusPublisher) sendBatch(batch []*azservicebus.Message) {
sender, err := s.client.NewSender(s.tableName, nil)
// sendBatch sends a batch of messages to the Service Bus topic
func (s *ServiceBusPublisher) sendBatch(batch []map[string]interface{}) {
sender, err := s.client.NewSender(s.topicName, nil)
if err != nil {
log.Printf("Failed to create Service Bus sender: %v", err)
return
}
defer sender.Close(context.Background())

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, change := range batch {
jsonData, err := json.Marshal(change)
if err != nil {
log.Printf("Error formatting JSON data: %v", err)
continue
}

message := &azservicebus.Message{Body: jsonData}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Send each message in the batch
for _, message := range batch {
if err := sender.SendMessage(ctx, message, nil); err != nil {
log.Printf("Failed to send message to Service Bus: %v", err)
} else {
log.Println("Sent message to Service Bus.")
continue
}
log.Printf("Sent message to Service Bus: %s", jsonData)
}
}

// prettyPrintJSON writes the change payload to stdout as indented JSON.
// Marshal errors are logged and the payload is simply not printed.
func prettyPrintJSON(data map[string]interface{}) {
	jsonData, err := json.MarshalIndent(data, "", " ")
	if err != nil {
		log.Printf("Error printing JSON: %v", err)
		return
	}
	// Printf instead of Println: Println inserts a stray space between
	// its arguments, putting a leading space before the JSON body.
	fmt.Printf("Message Data:\n%s\n", jsonData)
}
25 changes: 14 additions & 11 deletions cdc/sql_server_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,11 @@ func (monitor *SQLServerMonitor) fetchCDCChanges(lastLSN []byte) ([]map[string]i
case 1:
operationType = "Delete"
default:
// Skip any unknown operation types
continue
}

// Capture relevant data including the table name, operation ID, and operation type
data := map[string]interface{}{
"TableName": monitor.tableName,
"LSN": hex.EncodeToString(lsn),
"OperationID": operation, // Original operation ID
"OperationType": operationType, // Descriptive operation type
continue // Skip any unknown operation types
}

// Organize metadata and data separately in the output
data := make(map[string]interface{})
for i, colName := range monitor.columns {
if colValue, ok := columnData[i+2].(*sql.NullString); ok && colValue.Valid {
data[colName] = colValue.String
Expand All @@ -175,7 +168,17 @@ func (monitor *SQLServerMonitor) fetchCDCChanges(lastLSN []byte) ([]map[string]i
}
}

changes = append(changes, data)
change := map[string]interface{}{
"metadata": map[string]interface{}{
"TableName": monitor.tableName,
"LSN": hex.EncodeToString(lsn),
"OperationID": operation,
"OperationType": operationType,
},
"data": data,
}

changes = append(changes, change)
latestLSN = lsn
}

Expand Down