diff --git a/queues/queues.go b/azureservicebus/azureservicebus.go similarity index 88% rename from queues/queues.go rename to azureservicebus/azureservicebus.go index 383b5fc..d4147fa 100644 --- a/queues/queues.go +++ b/azureservicebus/azureservicebus.go @@ -1,4 +1,4 @@ -package queue +package azureservicebus import ( "fmt" @@ -6,7 +6,7 @@ import ( "strings" ) -func GetServiceBusQueueName(connectionString string, tableName string) string { +func GenTopicName(connectionString string, tableName string) string { dbName, _ := extractDatabaseName(connectionString) return fmt.Sprintf("%s-%s-events", dbName, strings.ToLower(tableName)) } diff --git a/cdc/publishers/change_publisher_factory.go b/cdc/publishers/change_publisher_factory.go index c90ba93..554c0c7 100644 --- a/cdc/publishers/change_publisher_factory.go +++ b/cdc/publishers/change_publisher_factory.go @@ -2,11 +2,12 @@ package publishers import ( "errors" + "fmt" "log" "strings" + "github.com/katasec/dstream/azureservicebus" "github.com/katasec/dstream/config" - queues "github.com/katasec/dstream/queues" ) // ChangePublisherFactory is responsible for creating ChangePublisher instances based on config. @@ -21,34 +22,6 @@ func NewChangePublisherFactory(config *config.Config) *ChangePublisherFactory { } } -// // Create returns a ChangePublisher based on the Output.Type in config. -// func (f *ChangePublisherFactory) Create() (ChangePublisher, error) { -// switch strings.ToLower(f.config.Output.Type) { -// case "eventhub": -// log.Println("*** Creating EventHub publisher...") -// if f.config.Output.ConnectionString == "" { -// return nil, errors.New("EventHub connection string is required") -// } -// return NewEventHubPublisher(f.config.Output.ConnectionString), nil - -// case "servicebus": -// log.Println("*** Creating ServiceBus publisher...") -// if f.config.Output.ConnectionString == "" { -// return nil, errors.New("ServiceBus connection string is required") -// } -// p, err := NewServiceBusPublisher(f.config.Output.ConnectionString, "dstream-instance-1") -// if err != nil { -// log.Fatal(err) -// } -// return p, err - -// default: -// // Default to console if no specific provider is specified -// log.Println("Creating Console publisher...") -// return NewConsolePublisher(), nil -// } -// } - // Create returns a ChangePublisher based on the Output.Type in config. func (f *ChangePublisherFactory) Create(tableName string) (ChangePublisher, error) { switch strings.ToLower(f.config.Output.Type) { @@ -64,11 +37,16 @@ func (f *ChangePublisherFactory) Create(tableName string) (ChangePublisher, erro if f.config.Output.ConnectionString == "" { return nil, errors.New("ServiceBus connection string is required") } - p, err := NewServiceBusPublisher(f.config.Output.ConnectionString, queues.GetServiceBusQueueName(f.config.DBConnectionString, tableName)) + // Generate the topic name based on the database and table name + topicName := azureservicebus.GenTopicName(f.config.DBConnectionString, tableName) + log.Printf("Using topic: %s\n", topicName) + + // Create a new ServiceBusPublisher for the topic + publisher, err := NewServiceBusPublisher(f.config.Output.ConnectionString, topicName) if err != nil { - log.Fatal(err) + return nil, fmt.Errorf("failed to create ServiceBus publisher for topic %s: %w", topicName, err) } - return p, err + return publisher, nil default: // Default to console if no specific provider is specified diff --git a/cdc/publishers/servicebus_publisher.go b/cdc/publishers/servicebus_publisher.go index 589188c..954f0d4 100644 --- a/cdc/publishers/servicebus_publisher.go +++ b/cdc/publishers/servicebus_publisher.go @@ -3,48 +3,99 @@ package publishers import ( "context" "encoding/json" - "fmt" "log" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) -// ServiceBusPublisher implements ChangePublisher, sends messages to Azure Service Bus +// ServiceBusPublisher is an asynchronous publisher for sending messages to Azure Service Bus type ServiceBusPublisher struct { - client *azservicebus.Client - tableName string + client *azservicebus.Client + tableName string + messages chan map[string]interface{} + batchSize int + batchTimeout time.Duration } -// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string -func NewServiceBusPublisher(connectionString, queueOrTopicName string) (*ServiceBusPublisher, error) { +// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string and topic/queue name +func NewServiceBusPublisher(connectionString, tableName string) (*ServiceBusPublisher, error) { client, err := azservicebus.NewClientFromConnectionString(connectionString, nil) if err != nil { - return nil, fmt.Errorf("failed to create Service Bus client: %w", err) + return nil, err } - return &ServiceBusPublisher{ - client: client, - tableName: queueOrTopicName, - }, nil + // Initialize the publisher with a buffered channel and batch settings + publisher := &ServiceBusPublisher{ + client: client, + tableName: tableName, + messages: make(chan map[string]interface{}, 100), // Buffer size of 100; adjust as needed + batchSize: 10, // Max number of messages per batch + batchTimeout: 10 * time.Second, // Max wait time for batching + } + + // Start the background goroutine to process messages + go publisher.processMessages() + + return publisher, nil } -// PublishChange sends the provided data to the Azure Service Bus +// PublishChange queues a message to be sent to Azure Service Bus func (s *ServiceBusPublisher) PublishChange(data map[string]interface{}) { - log.Println("ServiceBusPublisher is publishing an event...") - // Convert data to JSON - jsonData, err := json.MarshalIndent(data, "", " ") - if err != nil { - log.Printf("Error formatting JSON data: %v", err) - return + select { + case s.messages <- data: + // Successfully queued message + log.Println("Queued message for asynchronous publishing.") + default: + // Channel is full; log or handle overflow + log.Println("Warning: message queue is full; dropping message") } +} + +// processMessages runs as a background goroutine to send messages in batches +func (s *ServiceBusPublisher) processMessages() { + batch := make([]*azservicebus.Message, 0, s.batchSize) + timer := time.NewTimer(s.batchTimeout) + defer timer.Stop() + + for { + select { + case data := <-s.messages: + // Convert data to JSON + jsonData, err := json.MarshalIndent(data, "", " ") + if err != nil { + log.Printf("Error formatting JSON data: %v", err) + continue + } - // Create a message with the JSON data - message := &azservicebus.Message{ - Body: jsonData, + // Log the message to console as it’s being added to the batch + log.Printf("Console Output - Message:\n%s", string(jsonData)) + + // Add message to batch + message := &azservicebus.Message{Body: jsonData} + batch = append(batch, message) + + // Send batch if it reaches the batch size limit + if len(batch) >= s.batchSize { + s.sendBatch(batch) + batch = batch[:0] // Reset batch + timer.Reset(s.batchTimeout) + } + + case <-timer.C: + // Send any remaining messages in the batch if timeout expires + if len(batch) > 0 { + s.sendBatch(batch) + batch = batch[:0] // Reset batch + } + // Reset the timer + timer.Reset(s.batchTimeout) + } } +} - // Send the message to the Service Bus queue or topic +// sendBatch sends a batch of messages to the Service Bus +func (s *ServiceBusPublisher) sendBatch(batch []*azservicebus.Message) { sender, err := s.client.NewSender(s.tableName, nil) if err != nil { log.Printf("Failed to create Service Bus sender: %v", err) @@ -55,11 +106,12 @@ func (s *ServiceBusPublisher) PublishChange(data map[string]interface{}) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - err = sender.SendMessage(ctx, message, nil) - if err != nil { - log.Printf("Failed to send message to Service Bus: %v", err) - return + // Send each message in the batch + for _, message := range batch { + if err := sender.SendMessage(ctx, message, nil); err != nil { + log.Printf("Failed to send message to Service Bus: %v", err) + } else { + log.Println("Sent message to Service Bus.") + } } - - log.Printf("Sent to Azure Service Bus:\n%s", string(jsonData)) } diff --git a/config/config.go b/config/config.go index 59817a3..73a1bed 100644 --- a/config/config.go +++ b/config/config.go @@ -15,7 +15,7 @@ import ( "github.com/hashicorp/hcl/v2" "github.com/hashicorp/hcl/v2/gohcl" "github.com/hashicorp/hcl/v2/hclsyntax" - queues "github.com/katasec/dstream/queues" + queues "github.com/katasec/dstream/azureservicebus" ) // TableConfig represents individual table configurations in the HCL file @@ -62,7 +62,7 @@ func (c *Config) CheckConfig() { } } -// serviceBusConfigCheck validates the Service Bus configuration and ensures queues exist for each table +// serviceBusConfigCheck validates the Service Bus configuration and ensures topics exist for each table func (c *Config) serviceBusConfigCheck() { if c.Output.ConnectionString == "" { log.Fatalf("Error, %s connection string is required.", c.Output.Type) @@ -74,42 +74,37 @@ func (c *Config) serviceBusConfigCheck() { log.Fatalf("Failed to create Service Bus client: %v", err) } - // Ensure each queue exists or create it if not + // Ensure each topic exists or create it if not for _, table := range c.Tables { - //queueName := GetServiceBusQueueName(table.Name) - queueName := queues.GetServiceBusQueueName(c.DBConnectionString, table.Name) - log.Printf("Ensuring queue exists: %s\n", queueName) + topicName := queues.GenTopicName(c.DBConnectionString, table.Name) + log.Printf("Ensuring topic exists: %s\n", topicName) - // Check and create queue if it doesn't exist - if err := createQueueIfNotExists(client, queueName); err != nil { - log.Fatalf("Error ensuring queue %s exists: %v", queueName, err) + // Check and create topic if it doesn't exist + if err := createTopicIfNotExists(client, topicName); err != nil { + log.Fatalf("Error ensuring topic %s exists: %v", topicName, err) } } } -// createQueueIfNotExists checks if a queue exists and creates it if it doesn’t -func createQueueIfNotExists(client *admin.Client, queueName string) error { - // Check if the queue exists - response, err := client.GetQueue(context.TODO(), queueName, nil) - if err == nil && response != nil { - log.Printf("Queue %s already exists.\n", queueName) - return nil // Queue already exists +// createTopicIfNotExists checks if a topic exists and creates it if it doesn’t +func createTopicIfNotExists(client *admin.Client, topicName string) error { + // Check if the topic exists + response0, err := client.GetTopic(context.TODO(), topicName, nil) + if err == nil && response0 != nil { + log.Printf("Topic %s already exists.\n", topicName) + return nil // Topic already exists } - // Check if the error is a "not found" error based on HTTP status code - log.Printf("Queue %s does not exist. Creating...\n", queueName) - response2, err := client.CreateQueue(context.TODO(), queueName, nil) + + // If topic does not exist, create it + log.Printf("Topic %s does not exist. Creating...\n", topicName) + response, err := client.CreateTopic(context.TODO(), topicName, nil) if err != nil { - log.Fatalf("failed to create queue %s: %s", queueName, err) + return fmt.Errorf("failed to create topic %s: %w", topicName, err) } - fmt.Printf("Queue %s created successfully. Status: %d\n", queueName, response2.Status) + fmt.Printf("Topic %s created successfully. Status: %d\n", topicName, response.Status) return nil } -func GetServiceBusQueueName(tableName string) string { - tableName = strings.ToLower(tableName) - return fmt.Sprintf("%s-events", tableName) -} - // LoadConfig reads, processes the HCL configuration file, and replaces placeholders with environment variables func LoadConfig(filePath string) (*Config, error) { var config Config diff --git a/dstream b/dstream new file mode 100755 index 0000000..65ee455 Binary files /dev/null and b/dstream differ