diff --git a/fs/contube/memory.go b/fs/contube/memory.go index 2ff42a89..e5886f53 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -80,15 +80,19 @@ func (f *MemoryQueueFactory) release(name string) { func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { config := NewSourceQueueConfig(configMap) result := make(chan Record) + + var wg sync.WaitGroup for _, topic := range config.Topics { t := topic + wg.Add(1) go func() { <-ctx.Done() f.release(t) }() + go func() { + defer wg.Done() c := f.getOrCreateChan(t) - defer close(result) for { select { case <-ctx.Done(): @@ -99,6 +103,12 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config } }() } + + go func() { + wg.Wait() + close(result) + }() + return result, nil } diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go new file mode 100644 index 00000000..138e16f1 --- /dev/null +++ b/fs/contube/memory_test.go @@ -0,0 +1,85 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package contube + +import ( + "context" + "math/rand" + "strconv" + "sync" + "testing" + "time" +) + +func TestMemoryTube(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + tubeFactory := NewMemoryQueueFactory(ctx) + memoryQueueFactory := tubeFactory.(*MemoryQueueFactory) + + var wg sync.WaitGroup + var events []Record + + topics := []string{"topic1", "topic2", "topic3"} + source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) + if err != nil { + t.Fatal(err) + } + + for i, v := range topics { + wg.Add(1) + sink, err := memoryQueueFactory.NewSinkTube(ctx, (&SinkQueueConfig{Topic: v}).ToConfigMap()) + if err != nil { + t.Fatal(err) + } + go func(i int) { + defer wg.Done() + defer close(sink) + sink <- NewRecordImpl([]byte{byte(i + 1)}, func() {}) + }(i) + } + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case event := <-source: + events = append(events, event) + if len(events) == len(topics) { + return + } + default: + continue + } + } + }() + + wg.Wait() + cancel() + + // Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and the released queue is successful. + time.Sleep(100 * time.Millisecond) + + // assert the memoryQueueFactory.queues is empty. + memoryQueueFactory.mu.Lock() + if len(memoryQueueFactory.queues) != 0 { + t.Fatal("MemoryQueueFactory.queues is not empty") + } + memoryQueueFactory.mu.Unlock() + +}