Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions redis/config_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"os"
"bufio"
"strings"
"net"
"strconv"
)

type RedisConfigServer struct {
host string
port uint16
}

type RedisConfig struct {
servers []*RedisConfigServer
}

func NewConfig() *RedisConfig {
return &RedisConfig {
servers: make([]*RedisConfigServer, 0),
}
}

func ConfigFromFile(path string) (*RedisConfig, error) {
var err error
cfg := NewConfig()

file, err := os.Open(path)
if err != nil {
return nil, err
}

reader := bufio.NewScanner(file)
for reader.Scan() {
serverLine := strings.TrimSpace(reader.Text())
if len(serverLine) == 0 {
continue
}
serverHost, serverPortString, err := net.SplitHostPort(serverLine)
if err != nil {
return nil, err
}
serverPort, err := strconv.ParseInt(serverPortString, 10, 16)
if err != nil {
return nil, err
}
server := &RedisConfigServer {
host: serverHost,
port: uint16(serverPort),
}
cfg.servers = append(cfg.servers, server)
}

return cfg, nil
}
53 changes: 53 additions & 0 deletions redis/config_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"testing"
"io/ioutil"
"os"
"reflect"
)

func TestConfigReader_FromFile(t *testing.T) {
tests := []struct {
name string
config string
expectedCfg *RedisConfig
} {
{
name: "single-redis-test-server",
config: `127.0.0.1:6379`,
expectedCfg: &RedisConfig {
servers: []*RedisConfigServer{
&RedisConfigServer{
host: "127.0.0.1",
port: 6379,
},
},
},
},
}

t.Parallel()

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
tmpFile, err := ioutil.TempFile("/tmp", "single-redis-server-test-config")
if err != nil {
t.Fatalf("Failed to create a temp file: %s", err)
}
defer os.Remove(tmpFile.Name())

if err := ioutil.WriteFile(tmpFile.Name(), []byte(testCase.config), 0644); err != nil {
t.Fatalf("Failed to write the data to tmp file: %s", err)
}

cfg, err := ConfigFromFile(tmpFile.Name())
if err != nil {
t.Fatalf("Failed to read from the config: %s", err)
}
if !reflect.DeepEqual(cfg.servers, testCase.expectedCfg.servers) {
t.Errorf("Diverging redis servers: want: %+v, got: %+v", cfg.servers, testCase.expectedCfg.servers)
}
})
}
}
29 changes: 29 additions & 0 deletions redis/configs/flow-redis-config-path.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
system:
maxprocs: 2
metrics:
enabled: true
interval: 1
receiver:
type: graphite
params:
namespace: metrics.flowd
host: localhost
port: 2003
admin:
enabled: true
bind_addr: :4101

components:
udp_rcv:
module: receiver.udp
params:
bind_addr: :3101
redis:
plugin: redis
constructor: New
params:
config: /Users/ruppala/go/src/github.com/awesome-flow/flow-plugins/redis/configs/redis-server.conf

pipeline:
udp_rcv:
connect: redis
1 change: 1 addition & 0 deletions redis/configs/redis-server.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
127.0.0.1:6379
96 changes: 96 additions & 0 deletions redis/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"fmt"
"github.com/awesome-flow/flow/pkg/core"
"github.com/go-redis/redis"
"strconv"
"bytes"
)

type RedisLink struct {
name string
config *RedisConfig
endpoints []*redis.Client
*core.Connector
}

func New(name string, params core.Params, context *core.Context) (core.Link, error) {
link, err := bootstrap(name, params, context)
return link, err
}

func bootstrap(name string, params core.Params, context *core.Context) (core.Link, error) {
configPath, ok := params["config"]
if !ok {
return nil, fmt.Errorf("Missing redis config path")
}
config, err := ConfigFromFile(configPath.(string))
if err != nil {
return nil, err
}
redisEndPoints, err := buildRedisConn(config)
if err != nil {
return nil, err
}
redisLink := &RedisLink {
name,
config,
redisEndPoints,
core.NewConnectorWithContext(context),
}
return redisLink, nil
}

// Format of message payload (stream_name id key value)
func (rl *RedisLink) Recv(msg *core.Message) error {
messageParts := bytes.Split(msg.Payload, []byte(" "))
if len(messageParts) != 4 {
return fmt.Errorf("Invalid message payload. It should be of length 4 in the format (stream_name id key value)")
}
redisClient := rl.endpoints[0] // For now, let's consider the first one. Later, we will migrate this to a cluster

err := sendMessageToRedisStream(redisClient, messageParts)
if err != nil {
return msg.AckFailed()
}

return msg.AckDone()
}


func sendMessageToRedisStream(rClient *redis.Client, messageParts[][]byte) error {
streamTopic, streamId, key, value := messageParts[0], messageParts[1], messageParts[2], messageParts[3]

streamValues := map[string]interface{}{string(key) : string(value)}

redisStreamData := &redis.XAddArgs {
Stream: string(streamTopic),
ID: string(streamId),
Values: streamValues,
}
err := rClient.XAdd(redisStreamData).Err()
if err != nil {
return err
}

return nil
}

func buildRedisConn(config *RedisConfig) ([]*redis.Client, error) {
endpoints := make([]*redis.Client, len(config.servers))

for ix, serverCfg := range config.servers {
serverHost, serverPort := serverCfg.host, serverCfg.port
redisEndPoint := redis.NewClient(&redis.Options {
Addr: serverHost + ":" + strconv.Itoa(int(serverPort)),
Password: "",
DB: 0,
})
endpoints[ix] = redisEndPoint
}

return endpoints, nil
}

func main() {}