Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions queues/queues.go → azureservicebus/azureservicebus.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package queue
package azureservicebus

import (
"fmt"
"net/url"
"strings"
)

func GetServiceBusQueueName(connectionString string, tableName string) string {
func GenTopicName(connectionString string, tableName string) string {
dbName, _ := extractDatabaseName(connectionString)
return fmt.Sprintf("%s-%s-events", dbName, strings.ToLower(tableName))
}
Expand Down
42 changes: 10 additions & 32 deletions cdc/publishers/change_publisher_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package publishers

import (
"errors"
"fmt"
"log"
"strings"

"github.com/katasec/dstream/azureservicebus"
"github.com/katasec/dstream/config"
queues "github.com/katasec/dstream/queues"
)

// ChangePublisherFactory is responsible for creating ChangePublisher instances based on config.
Expand All @@ -21,34 +22,6 @@ func NewChangePublisherFactory(config *config.Config) *ChangePublisherFactory {
}
}

// // Create returns a ChangePublisher based on the Output.Type in config.
// func (f *ChangePublisherFactory) Create() (ChangePublisher, error) {
// switch strings.ToLower(f.config.Output.Type) {
// case "eventhub":
// log.Println("*** Creating EventHub publisher...")
// if f.config.Output.ConnectionString == "" {
// return nil, errors.New("EventHub connection string is required")
// }
// return NewEventHubPublisher(f.config.Output.ConnectionString), nil

// case "servicebus":
// log.Println("*** Creating ServiceBus publisher...")
// if f.config.Output.ConnectionString == "" {
// return nil, errors.New("ServiceBus connection string is required")
// }
// p, err := NewServiceBusPublisher(f.config.Output.ConnectionString, "dstream-instance-1")
// if err != nil {
// log.Fatal(err)
// }
// return p, err

// default:
// // Default to console if no specific provider is specified
// log.Println("Creating Console publisher...")
// return NewConsolePublisher(), nil
// }
// }

