Skip to content

Commit 53c0fb5

Browse files
committed
lint fixes for amqp transport
1 parent 08fae8f commit 53c0fb5

7 files changed

Lines changed: 81 additions & 95 deletions

File tree

transport/amqp/doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
// Package AMQP provides AMQP transport
1+
// Package amqp implements a go-kit transport for AMQP
22
package amqp

transport/amqp/encode-decode.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) erro
1616
// EncodeResponseFunc encodes the passed reponse object to
1717
// an AMQP channel for publishing. It is designed to be used in AMQP Subscribers
1818
type EncodeResponseFunc func(context.Context,
19-
*amqp.Delivery, AMQPChannel, *amqp.Publishing, interface{}) error
19+
*amqp.Delivery, Channel, *amqp.Publishing, interface{}) error
2020

2121
// DecodeResponseFunc extracts a user-domain response object from
2222
// an AMQP Delivery object. It is designed to be used in AMQP Publishers

transport/amqp/publisher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
// Publisher wraps AMQP channel and queue and provides a method that
1212
// implements endpoint.Endpoint
1313
type Publisher struct {
14-
ch AMQPChannel
14+
ch Channel
1515
q *amqp.Queue
1616
enc EncodeRequestFunc
1717
dec DecodeResponseFunc
@@ -22,7 +22,7 @@ type Publisher struct {
2222

2323
// NewPublisher constructs a usable Publisher for a single remote method.
2424
func NewPublisher(
25-
ch AMQPChannel,
25+
ch Channel,
2626
q *amqp.Queue,
2727
enc EncodeRequestFunc,
2828
dec DecodeResponseFunc,

transport/amqp/publisher_test.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"github.com/streadway/amqp"
1212
)
1313

14-
import "log"
15-
1614
// TestBadEncode tests if encode errors are handled properly
1715
func TestBadEncode(t *testing.T) {
1816
ch := &mockChannel{f: nullFunc}
@@ -23,15 +21,15 @@ func TestBadEncode(t *testing.T) {
2321
func(context.Context, *amqp.Publishing, interface{}) error { return errors.New("err!") },
2422
func(context.Context, *amqp.Delivery) (response interface{}, err error) { return struct{}{}, nil },
2523
)
26-
err_chan := make(chan error, 1)
24+
errChan := make(chan error, 1)
2725
var err error
2826
go func() {
2927
_, err := pub.Endpoint()(context.Background(), struct{}{})
30-
err_chan <- err
28+
errChan <- err
3129

3230
}()
3331
select {
34-
case err = <-err_chan:
32+
case err = <-errChan:
3533
break
3634
case <-time.After(100 * time.Millisecond):
3735
t.Fatal("Timed out waiting for result")
@@ -66,20 +64,20 @@ func TestBadDecode(t *testing.T) {
6664
return struct{}{}, errors.New("err!")
6765
},
6866
amqptransport.PublisherBefore(
69-
amqptransport.SetCorrelationId(cid),
67+
amqptransport.SetCorrelationID(cid),
7068
),
7169
)
7270

7371
var err error
74-
err_chan := make(chan error, 1)
72+
errChan := make(chan error, 1)
7573
go func() {
7674
_, err := pub.Endpoint()(context.Background(), struct{}{})
77-
err_chan <- err
75+
errChan <- err
7876

7977
}()
8078

8179
select {
82-
case err = <-err_chan:
80+
case err = <-errChan:
8381
break
8482
case <-time.After(100 * time.Millisecond):
8583
t.Fatal("Timed out waiting for result")
@@ -113,15 +111,15 @@ func TestPublisherTimeout(t *testing.T) {
113111
)
114112

115113
var err error
116-
err_chan := make(chan error, 1)
114+
errChan := make(chan error, 1)
117115
go func() {
118116
_, err := pub.Endpoint()(context.Background(), struct{}{})
119-
err_chan <- err
117+
errChan <- err
120118

121119
}()
122120

123121
select {
124-
case err = <-err_chan:
122+
case err = <-errChan:
125123
break
126124
case <-time.After(100 * time.Millisecond):
127125
t.Fatal("timed out waiting for result")
@@ -165,31 +163,31 @@ func TestSuccessfulPublisher(t *testing.T) {
165163
testReqEncoder,
166164
testResDeliveryDecoder,
167165
amqptransport.PublisherBefore(
168-
amqptransport.SetCorrelationId(cid),
166+
amqptransport.SetCorrelationID(cid),
169167
),
170168
)
171169

172170
var res testRes
173171
var ok bool
174-
res_chan := make(chan interface{}, 1)
175-
err_chan := make(chan error, 1)
172+
resChan := make(chan interface{}, 1)
173+
errChan := make(chan error, 1)
176174
go func() {
177175
res, err := pub.Endpoint()(context.Background(), mockReq)
178176
if err != nil {
179-
err_chan <- err
177+
errChan <- err
180178
} else {
181-
res_chan <- res
179+
resChan <- res
182180
}
183181
}()
184182

185183
select {
186-
case response := <-res_chan:
184+
case response := <-resChan:
187185
res, ok = response.(testRes)
188186
if !ok {
189187
t.Error("failed to assert endpoint response type")
190188
}
191189
break
192-
case err = <-err_chan:
190+
case err = <-errChan:
193191
break
194192
case <-time.After(100 * time.Millisecond):
195193
t.Fatal("timed out waiting for result")

transport/amqp/request_response_func.go

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type RequestFunc func(context.Context, *amqp.Publishing) context.Context
1717
// subscribers, after invoking the endpoint but prior to publishing a reply.
1818
type SubscriberResponseFunc func(context.Context,
1919
*amqp.Delivery,
20-
AMQPChannel,
20+
Channel,
2121
*amqp.Publishing,
2222
) context.Context
2323

@@ -44,7 +44,7 @@ func SetPublishKey(publishKey string) RequestFunc {
4444
}
4545
}
4646

47-
// SetPersistentReply sets the delivery mode of a Publishing
47+
// SetPublishDeliveryMode sets the delivery mode of a Publishing
4848
// please refer to AMQP delivery mode constants in the AMQP package
4949
func SetPublishDeliveryMode(dmode uint8) RequestFunc {
5050
return func(ctx context.Context, pub *amqp.Publishing,
@@ -98,9 +98,9 @@ func SetContentType(contentType string) RequestFunc {
9898
}
9999
}
100100

101-
// SetCorrelationId returns a RequestFunc that sets the CorrelationId field
101+
// SetCorrelationID returns a RequestFunc that sets the CorrelationId field
102102
// of an AMQP Publishing
103-
func SetCorrelationId(cid string) RequestFunc {
103+
func SetCorrelationID(cid string) RequestFunc {
104104
return func(ctx context.Context, pub *amqp.Publishing,
105105
) context.Context {
106106
pub.CorrelationId = cid
@@ -115,7 +115,7 @@ func SetCorrelationId(cid string) RequestFunc {
115115
func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
116116
return func(ctx context.Context,
117117
deliv *amqp.Delivery,
118-
ch AMQPChannel,
118+
ch Channel,
119119
pub *amqp.Publishing,
120120
) context.Context {
121121
deliv.Ack(multiple)
@@ -126,62 +126,56 @@ func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
126126
func getPublishExchange(ctx context.Context) string {
127127
if exchange := ctx.Value(ContextKeyExchange); exchange != nil {
128128
return exchange.(string)
129-
} else {
130-
return ""
131129
}
130+
return ""
132131
}
133132

134133
func getPublishKey(ctx context.Context) string {
135134
if publishKey := ctx.Value(ContextKeyPublishKey); publishKey != nil {
136135
return publishKey.(string)
137-
} else {
138-
return ""
139136
}
137+
return ""
140138
}
141139

142140
func getNackSleepDuration(ctx context.Context) time.Duration {
143141
if duration := ctx.Value(ContextKeyNackSleepDuration); duration != nil {
144142
return duration.(time.Duration)
145-
} else {
146-
return 0
147143
}
144+
return 0
148145
}
149146

150147
func getConsumeAutoAck(ctx context.Context) bool {
151148
if autoAck := ctx.Value(ContextKeyAutoAck); autoAck != nil {
152149
return autoAck.(bool)
153-
} else {
154-
return false
155150
}
151+
return false
156152
}
157153

158154
func getConsumeArgs(ctx context.Context) amqp.Table {
159155
if args := ctx.Value(ContextKeyConsumeArgs); args != nil {
160156
return args.(amqp.Table)
161-
} else {
162-
return nil
163157
}
158+
return nil
164159
}
165160

166161
type contextKey int
167162

168163
const (
169-
// ContextKeyExchange
170-
// It is the value of the reply Exchange in amqp.Publish
164+
// ContextKeyExchange is the value of the reply Exchange in
165+
// amqp.Publish
171166
ContextKeyExchange contextKey = iota
172-
// ContextKeyPublishKey
173-
// It is the value of the ReplyTo field in amqp.Publish
167+
// ContextKeyPublishKey is the value of the ReplyTo field in
168+
// amqp.Publish
174169
ContextKeyPublishKey
175-
// ContextKeyNackSleepDuration
176-
// It is the duration to sleep for if the service
177-
// Nack and requeues a message.
170+
// ContextKeyNackSleepDuration is the duration to sleep for if the
171+
// service Nack and requeues a message.
178172
// This is to prevent sporadic send-resending of message
179173
// when a message is constantly Nack'd and requeued
180174
ContextKeyNackSleepDuration
181-
// ContextKeyAutoAck
182-
// It is the value of autoAck field when calling amqp.Channel.Consume
175+
// ContextKeyAutoAck is the value of autoAck field when calling
176+
// amqp.Channel.Consume
183177
ContextKeyAutoAck
184-
// ContextKeyConsumeArgs
185-
// It is the value of consumeArgs field when calling amqp.Channel.Consume
178+
// ContextKeyConsumeArgs is the value of consumeArgs field when calling
179+
// amqp.Channel.Consume
186180
ContextKeyConsumeArgs
187181
)

transport/amqp/subscriber.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
7575

7676
// ServeDelivery handles AMQP Delivery messages
7777
// It is strongly recommended to use *amqp.Channel as the
78-
// AMQPChannel interface implementation
79-
func (s Subscriber) ServeDelivery(ch AMQPChannel) func(deliv *amqp.Delivery) {
78+
// Channel interface implementation
79+
func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {
8080
return func(deliv *amqp.Delivery) {
8181
ctx, cancel := context.WithCancel(context.Background())
8282
defer cancel()
@@ -122,7 +122,7 @@ func (s Subscriber) ServeDelivery(ch AMQPChannel) func(deliv *amqp.Delivery) {
122122
func EncodeJSONResponse(
123123
ctx context.Context,
124124
deliv *amqp.Delivery,
125-
ch AMQPChannel,
125+
ch Channel,
126126
pub *amqp.Publishing,
127127
response interface{},
128128
) error {
@@ -155,7 +155,7 @@ func EncodeJSONResponse(
155155
func EncodeNopResponse(
156156
ctx context.Context,
157157
deliv *amqp.Delivery,
158-
ch AMQPChannel,
158+
ch Channel,
159159
pub *amqp.Publishing,
160160
response interface{},
161161
) error {
@@ -167,32 +167,30 @@ func EncodeNopResponse(
167167
// their replies, and will likely want to pass and check for their own error
168168
// types.
169169
type ErrorEncoder func(ctx context.Context,
170-
err error, deliv *amqp.Delivery, ch AMQPChannel, pub *amqp.Publishing)
170+
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
171171

172172
// DefaultErrorEncoder simply ignores the message. It does not reply
173173
// nor Ack/Nack the message.
174174
func DefaultErrorEncoder(ctx context.Context,
175-
err error, deliv *amqp.Delivery, ch AMQPChannel, pub *amqp.Publishing) {
176-
return
175+
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
177176
}
178177

179-
// NackErrorEncoder issues a Nack to the delivery with multiple flag set as false
178+
// SingleNackRequeueErrorEncoder issues a Nack to the delivery with multiple flag set as false
180179
// and requeue flag set as true. It does not reply the message.
181180
func SingleNackRequeueErrorEncoder(ctx context.Context,
182-
err error, deliv *amqp.Delivery, ch AMQPChannel, pub *amqp.Publishing) {
181+
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
183182
deliv.Nack(
184183
false, //multiple
185184
true, //requeue
186185
)
187186
duration := getNackSleepDuration(ctx)
188187
time.Sleep(duration)
189-
return
190188
}
191189

192190
// ReplyErrorEncoder serializes the error message as a DefaultErrorResponse
193191
// JSON and sends the message to the ReplyTo address
194192
func ReplyErrorEncoder(ctx context.Context,
195-
err error, deliv *amqp.Delivery, ch AMQPChannel, pub *amqp.Publishing) {
193+
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
196194

197195
if pub.CorrelationId == "" {
198196
pub.CorrelationId = deliv.CorrelationId
@@ -221,21 +219,23 @@ func ReplyErrorEncoder(ctx context.Context,
221219
)
222220
}
223221

224-
// ReplyErrorEncoder serializes the error message as a DefaultErrorResponse
222+
// ReplyAndAckErrorEncoder serializes the error message as a DefaultErrorResponse
225223
// JSON and sends the message to the ReplyTo address then Acks the original
226224
// message
227-
func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch AMQPChannel, pub *amqp.Publishing) {
225+
func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
228226
ReplyErrorEncoder(ctx, err, deliv, ch, pub)
229227
deliv.Ack(false)
230228
}
231229

230+
// DefaultErrorResponse is the default structure of responses in the event
231+
// of an error
232232
type DefaultErrorResponse struct {
233233
Error string `json:"err"`
234234
}
235235

236-
// Channel interface to make testing possible
236+
// Channel is a channel interface to make testing possible
237237
// It is highly recommended to use *amqp.Channel as the interface implementation
238-
type AMQPChannel interface {
238+
type Channel interface {
239239
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
240240
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWail bool, args amqp.Table) (<-chan amqp.Delivery, error)
241241
}

0 commit comments

Comments
 (0)