Skip to content

Commit cc0b3f4

Browse files
authored
feat: add grpc runtime (#135)
Signed-off-by: Zike Yang <[email protected]>
1 parent 575aafc commit cc0b3f4

29 files changed

+1960
-255
lines changed

Makefile

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,18 @@ gen_rest_client:
4141
rm -r restclient/go.mod restclient/go.sum restclient/.travis.yml restclient/.openapi-generator-ignore \
4242
restclient/git_push.sh restclient/.openapi-generator restclient/api restclient/test
4343

44+
proto:
45+
for PROTO_FILE in $$(find . -name '*.proto'); do \
46+
echo "generating codes for $$PROTO_FILE"; \
47+
protoc \
48+
--go_out=. \
49+
--go_opt paths=source_relative \
50+
--plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" \
51+
--go-grpc_out=. \
52+
--go-grpc_opt paths=source_relative \
53+
--plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" \
54+
$$PROTO_FILE; \
55+
done
56+
4457
license:
4558
./license-checker/license-checker.sh

benchmark/bench_test.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ import (
3535
)
3636

3737
func BenchmarkStressForBasicFunc(b *testing.B) {
38-
s := server.New(server.LoadConfigFromEnv())
38+
s, err := server.NewServer(server.LoadConfigFromEnv())
39+
if err != nil {
40+
b.Fatal(err)
41+
}
3942
svrCtx, svrCancel := context.WithCancel(context.Background())
4043
go s.Run(svrCtx)
4144
defer func() {
@@ -103,14 +106,18 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
103106
func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
104107
memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background())
105108

106-
svrConf := &fs.Config{
109+
svrConf := &common.Config{
107110
ListenAddr: common.DefaultAddr,
108-
TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) {
109-
return memoryQueueFactory, nil
110-
},
111111
}
112112

113-
s := server.New(svrConf)
113+
fm, err := fs.NewFunctionManager(fs.WithDefaultTubeFactory(memoryQueueFactory))
114+
if err != nil {
115+
b.Fatal(err)
116+
}
117+
s, err := server.NewServer(svrConf, server.WithFunctionManager(fm))
118+
if err != nil {
119+
b.Fatal(err)
120+
}
114121
svrCtx, svrCancel := context.WithCancel(context.Background())
115122
go s.Run(svrCtx)
116123
defer func() {
@@ -130,7 +137,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
130137
Output: outputTopic,
131138
Replicas: &replicas,
132139
},
133-
QueueBuilder: func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
140+
QueueBuilder: func(ctx context.Context, c *common.Config) (contube.TubeFactory, error) {
134141
return memoryQueueFactory, nil
135142
},
136143
}

cmd/server/cmd.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ var (
3535

3636
func exec(*cobra.Command, []string) {
3737
common.RunProcess(func() (io.Closer, error) {
38-
s := server.New(server.LoadConfigFromEnv())
38+
s, err := server.NewServer(server.LoadConfigFromEnv())
39+
if err != nil {
40+
return nil, err
41+
}
3942
go s.Run(context.Background())
4043
return s, nil
4144
})

cmd/standalone/cmd.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ var (
3535

3636
func exec(*cobra.Command, []string) {
3737
common.RunProcess(func() (io.Closer, error) {
38-
s := server.New(server.LoadStandaloneConfigFromEnv())
38+
s, err := server.NewServer(server.LoadStandaloneConfigFromEnv())
39+
if err != nil {
40+
return nil, err
41+
}
3942
go s.Run(context.Background())
4043
return s, nil
4144
})

fs/config.go renamed to common/config.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
package fs
18-
19-
import (
20-
"context"
21-
"github.com/functionstream/functionstream/fs/contube"
22-
)
23-
24-
type TubeBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error)
17+
package common
2518

2619
// Config is a struct that holds the configuration for a function stream.
2720
type Config struct {
28-
ListenAddr string // ListenAddr is the address that the function stream REST service will listen on.
29-
PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube
30-
TubeBuilder TubeBuilder // TubeBuilder is a function that will be used to build the tube.
21+
ListenAddr string // ListenAddr is the address that the function stream REST service will listen on.
22+
PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube
23+
TubeType string
3124
}

common/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package common
1818

1919
const (
2020
PulsarTubeType = "pulsar"
21+
MemoryTubeType = "memory"
2122

2223
DefaultAddr = "localhost:7300"
2324
DefaultPulsarURL = "pulsar://localhost:6650"

common/model/function.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
package model
1818

1919
type Function struct {
20-
Name string
21-
Archive string
22-
Inputs []string
20+
Name string
21+
Archive string
22+
Source map[string]any
23+
Sink map[string]any
24+
// Deprecate
25+
Inputs []string
26+
// Deprecate
2327
Output string
2428
Config map[string]string
2529
Replicas int32

fs/api/instance.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"github.com/functionstream/functionstream/common/model"
21+
"github.com/functionstream/functionstream/fs/contube"
22+
"golang.org/x/net/context"
23+
)
24+
25+
type FunctionInstance interface {
26+
Context() context.Context
27+
Definition() *model.Function
28+
Stop()
29+
Run(factory FunctionRuntimeFactory)
30+
WaitForReady() <-chan error
31+
}
32+
33+
type FunctionInstanceFactory interface {
34+
NewFunctionInstance(f *model.Function, queueFactory contube.TubeFactory, i int32) FunctionInstance
35+
}

fs/api/runtime.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"github.com/functionstream/functionstream/fs/contube"
21+
)
22+
23+
type FunctionRuntime interface {
24+
WaitForReady() <-chan error
25+
Call(e contube.Record) (contube.Record, error)
26+
Stop()
27+
}
28+
29+
type FunctionRuntimeFactory interface {
30+
NewFunctionRuntime(instance FunctionInstance) (FunctionRuntime, error)
31+
}

0 commit comments

Comments
 (0)