diff --git a/acceptor.go b/acceptor.go index 0ce8e8724..24059b3cc 100644 --- a/acceptor.go +++ b/acceptor.go @@ -334,10 +334,10 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { go func() { msgIn <- fixIn{msgBytes, parser.lastRead} - readLoop(parser, msgIn) + readLoop(parser, msgIn, session.sessionID.String()) }() - writeLoop(netConn, msgOut, a.globalLog) + writeLoop(netConn, msgOut, a.globalLog, 0, session.sessionID.String()) } func (a *Acceptor) dynamicSessionsLoop() { diff --git a/config/configuration.go b/config/configuration.go index ed7f1303e..90742c1c2 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -65,4 +65,5 @@ const ( RejectInvalidMessage string = "RejectInvalidMessage" DynamicSessions string = "DynamicSessions" DynamicQualifier string = "DynamicQualifier" + MaxMessagesPerSecond string = "MaxMessagesPerSecond" ) diff --git a/config/doc.go b/config/doc.go index c7223bbad..7b431bb0b 100644 --- a/config/doc.go +++ b/config/doc.go @@ -230,6 +230,12 @@ Time between reconnection attempts in seconds. Only used for initiators. Valu Defaults to 30 +MaxMessagesPerSecond + +Maximum number of messages to send per second. Required if the remote party enforces rate limiting for incoming messages. A value of 0 means no rate limiting is enforced. Only used for initiators at present. Value must be 0 or a positive integer. + +Defaults to 0. + LogoutTimeout Session setting for logout timeout in seconds. Only used for initiators. Value must be positive integer. diff --git a/connection.go b/connection.go index 4e5768a36..58da9ae41 100644 --- a/connection.go +++ b/connection.go @@ -1,28 +1,85 @@ package quickfix -import "io" +import ( + "context" + "io" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" +) + +var ( + outgoingMessageCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "quickfix", + Name: "outgoing_message_count", + Help: "Count of messages sent inside quickfix writeLoop function", + }, []string{"sessionID"}) + + outgoingMessageBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "quickfix", + Name: "ougoing_message_bytes", + Help: "Count of bytes sent inside quickfix writeLoop function", + }, []string{"sessionID"}) + + incomingMessageCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "quickfix", + Name: "incoming_message_count", + Help: "Count of messages received inside quickfix writeLoop function", + }, []string{"sessionID"}) + + incomingMessageBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "quickfix", + Name: "incoming_message_bytes", + Help: "Count of bytes received inside quickfix readLoop function", + }, []string{"sessionID"}) +) + +func init() { + prometheus.MustRegister(outgoingMessageCount) + prometheus.MustRegister(outgoingMessageBytes) + prometheus.MustRegister(incomingMessageCount) + prometheus.MustRegister(incomingMessageBytes) +} + +func writeLoop(connection io.Writer, messageOut chan []byte, log Log, maxMessagesPerSecond int, sessionID string) { + limiter := rate.NewLimiter(rate.Limit(maxMessagesPerSecond), 1) -func writeLoop(connection io.Writer, messageOut chan []byte, log Log) { for { msg, ok := <-messageOut if !ok { return } + if maxMessagesPerSecond > 0 { + err := limiter.Wait(context.Background()) + if err != nil { + log.OnEvent(err.Error()) + continue + } + } + + outgoingMessageCount.WithLabelValues(sessionID).Inc() + outgoingMessageBytes.WithLabelValues(sessionID).Add(float64(len(msg))) + if _, err := connection.Write(msg); err != nil { log.OnEvent(err.Error()) } } } -func readLoop(parser *parser, msgIn chan fixIn) { +func readLoop(parser *parser, msgIn chan fixIn, sessionID string) { defer close(msgIn) for { + msg, err := parser.ReadMessage() if err != nil { return } + + incomingMessageCount.WithLabelValues(sessionID).Inc() + incomingMessageBytes.WithLabelValues(sessionID).Add(float64(msg.Len())) + msgIn <- fixIn{msg, parser.lastRead} } } diff --git a/connection_internal_test.go b/connection_internal_test.go index 54a9f347f..2f1b52b15 100644 --- a/connection_internal_test.go +++ b/connection_internal_test.go @@ -16,7 +16,7 @@ func TestWriteLoop(t *testing.T) { msgOut <- []byte("test msg 3") close(msgOut) }() - writeLoop(writer, msgOut, nullLog{}) + writeLoop(writer, msgOut, nullLog{}, 0, "sessionID") expected := "test msg 1 test msg 2 test msg 3" @@ -30,7 +30,7 @@ func TestReadLoop(t *testing.T) { stream := "hello8=FIX.4.09=5blah10=103garbage8=FIX.4.09=4foo10=103" parser := newParser(strings.NewReader(stream)) - go readLoop(parser, msgIn) + go readLoop(parser, msgIn, "sessionID") var tests = []struct { expectedMsg string diff --git a/go.mod b/go.mod index a47647044..1e1311686 100644 --- a/go.mod +++ b/go.mod @@ -7,13 +7,27 @@ require ( github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 github.com/mattn/go-sqlite3 v1.14.14 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.15.1 github.com/shopspring/decimal v1.3.1 github.com/stretchr/testify v1.8.0 - golang.org/x/net v0.0.0-20220708220712-1185a9018129 + golang.org/x/net v0.7.0 + golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/kr/text v0.2.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.9.0 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/stretchr/objx v0.4.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + golang.org/x/sys v0.6.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d567c931c..d922b1aa8 100644 --- a/go.sum +++ b/go.sum @@ -1,41 +1,43 @@ github.com/armon/go-proxyproto v0.0.0-20210323213023-7e956b284f0a h1:AP/vsCIvJZ129pdm9Ek7bH7yutN3hByqsMoNrWAxRQc= github.com/armon/go-proxyproto v0.0.0-20210323213023-7e956b284f0a/go.mod h1:QmP9hvJ91BbJmGVGSbutW19IC0Q9phDCLGaomwTJbgU= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 h1:DujepqpGd1hyOd7aW59XpK7Qymp8iy83xq74fLr21is= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-sqlite3 v1.14.14 h1:qZgc/Rwetq+MtyE18WhzjokPD93dNqLGNT3QJuLvBGw= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/quickfixgo/enum v0.0.0-20210629025633-9afc8539baba h1:ysNRAW5kAhJ76Wo6LMAtbuFq/64s2E5KK00cS/rR/Po= -github.com/quickfixgo/enum v0.0.0-20210629025633-9afc8539baba/go.mod h1:65gdG2/8vr6uOYcjZBObVHMuTEYc5rr/+aKVWTrFIrQ= -github.com/quickfixgo/field v0.0.0-20171007195410-74cea5ec78c7 h1:a/qsvkJNoj1vcSFTzgLqNcwTRuiM1VjchoRjDOIMNyY= -github.com/quickfixgo/field v0.0.0-20171007195410-74cea5ec78c7/go.mod h1:7kiKeQwJLOrwVXqvt2GAnk4vOH0jErrB3qO6SERmq7c= -github.com/quickfixgo/fix40 v0.0.0-20171007200002-cce875b2c2e7 h1:csHnaP2l65lrchDUvpk2LbA7BF23wsl4aqFqGVX8r10= -github.com/quickfixgo/fix40 v0.0.0-20171007200002-cce875b2c2e7/go.mod h1:RC0yl+6EULF8t40eFen3UdouFVdZu1uVwMsk7O/6i5s= -github.com/quickfixgo/fix41 v0.0.0-20171007212429-b272ca343ed2 h1:dofXBfr8IrzTXvHyu0gV9KIgzBzjVyBd5rmxERw50rE= -github.com/quickfixgo/fix41 v0.0.0-20171007212429-b272ca343ed2/go.mod h1:2CARkVxpb7YV3Ib6qRjI2UFzvlIyhP0lx/nN6GDoZ7w= -github.com/quickfixgo/fix42 v0.0.0-20171007212724-86a4567f1c77 h1:mD360ECTwAK4jbOIrqAH2MdJF3RrH5KuboFNdxSmIDM= -github.com/quickfixgo/fix42 v0.0.0-20171007212724-86a4567f1c77/go.mod h1:RyVaOPb/+NasHAt2e5UJ9PjXT+5AeqyKZfNPOB4UspE= -github.com/quickfixgo/fix43 v0.0.0-20171007213001-a7ff4f2a2470 h1:1bkIwYMs5XrjvKouIiqn2Iiw46XKcNTS1as9hCZqnMg= -github.com/quickfixgo/fix43 v0.0.0-20171007213001-a7ff4f2a2470/go.mod h1:qpAUIIjRXEQRtuMpJolR+8VkDajoU8J7Iw55Gnwm15s= -github.com/quickfixgo/fix44 v0.0.0-20171007213039-f090a1006218 h1:zrm7CRhis2ArB/xjOj0EQJOuHq5fI0JpS4AILtjwUug= -github.com/quickfixgo/fix44 v0.0.0-20171007213039-f090a1006218/go.mod h1:KFN4LkI1sidKgWnUvmpDdvsa+aNvcbExtS8iPQvA9ys= -github.com/quickfixgo/fixt11 v0.0.0-20171007213433-d9788ca97f5d h1:PlymcwOkKZXnOI3uJZu0lpnvnweTkML9YxnTuYhhzIM= -github.com/quickfixgo/fixt11 v0.0.0-20171007213433-d9788ca97f5d/go.mod h1:/oN4Arv+/8zKshQTj+ggpWfjXuVv9bMUO93zOO6R+oM= -github.com/quickfixgo/tag v0.0.0-20171007194743-cbb465760521 h1:RXfjXtjvXb4wgzBHTTFbyW5uBP04Nrlky9e9lAe8mIE= -github.com/quickfixgo/tag v0.0.0-20171007194743-cbb465760521/go.mod h1:EKAI2kkSaIuSywW0WbIgXIcuA9vS4IXfCga9U9Oax2E= +github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= +github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= +github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -44,11 +46,20 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -golang.org/x/net v0.0.0-20220708220712-1185a9018129 h1:vucSRfWwTsoXro7P+3Cjlr6flUMtzCwzlvkxEQtHHB0= -golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ= +golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/initiator.go b/initiator.go index 3b6672395..cb63d8734 100644 --- a/initiator.go +++ b/initiator.go @@ -176,10 +176,10 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di goto reconnect } - go readLoop(newParser(bufio.NewReader(netConn)), msgIn) + go readLoop(newParser(bufio.NewReader(netConn)), msgIn, session.sessionID.String()) disconnected = make(chan interface{}) go func() { - writeLoop(netConn, msgOut, session.log) + writeLoop(netConn, msgOut, session.log, session.MaxMessagesPerSecond, session.sessionID.String()) if err := netConn.Close(); err != nil { session.log.OnEvent(err.Error()) } diff --git a/internal/session_settings.go b/internal/session_settings.go index 7547c0fa6..48c9bea9b 100644 --- a/internal/session_settings.go +++ b/internal/session_settings.go @@ -18,7 +18,7 @@ type SessionSettings struct { MaxLatency time.Duration DisableMessagePersist bool - //required on logon for FIX.T.1 messages + //required on logon for FIXT.1.1 messages DefaultApplVerID string //specific to initiators @@ -26,4 +26,5 @@ type SessionSettings struct { LogoutTimeout time.Duration LogonTimeout time.Duration SocketConnectAddress []string + MaxMessagesPerSecond int } diff --git a/session_factory.go b/session_factory.go index 8fc42d3c0..297d4b635 100644 --- a/session_factory.go +++ b/session_factory.go @@ -389,6 +389,20 @@ func (f sessionFactory) buildInitiatorSettings(session *session, settings *Sessi session.LogonTimeout = time.Duration(timeout) * time.Second } + session.MaxMessagesPerSecond = 0 + if settings.HasSetting(config.MaxMessagesPerSecond) { + maxMessages, err := settings.IntSetting(config.MaxMessagesPerSecond) + if err != nil { + return err + } + + if maxMessages < 0 { + return errors.New("MaxMessagesPerSecond must be greater or equal to zero") + } + + session.MaxMessagesPerSecond = maxMessages + } + return f.configureSocketConnectAddress(session, settings) } diff --git a/session_factory_test.go b/session_factory_test.go index a86cfe369..ec6f5cf66 100644 --- a/session_factory_test.go +++ b/session_factory_test.go @@ -53,6 +53,7 @@ func (s *SessionFactorySuite) TestDefaults() { s.Equal(120*time.Second, session.MaxLatency) s.False(session.DisableMessagePersist) s.False(session.HeartBtIntOverride) + s.Equal(0, session.MaxMessagesPerSecond) } func (s *SessionFactorySuite) TestResetOnLogon() { @@ -349,6 +350,7 @@ func (s *SessionFactorySuite) TestNewSessionBuildInitiators() { s.SessionSettings.Set(config.HeartBtInt, "34") s.SessionSettings.Set(config.SocketConnectHost, "127.0.0.1") s.SessionSettings.Set(config.SocketConnectPort, "5000") + s.SessionSettings.Set(config.MaxMessagesPerSecond, "200") session, err := s.newSession(s.SessionID, s.MessageStoreFactory, s.SessionSettings, s.LogFactory, s.App) s.Nil(err) @@ -358,6 +360,7 @@ func (s *SessionFactorySuite) TestNewSessionBuildInitiators() { s.Equal(10*time.Second, session.LogonTimeout) s.Equal(2*time.Second, session.LogoutTimeout) s.Equal("127.0.0.1:5000", session.SocketConnectAddress[0]) + s.Equal(200, session.MaxMessagesPerSecond) } func (s *SessionFactorySuite) TestNewSessionBuildAcceptors() {