// Create returns a ChangePublisher based on the Output.Type in config.
func (f *ChangePublisherFactory) Create(tableName string) (ChangePublisher, error) {
switch strings.ToLower(f.config.Output.Type) {
Expand All @@ -64,11 +37,16 @@ func (f *ChangePublisherFactory) Create(tableName string) (ChangePublisher, erro
if f.config.Output.ConnectionString == "" {
return nil, errors.New("ServiceBus connection string is required")
}
p, err := NewServiceBusPublisher(f.config.Output.ConnectionString, queues.GetServiceBusQueueName(f.config.DBConnectionString, tableName))
// Generate the topic name based on the database and table name
topicName := azureservicebus.GenTopicName(f.config.DBConnectionString, tableName)
log.Printf("Using topic: %s\n", topicName)

// Create a new ServiceBusPublisher for the topic
publisher, err := NewServiceBusPublisher(f.config.Output.ConnectionString, topicName)
if err != nil {
log.Fatal(err)
return nil, fmt.Errorf("failed to create ServiceBus publisher for topic %s: %w", topicName, err)
}
return p, err
return publisher, nil

default:
// Default to console if no specific provider is specified
Expand Down
108 changes: 80 additions & 28 deletions cdc/publishers/servicebus_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,99 @@ package publishers
import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// ServiceBusPublisher implements ChangePublisher, sends messages to Azure Service Bus
// ServiceBusPublisher is an asynchronous publisher for sending messages to Azure Service Bus
type ServiceBusPublisher struct {
client *azservicebus.Client
tableName string
client *azservicebus.Client
tableName string
messages chan map[string]interface{}
batchSize int
batchTimeout time.Duration
}

// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string
func NewServiceBusPublisher(connectionString, queueOrTopicName string) (*ServiceBusPublisher, error) {
// NewServiceBusPublisher creates a new ServiceBusPublisher with the provided connection string and topic/queue name
func NewServiceBusPublisher(connectionString, tableName string) (*ServiceBusPublisher, error) {
client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
if err != nil {
return nil, fmt.Errorf("failed to create Service Bus client: %w", err)
return nil, err
}

return &ServiceBusPublisher{
client: client,
tableName: queueOrTopicName,
}, nil
// Initialize the publisher with a buffered channel and batch settings
publisher := &ServiceBusPublisher{
client: client,
tableName: tableName,
messages: make(chan map[string]interface{}, 100), // Buffer size of 100; adjust as needed
batchSize: 10, // Max number of messages per batch
batchTimeout: 10 * time.Second, // Max wait time for batching
}

// Start the background goroutine to process messages
go publisher.processMessages()

return publisher, nil
}

// PublishChange sends the provided data to the Azure Service Bus
// PublishChange queues a message to be sent to Azure Service Bus
func (s *ServiceBusPublisher) PublishChange(data map[string]interface{}) {
log.Println("ServiceBusPublisher is publishing an event...")
// Convert data to JSON
jsonData, err := json.MarshalIndent(data, "", " ")
if err != nil {
log.Printf("Error formatting JSON data: %v", err)
return
select {
case s.messages <- data:
// Successfully queued message
log.Println("Queued message for asynchronous publishing.")
default:
// Channel is full; log or handle overflow
log.Println("Warning: message queue is full; dropping message")
}
}

// processMessages runs as a background goroutine to send messages in batches
func (s *ServiceBusPublisher) processMessages() {
batch := make([]*azservicebus.Message, 0, s.batchSize)
timer := time.NewTimer(s.batchTimeout)
defer timer.Stop()

for {
select {
case data := <-s.messages:
// Convert data to JSON
jsonData, err := json.MarshalIndent(data, "", " ")
if err != nil {
log.Printf("Error formatting JSON data: %v", err)
continue
}

// Create a message with the JSON data
message := &azservicebus.Message{
Body: jsonData,
// Log the message to console as it’s being added to the batch
log.Printf("Console Output - Message:\n%s", string(jsonData))

// Add message to batch
message := &azservicebus.Message{Body: jsonData}
batch = append(batch, message)

// Send batch if it reaches the batch size limit
if len(batch) >= s.batchSize {
s.sendBatch(batch)
batch = batch[:0] // Reset batch
timer.Reset(s.batchTimeout)
}

case <-timer.C:
// Send any remaining messages in the batch if timeout expires
if len(batch) > 0 {
s.sendBatch(batch)
batch = batch[:0] // Reset batch
}
// Reset the timer
timer.Reset(s.batchTimeout)
}
}
}

// Send the message to the Service Bus queue or topic
// sendBatch sends a batch of messages to the Service Bus
func (s *ServiceBusPublisher) sendBatch(batch []*azservicebus.Message) {
sender, err := s.client.NewSender(s.tableName, nil)
if err != nil {
log.Printf("Failed to create Service Bus sender: %v", err)
Expand All @@ -55,11 +106,12 @@ func (s *ServiceBusPublisher) PublishChange(data map[string]interface{}) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = sender.SendMessage(ctx, message, nil)
if err != nil {
log.Printf("Failed to send message to Service Bus: %v", err)
return
// Send each message in the batch
for _, message := range batch {
if err := sender.SendMessage(ctx, message, nil); err != nil {
log.Printf("Failed to send message to Service Bus: %v", err)
} else {
log.Println("Sent message to Service Bus.")
}
}

log.Printf("Sent to Azure Service Bus:\n%s", string(jsonData))
}
47 changes: 21 additions & 26 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
"github.com/hashicorp/hcl/v2/hclsyntax"
queues "github.com/katasec/dstream/queues"
queues "github.com/katasec/dstream/azureservicebus"
)

// TableConfig represents individual table configurations in the HCL file
Expand Down Expand Up @@ -62,7 +62,7 @@ func (c *Config) CheckConfig() {
}
}

// serviceBusConfigCheck validates the Service Bus configuration and ensures queues exist for each table
// serviceBusConfigCheck validates the Service Bus configuration and ensures topics exist for each table
func (c *Config) serviceBusConfigCheck() {
if c.Output.ConnectionString == "" {
log.Fatalf("Error, %s connection string is required.", c.Output.Type)
Expand All @@ -74,42 +74,37 @@ func (c *Config) serviceBusConfigCheck() {
log.Fatalf("Failed to create Service Bus client: %v", err)
}

// Ensure each queue exists or create it if not
// Ensure each topic exists or create it if not
for _, table := range c.Tables {
//queueName := GetServiceBusQueueName(table.Name)
queueName := queues.GetServiceBusQueueName(c.DBConnectionString, table.Name)
log.Printf("Ensuring queue exists: %s\n", queueName)
topicName := queues.GenTopicName(c.DBConnectionString, table.Name)
log.Printf("Ensuring topic exists: %s\n", topicName)

// Check and create queue if it doesn't exist
if err := createQueueIfNotExists(client, queueName); err != nil {
log.Fatalf("Error ensuring queue %s exists: %v", queueName, err)
// Check and create topic if it doesn't exist
if err := createTopicIfNotExists(client, topicName); err != nil {
log.Fatalf("Error ensuring topic %s exists: %v", topicName, err)
}
}
}

// createQueueIfNotExists checks if a queue exists and creates it if it doesn’t
func createQueueIfNotExists(client *admin.Client, queueName string) error {
// Check if the queue exists
response, err := client.GetQueue(context.TODO(), queueName, nil)
if err == nil && response != nil {
log.Printf("Queue %s already exists.\n", queueName)
return nil // Queue already exists
// createTopicIfNotExists checks if a topic exists and creates it if it doesn’t
func createTopicIfNotExists(client *admin.Client, topicName string) error {
// Check if the topic exists
response0, err := client.GetTopic(context.TODO(), topicName, nil)
if err == nil && response0 != nil {
log.Printf("Topic %s already exists.\n", topicName)
return nil // Topic already exists
}
// Check if the error is a "not found" error based on HTTP status code
log.Printf("Queue %s does not exist. Creating...\n", queueName)
response2, err := client.CreateQueue(context.TODO(), queueName, nil)

// If topic does not exist, create it
log.Printf("Topic %s does not exist. Creating...\n", topicName)
response, err := client.CreateTopic(context.TODO(), topicName, nil)
if err != nil {
log.Fatalf("failed to create queue %s: %s", queueName, err)
return fmt.Errorf("failed to create topic %s: %w", topicName, err)
}
fmt.Printf("Queue %s created successfully. Status: %d\n", queueName, response2.Status)
fmt.Printf("Topic %s created successfully. Status: %d\n", topicName, response.Status)
return nil
}

func GetServiceBusQueueName(tableName string) string {
tableName = strings.ToLower(tableName)
return fmt.Sprintf("%s-events", tableName)
}

// LoadConfig reads, processes the HCL configuration file, and replaces placeholders with environment variables
func LoadConfig(filePath string) (*Config, error) {
var config Config
Expand Down
Binary file added dstream
Binary file not shown.