Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
PulsarURL: "pulsar://localhost:6650",
RequestRate: 200000.0,
Func: &restclient.Function{
Archive: "../bin/example_basic.wasm",
Runtime: &restclient.FunctionRuntime{
Config: map[string]interface{}{
common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm",
},
},
Inputs: []string{inputTopic},
Output: outputTopic,
Replicas: &replicas,
Replicas: replicas,
},
}

Expand Down Expand Up @@ -132,10 +136,14 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
pConfig := &perf.Config{
RequestRate: 200000.0,
Func: &restclient.Function{
Archive: "../bin/example_basic.wasm",
Runtime: &restclient.FunctionRuntime{
Config: map[string]interface{}{
common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm",
},
},
Inputs: []string{inputTopic},
Output: outputTopic,
Replicas: &replicas,
Replicas: replicas,
},
QueueBuilder: func(ctx context.Context, c *common.Config) (contube.TubeFactory, error) {
return memoryQueueFactory, nil
Expand Down
11 changes: 7 additions & 4 deletions cmd/client/create/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"github.com/functionstream/functionstream/cmd/client/common"
fs_cmmon "github.com/functionstream/functionstream/common"
"github.com/functionstream/functionstream/restclient"
"github.com/spf13/cobra"
"io"
Expand Down Expand Up @@ -63,10 +64,12 @@ func exec(_ *cobra.Command, _ []string) {
}}
cli := restclient.NewAPIClient(cfg)
f := restclient.Function{
Name: &config.name,
Archive: config.archive,
Inputs: config.inputs,
Output: config.output,
Name: &config.name,
Runtime: &restclient.FunctionRuntime{Config: map[string]interface{}{
fs_cmmon.RuntimeArchiveConfigKey: config.archive,
}},
Inputs: config.inputs,
Output: config.output,
}

res, err := cli.DefaultAPI.ApiV1FunctionFunctionNamePost(context.Background(), config.name).Function(f).Execute()
Expand Down
2 changes: 2 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ const (
DefaultAddr = "localhost:7300"
DefaultPulsarURL = "pulsar://localhost:6650"
DefaultTubeType = PulsarTubeType

RuntimeArchiveConfigKey = "archive"
)
6 changes: 4 additions & 2 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package common
import "errors"

var (
ErrorFunctionNotFound = errors.New("function not found")
ErrorFunctionExists = errors.New("function already exists")
ErrorFunctionNotFound = errors.New("function not found")
ErrorFunctionExists = errors.New("function already exists")
ErrorTubeFactoryNotFound = errors.New("tube factory not found")
ErrorRuntimeFactoryNotFound = errors.New("runtime factory not found")
)
32 changes: 22 additions & 10 deletions common/model/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,27 @@

package model

import "github.com/functionstream/functionstream/fs/contube"

type TubeConfig struct {
Config contube.ConfigMap `json:"config,omitempty" yaml:"config,omitempty"`
Type *string `json:"type,omitempty" yaml:"type,omitempty"` // Default to `default`
}

type ConfigMap map[string]interface{}

type RuntimeConfig struct {
Config ConfigMap `json:"config,omitempty" yaml:"config,omitempty"`
Type *string `json:"type,omitempty" yaml:"type,omitempty"`
}

type Function struct {
Name string
Archive string
Source map[string]any
Sink map[string]any
// Deprecate
Inputs []string
// Deprecate
Output string
Config map[string]string
Replicas int32
Name string `json:"name" yaml:"name"`
Runtime *RuntimeConfig `json:"runtime" yaml:"runtime"`
Source *TubeConfig `json:"source,omitempty" yaml:"source,omitempty"`
Sink *TubeConfig `json:"sink,omitempty" yaml:"sink,omitempty"`
Inputs []string `json:"inputs" yaml:"inputs"`
Output string `json:"output" yaml:"output"`
Config map[string]string `json:"config,omitempty" yaml:"config,omitempty"`
Replicas int32 `json:"replicas" yaml:"replicas"`
}
126 changes: 126 additions & 0 deletions common/model/function_serde_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2024 Function Stream Org.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package model

import (
"encoding/json"
"fmt"
"github.com/functionstream/functionstream/common"
"gopkg.in/yaml.v3"
"reflect"
"testing"
)

func TestFunctionSerde(t *testing.T) {
f := Function{
Name: "TestFunction",
Runtime: &RuntimeConfig{Type: common.OptionalStr("runtime"), Config: map[string]interface{}{"key": "value"}},
Source: &TubeConfig{Type: common.OptionalStr("source"), Config: map[string]interface{}{"key": "value"}},
Sink: &TubeConfig{Type: common.OptionalStr("sink"), Config: map[string]interface{}{"key": "value"}},
Inputs: []string{"input1", "input2"},
Output: "output",
Config: map[string]string{"key": "value"},
Replicas: 2,
}

// JSON Serialization
data, err := json.Marshal(f)
if err != nil {
t.Fatal("JSON Serialization error:", err)
}

fmt.Println(string(data))

// JSON Deserialization
var f2 Function
err = json.Unmarshal(data, &f2)
if err != nil {
t.Fatal("JSON Deserialization error:", err)
}

if !reflect.DeepEqual(f, f2) {
t.Error("JSON Deserialization does not match original")
}

// YAML Serialization
data, err = yaml.Marshal(f)
if err != nil {
t.Fatal("YAML Serialization error:", err)
}

fmt.Println(string(data))

// YAML Deserialization
err = yaml.Unmarshal(data, &f2)
if err != nil {
t.Fatal("YAML Deserialization error:", err)
}

if !reflect.DeepEqual(f, f2) {
t.Error("YAML Deserialization does not match original")
}
}

func TestFunctionSerdeWithNil(t *testing.T) {
f := Function{
Name: "TestFunction",
Runtime: nil,
Source: nil,
Sink: nil,
Inputs: []string{"input1", "input2"},
Output: "output",
Config: map[string]string{"key": "value"},
Replicas: 2,
}

// JSON Serialization
data, err := json.Marshal(f)
if err != nil {
t.Fatal("JSON Serialization error:", err)
}

fmt.Println(string(data))

// JSON Deserialization
var f2 Function
err = json.Unmarshal(data, &f2)
if err != nil {
t.Fatal("JSON Deserialization error:", err)
}

if !reflect.DeepEqual(f, f2) {
t.Error("JSON Deserialization does not match original")
}

// YAML Serialization
data, err = yaml.Marshal(f)
if err != nil {
t.Fatal("YAML Serialization error:", err)
}

fmt.Println(string(data))

// YAML Deserialization
err = yaml.Unmarshal(data, &f2)
if err != nil {
t.Fatal("YAML Deserialization error:", err)
}

if !reflect.DeepEqual(f, f2) {
t.Error("YAML Deserialization does not match original")
}
}
21 changes: 21 additions & 0 deletions common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2024 Function Stream Org.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package common

func OptionalStr(s string) *string {
return &s
}
2 changes: 1 addition & 1 deletion fs/api/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ type FunctionInstance interface {
}

type FunctionInstanceFactory interface {
NewFunctionInstance(f *model.Function, queueFactory contube.TubeFactory, i int32) FunctionInstance
NewFunctionInstance(f *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32) FunctionInstance
}
21 changes: 20 additions & 1 deletion fs/contube/contube.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,35 @@ func (c *SinkQueueConfig) ToConfigMap() ConfigMap {

type ConfigMap map[string]interface{}

type TubeFactory interface {
// Merge merges multiple ConfigMap into one
func Merge(configs ...ConfigMap) ConfigMap {
result := ConfigMap{}
for _, config := range configs {
for k, v := range config {
result[k] = v
}
}
return result
}

type SourceTubeFactory interface {
// NewSourceTube returns a new channel that can be used to receive events
// The channel would be closed when the context is done
NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)
}

type SinkTubeFactory interface {
// NewSinkTube returns a new channel that can be used to sink events
// The event.Commit() would be invoked after the event is sunk successfully
// The caller should close the channel when it is done
NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error)
}

type TubeFactory interface {
SourceTubeFactory
SinkTubeFactory
}

type RecordImpl struct {
payload []byte
commitFunc func()
Expand Down
41 changes: 25 additions & 16 deletions fs/instance_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
)

type FunctionInstanceImpl struct {
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
tubeFactory contube.TubeFactory
readyCh chan error
index int32
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
sourceFactory contube.SourceTubeFactory
sinkFactory contube.SinkTubeFactory
readyCh chan error
index int32
}

type DefaultInstanceFactory struct{}
Expand All @@ -42,19 +43,20 @@ func NewDefaultInstanceFactory() api.FunctionInstanceFactory {
return &DefaultInstanceFactory{}
}

func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFactory, index int32) api.FunctionInstance {
func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32) api.FunctionInstance {
ctx, cancelFunc := context.WithCancel(context.Background())
ctx.Value(logrus.Fields{
"function-name": definition.Name,
"function-index": index,
})
return &FunctionInstanceImpl{
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
tubeFactory: queueFactory,
readyCh: make(chan error),
index: index,
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
sourceFactory: sourceFactory,
sinkFactory: sinkFactory,
readyCh: make(chan error),
index: index,
}
}

Expand All @@ -73,13 +75,20 @@ func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFact
instance.readyCh <- errors.Wrap(err, "Error creating runtime")
return
}

sourceChan, err := instance.tubeFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap())
getTubeConfig := func(config contube.ConfigMap, tubeConfig *model.TubeConfig) contube.ConfigMap {
if tubeConfig != nil && tubeConfig.Config != nil {
return contube.Merge(config, tubeConfig.Config)
}
return config
}
sourceConfig := (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap()
sourceChan, err := instance.sourceFactory.NewSourceTube(instance.ctx, getTubeConfig(sourceConfig, instance.definition.Source))
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating source event queue")
return
}
sinkChan, err := instance.tubeFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap())
sinkConfig := (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap()
sinkChan, err := instance.sinkFactory.NewSinkTube(instance.ctx, getTubeConfig(sinkConfig, instance.definition.Sink))
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating sink event queue")
return
Expand Down
Loading