diff --git a/Makefile b/Makefile index 44b0d9d..039259c 100644 --- a/Makefile +++ b/Makefile @@ -47,3 +47,8 @@ rm-test-db: .PHONY: run-migration run-migration: go run cmd/psqlqueue/main.go migrate + +.PHONY: create-mocks +create-mocks: + @rm -rf mocks + mockery --all diff --git a/domain/message.go b/domain/message.go index e2249ff..9203d3d 100644 --- a/domain/message.go +++ b/domain/message.go @@ -67,3 +67,11 @@ type MessageRepository interface { Ack(ctx context.Context, id string) error Nack(ctx context.Context, id string, visibilityTimeoutSeconds uint) error } + +// MessageService is the service interface for the Message entity. +type MessageService interface { + Create(ctx context.Context, message *Message) error + List(ctx context.Context, queueID, label *string, limit uint) ([]*Message, error) + Ack(ctx context.Context, id string) error + Nack(ctx context.Context, id string, visibilityTimeoutSeconds uint) error +} diff --git a/domain/queue.go b/domain/queue.go index 55a8a97..07ce3bc 100644 --- a/domain/queue.go +++ b/domain/queue.go @@ -47,3 +47,15 @@ type QueueRepository interface { Purge(ctx context.Context, id string) error Cleanup(ctx context.Context, id string) error } + +// QueueService is the service interface for the Queue entity. +type QueueService interface { + Create(ctx context.Context, queue *Queue) error + Update(ctx context.Context, queue *Queue) error + Get(ctx context.Context, id string) (*Queue, error) + List(ctx context.Context, offset, limit int) ([]*Queue, error) + Delete(ctx context.Context, id string) error + Stats(ctx context.Context, id string) (*QueueStats, error) + Purge(ctx context.Context, id string) error + Cleanup(ctx context.Context, id string) error +} diff --git a/go.mod b/go.mod index 71963c1..df5d0b7 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.uber.org/atomic v1.7.0 // indirect golang.org/x/crypto v0.17.0 // indirect diff --git a/mocks/MessageRepository.go b/mocks/MessageRepository.go new file mode 100644 index 0000000..f2f6759 --- /dev/null +++ b/mocks/MessageRepository.go @@ -0,0 +1,143 @@ +// Code generated by mockery v2.39.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + domain "github.com/allisson/psqlqueue/domain" + mock "github.com/stretchr/testify/mock" +) + +// MessageRepository is an autogenerated mock type for the MessageRepository type +type MessageRepository struct { + mock.Mock +} + +// Ack provides a mock function with given fields: ctx, id +func (_m *MessageRepository) Ack(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Ack") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Create provides a mock function with given fields: ctx, message +func (_m *MessageRepository) Create(ctx context.Context, message *domain.Message) error { + ret := _m.Called(ctx, message) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *domain.Message) error); ok { + r0 = rf(ctx, message) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Get provides a mock function with given fields: ctx, id +func (_m *MessageRepository) Get(ctx context.Context, id string) (*domain.Message, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 *domain.Message + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*domain.Message, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *domain.Message); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*domain.Message) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, queue, label, limit +func (_m *MessageRepository) List(ctx context.Context, queue *domain.Queue, label *string, limit uint) ([]*domain.Message, error) { + ret := _m.Called(ctx, queue, label, limit) + + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 []*domain.Message + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *domain.Queue, *string, uint) ([]*domain.Message, error)); ok { + return rf(ctx, queue, label, limit) + } + if rf, ok := ret.Get(0).(func(context.Context, *domain.Queue, *string, uint) []*domain.Message); ok { + r0 = rf(ctx, queue, label, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*domain.Message) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *domain.Queue, *string, uint) error); ok { + r1 = rf(ctx, queue, label, limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Nack provides a mock function with given fields: ctx, id, visibilityTimeoutSeconds +func (_m *MessageRepository) Nack(ctx context.Context, id string, visibilityTimeoutSeconds uint) error { + ret := _m.Called(ctx, id, visibilityTimeoutSeconds) + + if len(ret) == 0 { + panic("no return value specified for Nack") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, uint) error); ok { + r0 = rf(ctx, id, visibilityTimeoutSeconds) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewMessageRepository creates a new instance of MessageRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMessageRepository(t interface { + mock.TestingT + Cleanup(func()) +}) *MessageRepository { + mock := &MessageRepository{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/MessageService.go b/mocks/MessageService.go new file mode 100644 index 0000000..be7cc78 --- /dev/null +++ b/mocks/MessageService.go @@ -0,0 +1,113 @@ +// Code generated by mockery v2.39.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + domain "github.com/allisson/psqlqueue/domain" + mock "github.com/stretchr/testify/mock" +) + +// MessageService is an autogenerated mock type for the MessageService type +type MessageService struct { + mock.Mock +} + +// Ack provides a mock function with given fields: ctx, id +func (_m *MessageService) Ack(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Ack") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Create provides a mock function with given fields: ctx, message +func (_m *MessageService) Create(ctx context.Context, message *domain.Message) error { + ret := _m.Called(ctx, message) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *domain.Message) error); ok { + r0 = rf(ctx, message) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// List provides a mock function with given fields: ctx, queueID, label, limit +func (_m *MessageService) List(ctx context.Context, queueID *string, label *string, limit uint) ([]*domain.Message, error) { + ret := _m.Called(ctx, queueID, label, limit) + + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 []*domain.Message + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *string, *string, uint) ([]*domain.Message, error)); ok { + return rf(ctx, queueID, label, limit) + } + if rf, ok := ret.Get(0).(func(context.Context, *string, *string, uint) []*domain.Message); ok { + r0 = rf(ctx, queueID, label, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*domain.Message) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *string, *string, uint) error); ok { + r1 = rf(ctx, queueID, label, limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Nack provides a mock function with given fields: ctx, id, visibilityTimeoutSeconds +func (_m *MessageService) Nack(ctx context.Context, id string, visibilityTimeoutSeconds uint) error { + ret := _m.Called(ctx, id, visibilityTimeoutSeconds) + + if len(ret) == 0 { + panic("no return value specified for Nack") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, uint) error); ok { + r0 = rf(ctx, id, visibilityTimeoutSeconds) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewMessageService creates a new instance of MessageService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMessageService(t interface { + mock.TestingT + Cleanup(func()) +}) *MessageService { + mock := &MessageService{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/QueueRepository.go b/mocks/QueueRepository.go new file mode 100644 index 0000000..906070f --- /dev/null +++ b/mocks/QueueRepository.go @@ -0,0 +1,209 @@ +// Code generated by mockery v2.39.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + domain "github.com/allisson/psqlqueue/domain" + mock "github.com/stretchr/testify/mock" +) + +// QueueRepository is an autogenerated mock type for the QueueRepository type +type QueueRepository struct { + mock.Mock +} + +// Cleanup provides a mock function with given fields: ctx, id +func (_m *QueueRepository) Cleanup(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Cleanup") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Create provides a mock function with given fields: ctx, queue +func (_m *QueueRepository) Create(ctx context.Context, queue *domain.Queue) error { + ret := _m.Called(ctx, queue) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *domain.Queue) error); ok { + r0 = rf(ctx, queue) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Delete provides a mock function with given fields: ctx, id +func (_m *QueueRepository) Delete(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Get provides a mock function with given fields: ctx, id +func (_m *QueueRepository) Get(ctx context.Context, id string) (*domain.Queue, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 *domain.Queue + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*domain.Queue, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *domain.Queue); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*domain.Queue) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, offset, limit +func (_m *QueueRepository) List(ctx context.Context, offset int, limit int) ([]*domain.Queue, error) { + ret := _m.Called(ctx, offset, limit) + + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 []*domain.Queue + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int, int) ([]*domain.Queue, error)); ok { + return rf(ctx, offset, limit) + } + if rf, ok := ret.Get(0).(func(context.Context, int, int) []*domain.Queue); ok { + r0 = rf(ctx, offset, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*domain.Queue) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok { + r1 = rf(ctx, offset, limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Purge provides a mock function with given fields: ctx, id +func (_m *QueueRepository) Purge(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Purge") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stats provides a mock function with given fields: ctx, id +func (_m *QueueRepository) Stats(ctx context.Context, id string) (*domain.QueueStats, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Stats") + } + + var r0 *domain.QueueStats + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*domain.QueueStats, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *domain.QueueStats); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*domain.QueueStats) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Update provides a mock function with given fields: ctx, queue +func (_m *QueueRepository) Update(ctx context.Context, queue *domain.Queue) error { + ret := _m.Called(ctx, queue) + + if len(ret) == 0 { + panic("no return value specified for Update") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *domain.Queue) error); ok { + r0 = rf(ctx, queue) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewQueueRepository creates a new instance of QueueRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewQueueRepository(t interface { + mock.TestingT + Cleanup(func()) +}) *QueueRepository { + mock := &QueueRepository{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/QueueService.go b/mocks/QueueService.go new file mode 100644 index 0000000..f15096b --- /dev/null +++ b/mocks/QueueService.go @@ -0,0 +1,209 @@ +// Code generated by mockery v2.39.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + domain "github.com/allisson/psqlqueue/domain" + mock "github.com/stretchr/testify/mock" +) + +// QueueService is an autogenerated mock type for the QueueService type +type QueueService struct { + mock.Mock +} + +// Cleanup provides a mock function with given fields: ctx, id +func (_m *QueueService) Cleanup(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Cleanup") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Create provides a mock function with given fields: ctx, queue +func (_m *QueueService) Create(ctx context.Context, queue *domain.Queue) error { + ret := _m.Called(ctx, queue) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *domain.Queue) error); ok { + r0 = rf(ctx, queue) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Delete provides a mock function with given fields: ctx, id +func (_m *QueueService) Delete(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Get provides a mock function with given fields: ctx, id +func (_m *QueueService) Get(ctx context.Context, id string) (*domain.Queue, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 *domain.Queue + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*domain.Queue, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *domain.Queue); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*domain.Queue) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, offset, limit +func (_m *QueueService) List(ctx context.Context, offset int, limit int) ([]*domain.Queue, error) { + ret := _m.Called(ctx, offset, limit) + + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 []*domain.Queue + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int, int) ([]*domain.Queue, error)); ok { + return rf(ctx, offset, limit) + } + if rf, ok := ret.Get(0).(func(context.Context, int, int) []*domain.Queue); ok { + r0 = rf(ctx, offset, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*domain.Queue) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok { + r1 = rf(ctx, offset, limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Purge provides a mock function with given fields: ctx, id +func (_m *QueueService) Purge(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Purge") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stats provides a mock function with given fields: ctx, id +func (_m *QueueService) Stats(ctx context.Context, id string) (*domain.QueueStats, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Stats") + } + + var r0 *domain.QueueStats + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*domain.QueueStats, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *domain.QueueStats); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*domain.QueueStats) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Update provides a mock function with given fields: ctx, queue +func (_m *QueueService) Update(ctx context.Context, queue *domain.Queue) error { + ret := _m.Called(ctx, queue) + + if len(ret) == 0 { + panic("no return value specified for Update") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *domain.Queue) error); ok { + r0 = rf(ctx, queue) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewQueueService creates a new instance of QueueService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewQueueService(t interface { + mock.TestingT + Cleanup(func()) +}) *QueueService { + mock := &QueueService{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/repository/message.go b/repository/message.go index b16552e..94e63bd 100644 --- a/repository/message.go +++ b/repository/message.go @@ -24,7 +24,7 @@ func (m *Message) Get(ctx context.Context, id string) (*domain.Message, error) { message := domain.Message{} options := pgxutil.NewFindOptions().WithFilter("id", id) err := pgxutil.Get(ctx, m.pool, m.tableName, options, &message) - return &message, err + return &message, parseError(err, domain.ErrMessageNotFound, domain.ErrMessageAlreadyExists) } func (m *Message) List(ctx context.Context, queue *domain.Queue, label *string, limit uint) ([]*domain.Message, error) { diff --git a/service/message.go b/service/message.go new file mode 100644 index 0000000..8470465 --- /dev/null +++ b/service/message.go @@ -0,0 +1,54 @@ +package service + +import ( + "context" + "time" + + "github.com/allisson/psqlqueue/domain" +) + +// Message is an implementation of domain.MessageService +type Message struct { + messageRepository domain.MessageRepository + queueRepository domain.QueueRepository +} + +func (m *Message) Create(ctx context.Context, message *domain.Message) error { + if err := message.Validate(); err != nil { + return err + } + + queue, err := m.queueRepository.Get(ctx, message.QueueID) + if err != nil { + return err + } + + message.Enqueue(queue, time.Now().UTC()) + + return m.messageRepository.Create(ctx, message) +} + +func (m *Message) List(ctx context.Context, queueID string, label *string, limit uint) ([]*domain.Message, error) { + queue, err := m.queueRepository.Get(ctx, queueID) + if err != nil { + return nil, err + } + + return m.messageRepository.List(ctx, queue, label, limit) +} + +func (m *Message) Ack(ctx context.Context, id string) error { + return m.messageRepository.Ack(ctx, id) +} + +func (m *Message) Nack(ctx context.Context, id string, visibilityTimeoutSeconds uint) error { + return m.messageRepository.Nack(ctx, id, visibilityTimeoutSeconds) +} + +// NewMessage returns an implementation of domain.MessageService. +func NewMessage(messageRepository domain.MessageRepository, queueRepository domain.QueueRepository) *Message { + return &Message{ + messageRepository: messageRepository, + queueRepository: queueRepository, + } +} diff --git a/service/message_test.go b/service/message_test.go new file mode 100644 index 0000000..e2f9524 --- /dev/null +++ b/service/message_test.go @@ -0,0 +1,82 @@ +package service + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/allisson/psqlqueue/domain" + "github.com/allisson/psqlqueue/mocks" +) + +func nilString() *string { + var s *string = nil + return s +} + +func TestMessage(t *testing.T) { + ctx := context.Background() + + t.Run("Create", func(t *testing.T) { + messageRepository := mocks.NewMessageRepository(t) + queueRepository := mocks.NewQueueRepository(t) + messageService := NewMessage(messageRepository, queueRepository) + queue := makeQueue("my-queue", nil) + message := domain.Message{Body: `{"data": true}`, QueueID: queue.ID} + + queueRepository.On("Get", ctx, queue.ID).Return(queue, nil) + messageRepository.On("Create", ctx, mock.Anything).Return(nil) + + err := messageService.Create(ctx, &message) + assert.Nil(t, err) + }) + + t.Run("List", func(t *testing.T) { + messageRepository := mocks.NewMessageRepository(t) + queueRepository := mocks.NewQueueRepository(t) + messageService := NewMessage(messageRepository, queueRepository) + queue := makeQueue("my-queue", nil) + message1 := domain.Message{Body: `{"data": true}`} + message1.Enqueue(queue, time.Now().UTC()) + message2 := domain.Message{Body: `{"data": true}`} + message2.Enqueue(queue, time.Now().UTC()) + + queueRepository.On("Get", ctx, queue.ID).Return(queue, nil) + messageRepository.On("List", ctx, queue, nilString(), uint(10)).Return([]*domain.Message{&message1, &message2}, nil) + + messages, err := messageService.List(ctx, queue.ID, nilString(), 10) + assert.Nil(t, err) + assert.Len(t, messages, 2) + }) + + t.Run("Ack", func(t *testing.T) { + messageRepository := mocks.NewMessageRepository(t) + queueRepository := mocks.NewQueueRepository(t) + messageService := NewMessage(messageRepository, queueRepository) + queue := makeQueue("my-queue", nil) + message := domain.Message{Body: `{"data": true}`} + message.Enqueue(queue, time.Now().UTC()) + + messageRepository.On("Ack", ctx, message.ID).Return(nil) + + err := messageService.Ack(ctx, message.ID) + assert.Nil(t, err) + }) + + t.Run("Nack", func(t *testing.T) { + messageRepository := mocks.NewMessageRepository(t) + queueRepository := mocks.NewQueueRepository(t) + messageService := NewMessage(messageRepository, queueRepository) + queue := makeQueue("my-queue", nil) + message := domain.Message{Body: `{"data": true}`} + message.Enqueue(queue, time.Now().UTC()) + + messageRepository.On("Nack", ctx, message.ID, uint(0)).Return(nil) + + err := messageService.Nack(ctx, message.ID, uint(0)) + assert.Nil(t, err) + }) +} diff --git a/service/queue.go b/service/queue.go new file mode 100644 index 0000000..f377715 --- /dev/null +++ b/service/queue.go @@ -0,0 +1,92 @@ +package service + +import ( + "context" + "time" + + "github.com/allisson/psqlqueue/domain" +) + +// Queue is an implementation of domain.QueueService. +type Queue struct { + queueRepository domain.QueueRepository +} + +func (q *Queue) Create(ctx context.Context, queue *domain.Queue) error { + if err := queue.Validate(); err != nil { + return err + } + + now := time.Now().UTC() + queue.CreatedAt = now + queue.UpdatedAt = now + + return q.queueRepository.Create(ctx, queue) +} + +func (q *Queue) Update(ctx context.Context, queue *domain.Queue) error { + if err := queue.Validate(); err != nil { + return err + } + + queueFromDB, err := q.queueRepository.Get(ctx, queue.ID) + if err != nil { + return err + } + + queue.CreatedAt = queueFromDB.CreatedAt + queue.UpdatedAt = time.Now().UTC() + + return q.queueRepository.Update(ctx, queue) +} + +func (q *Queue) Get(ctx context.Context, id string) (*domain.Queue, error) { + return q.queueRepository.Get(ctx, id) +} + +func (q *Queue) List(ctx context.Context, offset, limit int) ([]*domain.Queue, error) { + return q.queueRepository.List(ctx, offset, limit) +} + +func (q *Queue) Delete(ctx context.Context, id string) error { + queue, err := q.queueRepository.Get(ctx, id) + if err != nil { + return err + } + + return q.queueRepository.Delete(ctx, queue.ID) + +} + +func (q *Queue) Stats(ctx context.Context, id string) (*domain.QueueStats, error) { + queue, err := q.queueRepository.Get(ctx, id) + if err != nil { + return nil, err + } + + return q.queueRepository.Stats(ctx, queue.ID) + +} + +func (q *Queue) Purge(ctx context.Context, id string) error { + queue, err := q.queueRepository.Get(ctx, id) + if err != nil { + return err + } + + return q.queueRepository.Purge(ctx, queue.ID) +} + +func (q *Queue) Cleanup(ctx context.Context, id string) error { + queue, err := q.queueRepository.Get(ctx, id) + if err != nil { + return err + } + + return q.queueRepository.Cleanup(ctx, queue.ID) +} + +// NewQueue returns an implementation of domain.QueueService. +func NewQueue(queueRepository domain.QueueRepository) *Queue { + return &Queue{queueRepository: queueRepository} +} diff --git a/service/queue_test.go b/service/queue_test.go new file mode 100644 index 0000000..1c4fab7 --- /dev/null +++ b/service/queue_test.go @@ -0,0 +1,124 @@ +package service + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/allisson/psqlqueue/domain" + "github.com/allisson/psqlqueue/mocks" +) + +func makeQueue(queueID string, topicID *string) *domain.Queue { + return &domain.Queue{ + ID: queueID, + AckDeadlineSeconds: 60, + MessageRetentionSeconds: 3600, + DeliveryDelaySeconds: 0, + CreatedAt: time.Now().UTC(), + UpdatedAt: time.Now().UTC(), + } +} + +func TestQueue(t *testing.T) { + ctx := context.Background() + + t.Run("Create", func(t *testing.T) { + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue := makeQueue("my-queue", nil) + + queueRepository.On("Create", ctx, queue).Return(nil) + + err := queueService.Create(ctx, queue) + assert.Nil(t, err) + }) + + t.Run("Create with invalid queue", func(t *testing.T) { + expectedErrorPayload := `{"id":"must be in a valid format"}` + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue := makeQueue("my@queue", nil) + + err := queueService.Create(ctx, queue) + assert.NotNil(t, err) + errorPayload, err := json.Marshal(err) + assert.Nil(t, err) + assert.Equal(t, expectedErrorPayload, string(errorPayload)) + }) + + t.Run("Update", func(t *testing.T) { + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue := makeQueue("my-queue", nil) + + queueRepository.On("Get", ctx, queue.ID).Return(queue, nil) + queueRepository.On("Update", ctx, queue).Return(nil) + + err := queueService.Update(ctx, queue) + assert.Nil(t, err) + }) + + t.Run("Get", func(t *testing.T) { + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue := makeQueue("my-queue", nil) + + queueRepository.On("Get", ctx, queue.ID).Return(queue, nil) + + _, err := queueService.Get(ctx, queue.ID) + assert.Nil(t, err) + }) + + t.Run("List", func(t *testing.T) { + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue1 := makeQueue("my-queue-1", nil) + queue2 := makeQueue("my-queue-2", nil) + + queueRepository.On("List", ctx, 0, 10).Return([]*domain.Queue{queue1, queue2}, nil) + + queues, err := queueService.List(ctx, 0, 10) + assert.Nil(t, err) + assert.Len(t, queues, 2) + }) + + t.Run("Delete", func(t *testing.T) { + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue := makeQueue("my-queue", nil) + + queueRepository.On("Get", ctx, queue.ID).Return(queue, nil) + queueRepository.On("Delete", ctx, queue.ID).Return(nil) + + err := queueService.Delete(ctx, queue.ID) + assert.Nil(t, err) + }) + + t.Run("Stats", func(t *testing.T) { + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue := makeQueue("my-queue", nil) + + queueRepository.On("Get", ctx, queue.ID).Return(queue, nil) + queueRepository.On("Stats", ctx, queue.ID).Return(&domain.QueueStats{}, nil) + + _, err := queueService.Stats(ctx, queue.ID) + assert.Nil(t, err) + }) + + t.Run("Purge", func(t *testing.T) { + queueRepository := mocks.NewQueueRepository(t) + queueService := NewQueue(queueRepository) + queue := makeQueue("my-queue", nil) + + queueRepository.On("Get", ctx, queue.ID).Return(queue, nil) + queueRepository.On("Purge", ctx, queue.ID).Return(nil) + + err := queueService.Purge(ctx, queue.ID) + assert.Nil(t, err) + }) +}