Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 181 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,182 @@
# psqlqueue
Simple queue system powered by Golang and PostgreSQL
[![test](https://github.com/allisson/psqlqueue/actions/workflows/test.yml/badge.svg)](https://github.com/allisson/psqlqueue/actions/workflows/test.yml)
[![Go Report Card](https://goreportcard.com/badge/github.com/allisson/psqlqueue)](https://goreportcard.com/report/github.com/allisson/psqlqueue)
[![Docker Repository on Quay](https://quay.io/repository/allisson/psqlqueue/status "Docker Repository on Quay")](https://quay.io/repository/allisson/psqlqueue)

Simple queue system powered by Golang and PostgreSQL.

## quickstart

The idea of this service is to offer a simple queuing system using PostgreSQL as a backend.

First, we need a PostgreSQL database, for this, we will use docker:

```bash
docker run --name postgres-psqlqueue \
--restart unless-stopped \
-e POSTGRES_USER=psqlqueue \
-e POSTGRES_PASSWORD=psqlqueue \
-e POSTGRES_DB=psqlqueue \
-p 5432:5432 \
-d postgres:15-alpine
```

Now let's run the database migrations before starting the server:

```bash
docker run --rm \
-e PSQLQUEUE_DATABASE_URL='postgres://psqlqueue:psqlqueue@host.docker.internal:5432/psqlqueue?sslmode=disable' \
quay.io/allisson/psqlqueue migrate
```

```json
{"time":"2023-12-29T21:11:39.516360369Z","level":"INFO","msg":"migration process started"}
{"time":"2023-12-29T21:11:39.54908151Z","level":"INFO","msg":"migration process finished"}
```

Starting the server:

```bash
docker run --rm \
-e PSQLQUEUE_DATABASE_URL='postgres://psqlqueue:psqlqueue@host.docker.internal:5432/psqlqueue?sslmode=disable' \
-p 8000:8000 \
quay.io/allisson/psqlqueue server
```

```json
{"time":"2023-12-29T21:14:30.898080659Z","level":"INFO","msg":"http server starting","host":"0.0.0.0","port":8000}
```

For creating a new queue we have these fields:
- "id": The identifier of this new queue.
- "ack_deadline_seconds": The maximum time before the consumer should acknowledge the message, after this time the message will be delivered again to consumers.
- "message_retention_seconds": The maximum time in which the message must be delivered to consumers, after this time the message will be marked as expired.
- "delivery_delay_seconds": The number of seconds to postpone the delivery of new messages to consumers.

```bash
curl --location 'http://localhost:8000/v1/queues' \
--header 'Content-Type: application/json' \
--data '{
"id": "my-new-queue",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0
}'
```

```json
{
"id": "my-new-queue",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0,
"created_at": "2023-12-29T21:30:58.682194763Z",
"updated_at": "2023-12-29T21:30:58.682194763Z"
}
```

For creating a new message we have these fields:
- "body": The body of the message.
- "label": A label that allows this message to be filtered.
- "attributes": The message attributes.

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages' \
--header 'Content-Type: application/json' \
--data '{
"body": "message body",
"label": "my-label",
"attributes": {"attribute1": "attribute1", "attribute2": "attribute2"}
}'
```

For consuming the messages we have these filters:
- "label": To filter by the message label.
- "limit": To limit the number of messages.

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages?limit=1'
```

```json
{
"data": [
{
"id": "01HJVRCQVAD9VBT10MCS74T0EN",
"queue_id": "my-new-queue",
"label": "my-label",
"body": "message body",
"attributes": {
"attribute1": "attribute1",
"attribute2": "attribute2"
},
"delivery_attempts": 1,
"created_at": "2023-12-29T21:41:25.994731Z"
}
],
"limit": 1
}
```

Now you have 30 seconds to execute the ack or nack for this message, first we can do the nack:

```bash
curl --location --request PUT 'http://localhost:8000/v1/queues/my-new-queue/messages/01HJVRCQVAD9VBT10MCS74T0EN/nack' \
--header 'Content-Type: application/json' \
--data '{
"visibility_timeout_seconds": 30
}'
```

Now we need to wait 30 seconds before consuming this message again, after this time:

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages?limit=1'
```

```json
{
"data": [
{
"id": "01HJVRCQVAD9VBT10MCS74T0EN",
"queue_id": "my-new-queue",
"label": "my-label",
"body": "message body",
"attributes": {
"attribute1": "attribute1",
"attribute2": "attribute2"
},
"delivery_attempts": 2,
"created_at": "2023-12-29T21:41:25.994731Z"
}
],
"limit": 1
}
```

Now it's time to ack the message:

```bash
curl --location --request PUT 'http://localhost:8000/v1/queues/my-new-queue/messages/01HJVRCQVAD9VBT10MCS74T0EN/ack'
```

Let's try to consume the messages again:

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages/?limit=1'
```

```json
{
"data": [],
"limit": 1
}
```

After the ack, the message remains in the database marked as expired, to remove expired messages we can use the cleanup endpoint:

```bash
curl --location --request PUT 'http://localhost:8000/v1/queues/my-new-queue/cleanup'
```

This is the basics of using this service, I recommend that you check the swagger documentation at http://localhost:8000/v1/swagger/index.html to see more options.
2 changes: 1 addition & 1 deletion docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ const docTemplate = `{
},
{
"type": "string",
"description": "Label",
"description": "Filter by label",
"name": "label",
"in": "path"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@
},
{
"type": "string",
"description": "Label",
"description": "Filter by label",
"name": "label",
"in": "path"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ paths:
name: queue_id
required: true
type: string
- description: Label
- description: Filter by label
in: path
name: label
type: string
Expand Down
12 changes: 5 additions & 7 deletions http/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ type messageResponse struct {

// nolint:unused
type messageListRequest struct {
Label *string `form:"label" validate:"optional"`
Offset uint `form:"offset" validate:"required"`
Limit uint `form:"limit" validate:"required"`
Label *string `form:"label" validate:"optional"`
Limit uint `form:"limit" validate:"required"`
} //@name MessageListRequest

// nolint:unused
Expand Down Expand Up @@ -92,7 +91,7 @@ func (m *MessageHandler) Create(c *gin.Context) {
// @Accept json
// @Produce json
// @Param queue_id path string true "Queue id"
// @Param label path string false "Label"
// @Param label path string false "Filter by label"
// @Param limit query int false "The limit indicates the maximum number of items to return"
// @Success 200 {object} messageListResponse
// @Failure 404 {object} errorResponse
Expand All @@ -101,13 +100,12 @@ func (m *MessageHandler) Create(c *gin.Context) {
func (m *MessageHandler) List(c *gin.Context) {
queueID := c.Param("queue_id")

request := messageListRequest{Offset: 0, Limit: 10}
request := messageListRequest{Limit: 10}
if err := c.ShouldBindQuery(&request); err != nil {
slog.Warn("message list request error", "error", err)
}

request.Limit = min(request.Limit, m.cfg.QueueMaxNumberOfMessages)
request.Offset = 0

messages, err := m.messageService.List(c.Request.Context(), queueID, request.Label, request.Limit)
if err != nil {
Expand All @@ -116,7 +114,7 @@ func (m *MessageHandler) List(c *gin.Context) {
return
}

response := listResponse{Data: messages, Offset: request.Offset, Limit: request.Limit}
response := listResponse{Data: messages, Offset: 0, Limit: request.Limit}

c.JSON(http.StatusOK, response)
}
Expand Down