Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions xstream/nodes/common_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
//Blocking broadcast
func Broadcast(outputs map[string]chan<- interface{}, val interface{}, ctx api.StreamContext) {
logger := ctx.GetLogger()
var barrier sync.WaitGroup
barrier.Add(len(outputs))
var wg sync.WaitGroup
wg.Add(len(outputs))
for n, out := range outputs {
go func(wg *sync.WaitGroup) {
out <- val
go func(output chan<- interface{}) {
output <- val
wg.Done()
logger.Debugf("broadcast from %s to %s done", ctx.GetOpId(), n)
}(&barrier)
}(out)
}
logger.Debugf("broadcasting from %s", ctx.GetOpId())
barrier.Wait()
wg.Wait()
}
4 changes: 3 additions & 1 deletion xstream/server/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ func createRestServer(port int) *http.Server {
r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)

return &http.Server{
server := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
// Good practice to set timeouts to avoid Slowloris attacks.
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
IdleTimeout: time.Second * 60,
Handler: r, // Pass our instance of gorilla/mux in.
}
server.SetKeepAlivesEnabled(false)
return server
}

//list or create streams
Expand Down
2 changes: 1 addition & 1 deletion xstream/sinks/mqtt_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
logger := ctx.GetLogger()
c := ms.conn
logger.Infof("publish %s", item)
logger.Debugf("%s publish %s", ctx.GetOpId(), item)
if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
return fmt.Errorf("publish error: %s", token.Error())
}
Expand Down