diff --git a/redis/config_reader.go b/redis/config_reader.go new file mode 100644 index 0000000..7fcbec3 --- /dev/null +++ b/redis/config_reader.go @@ -0,0 +1,57 @@ +package main + +import ( + "os" + "bufio" + "strings" + "net" + "strconv" +) + +type RedisConfigServer struct { + host string + port uint16 +} + +type RedisConfig struct { + servers []*RedisConfigServer +} + +func NewConfig() *RedisConfig { + return &RedisConfig { + servers: make([]*RedisConfigServer, 0), + } +} + +func ConfigFromFile(path string) (*RedisConfig, error) { + var err error + cfg := NewConfig() + + file, err := os.Open(path) + if err != nil { + return nil, err + } + + reader := bufio.NewScanner(file) + for reader.Scan() { + serverLine := strings.TrimSpace(reader.Text()) + if len(serverLine) == 0 { + continue + } + serverHost, serverPortString, err := net.SplitHostPort(serverLine) + if err != nil { + return nil, err + } + serverPort, err := strconv.ParseInt(serverPortString, 10, 16) + if err != nil { + return nil, err + } + server := &RedisConfigServer { + host: serverHost, + port: uint16(serverPort), + } + cfg.servers = append(cfg.servers, server) + } + + return cfg, nil +} diff --git a/redis/config_reader_test.go b/redis/config_reader_test.go new file mode 100644 index 0000000..bd3d8a6 --- /dev/null +++ b/redis/config_reader_test.go @@ -0,0 +1,53 @@ +package main + +import ( + "testing" + "io/ioutil" + "os" + "reflect" +) + +func TestConfigReader_FromFile(t *testing.T) { + tests := []struct { + name string + config string + expectedCfg *RedisConfig + } { + { + name: "single-redis-test-server", + config: `127.0.0.1:6379`, + expectedCfg: &RedisConfig { + servers: []*RedisConfigServer{ + &RedisConfigServer{ + host: "127.0.0.1", + port: 6379, + }, + }, + }, + }, + } + + t.Parallel() + + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + tmpFile, err := ioutil.TempFile("/tmp", "single-redis-server-test-config") + if err != nil { + t.Fatalf("Failed to create a temp file: %s", err) + } + defer os.Remove(tmpFile.Name()) + + if err := ioutil.WriteFile(tmpFile.Name(), []byte(testCase.config), 0644); err != nil { + t.Fatalf("Failed to write the data to tmp file: %s", err) + } + + cfg, err := ConfigFromFile(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to read from the config: %s", err) + } + if !reflect.DeepEqual(cfg.servers, testCase.expectedCfg.servers) { + t.Errorf("Diverging redis servers: want: %+v, got: %+v", cfg.servers, testCase.expectedCfg.servers) + } + }) + } +} diff --git a/redis/configs/flow-redis-config-path.yml b/redis/configs/flow-redis-config-path.yml new file mode 100644 index 0000000..cf7a94b --- /dev/null +++ b/redis/configs/flow-redis-config-path.yml @@ -0,0 +1,29 @@ +system: + maxprocs: 2 + metrics: + enabled: true + interval: 1 + receiver: + type: graphite + params: + namespace: metrics.flowd + host: localhost + port: 2003 + admin: + enabled: true + bind_addr: :4101 + +components: + udp_rcv: + module: receiver.udp + params: + bind_addr: :3101 + redis: + plugin: redis + constructor: New + params: + config: /Users/ruppala/go/src/github.com/awesome-flow/flow-plugins/redis/configs/redis-server.conf + +pipeline: + udp_rcv: + connect: redis diff --git a/redis/configs/redis-server.conf b/redis/configs/redis-server.conf new file mode 100644 index 0000000..dec22a0 --- /dev/null +++ b/redis/configs/redis-server.conf @@ -0,0 +1 @@ +127.0.0.1:6379 diff --git a/redis/main.go b/redis/main.go new file mode 100644 index 0000000..3094898 --- /dev/null +++ b/redis/main.go @@ -0,0 +1,96 @@ +package main + +import ( + "fmt" + "github.com/awesome-flow/flow/pkg/core" + "github.com/go-redis/redis" + "strconv" + "bytes" +) + +type RedisLink struct { + name string + config *RedisConfig + endpoints []*redis.Client + *core.Connector +} + +func New(name string, params core.Params, context *core.Context) (core.Link, error) { + link, err := bootstrap(name, params, context) + return link, err +} + +func bootstrap(name string, params core.Params, context *core.Context) (core.Link, error) { + configPath, ok := params["config"] + if !ok { + return nil, fmt.Errorf("Missing redis config path") + } + config, err := ConfigFromFile(configPath.(string)) + if err != nil { + return nil, err + } + redisEndPoints, err := buildRedisConn(config) + if err != nil { + return nil, err + } + redisLink := &RedisLink { + name, + config, + redisEndPoints, + core.NewConnectorWithContext(context), + } + return redisLink, nil +} + +// Format of message payload (stream_name id key value) +func (rl *RedisLink) Recv(msg *core.Message) error { + messageParts := bytes.Split(msg.Payload, []byte(" ")) + if len(messageParts) != 4 { + return fmt.Errorf("Invalid message payload. It should be of length 4 in the format (stream_name id key value)") + } + redisClient := rl.endpoints[0] // For now, let's consider the first one. Later, we will migrate this to a cluster + + err := sendMessageToRedisStream(redisClient, messageParts) + if err != nil { + return msg.AckFailed() + } + + return msg.AckDone() +} + + +func sendMessageToRedisStream(rClient *redis.Client, messageParts[][]byte) error { + streamTopic, streamId, key, value := messageParts[0], messageParts[1], messageParts[2], messageParts[3] + + streamValues := map[string]interface{}{string(key) : string(value)} + + redisStreamData := &redis.XAddArgs { + Stream: string(streamTopic), + ID: string(streamId), + Values: streamValues, + } + err := rClient.XAdd(redisStreamData).Err() + if err != nil { + return err + } + + return nil +} + +func buildRedisConn(config *RedisConfig) ([]*redis.Client, error) { + endpoints := make([]*redis.Client, len(config.servers)) + + for ix, serverCfg := range config.servers { + serverHost, serverPort := serverCfg.host, serverCfg.port + redisEndPoint := redis.NewClient(&redis.Options { + Addr: serverHost + ":" + strconv.Itoa(int(serverPort)), + Password: "", + DB: 0, + }) + endpoints[ix] = redisEndPoint + } + + return endpoints, nil +} + +func main() {}