Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/en_US/guide/streams/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Below is the list of data types supported.
| TIMESTAMP | true | The field to represent the event's timestamp. If specified, the rule will run with event time. Otherwise, it will run with processing time. Please refer to [timestamp management](../../sqls/windows.md#timestamp-management) for details. |
| TIMESTAMP_FORMAT | true | The default format to be used when converting string to or from datetime type. |
| VERSION | true | Version of the stream, check [versioning](#versioning)。 |
| TEMP | true | Whether the stream is temporary. Temporary streams are stored in memory only and will be lost when eKuiper restarts. Default is false. Check [Temporary Streams](#temporary-streams) for more details. |

**Example 1,**

Expand Down Expand Up @@ -202,6 +203,28 @@ demoBin (

If "BINARY" format stream is defined as schemaless, a default field named `self` will be assigned for the binary payload.

## Temporary Streams

Temporary streams are in-memory streams that are not persisted to disk. They are useful for intermediate data processing
or testing scenarios where persistence is not required. Temporary streams have the following characteristics:

- **In-memory storage**: Stream definitions are stored in memory only and will be lost when eKuiper restarts.
- **Cannot be replaced**: Once created, temporary streams cannot be replaced using `REPLACE STREAM` statement.
- **Usage restriction**: Temporary streams can only be used by temporary rules. Non-temporary rules cannot reference
temporary streams.

### Creating a Temporary Stream

To create a temporary stream, set the `TEMP` option to `true`:

```sql
CREATE
STREAM temp_sensor (
temperature FLOAT,
humidity FLOAT
) WITH (DATASOURCE="sensor/data", FORMAT="json", TEMP="true");
```

## Versioning

The stream can have an optional **version** field to control updates. When you update a stream, the system compares the
Expand Down
21 changes: 21 additions & 0 deletions docs/zh_CN/guide/streams/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ CREATE STREAM
| TIMESTAMP | 是 | 代表该事件时间戳的字段名。如果有设置,则使用此流的规则将采用事件时间;否则将采用处理时间。详情请看[时间戳管理](../../sqls/windows.md#时间戳管理)。 |
| TIMESTAMP_FORMAT | 是 | 字符串和时间格式转换时使用的默认格式。 |
| VERSION | 是 | 流的版本号,详情情况[版本控制](#版本控制)。 |
| TEMP | 是 | 流是否为临时流。临时流仅存储在内存中,eKuiper 重启后会丢失。默认为 false。详情请参阅[临时流](#临时流)。 |

**示例1**

Expand Down Expand Up @@ -196,6 +197,26 @@ demoBin (

如果 "BINARY" 格式流定义为 schemaless,数据将会解析到默认的名为 `self` 的字段。

## 临时流

临时流是仅存储在内存中的流,不会持久化到磁盘。它们适用于中间数据处理或测试场景,在这些场景中不需要持久化。临时流具有以下特性:

- **内存存储**: 流定义仅存储在内存中,eKuiper 重启后会丢失。
- **不可替换**: 一旦创建,临时流不能使用 `REPLACE STREAM` 语句进行替换。
- **使用限制**: 临时流只能被临时规则使用。非临时规则不能引用临时流。

### 创建临时流

要创建临时流,请将 `TEMP` 选项设置为 `true`:

```sql
CREATE
STREAM temp_sensor (
temperature FLOAT,
humidity FLOAT
) WITH (DATASOURCE="sensor/data", FORMAT="json", TEMP="true");
```

## 版本控制

Stream 可以包含一个可选的 **version** 字段,用于控制更新逻辑。当您更新一个流时,系统会将新的版本字符串与现有版本进行比较。只有当新版本在
Expand Down
3 changes: 2 additions & 1 deletion fvt/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ func (s *RuleTestSuite) TestRuleDisableBufferFullDiscard() {
resp, err := client.CreateConf("sources/simulator/confKeys/sim1", conf)
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)
streamSql := `{"sql": "create stream sim1() WITH (TYPE=\"simulator\", CONF_KEY=\"sim1\")"}`
streamSql := `{"sql": "create stream sim1() WITH (TYPE=\"simulator\", CONF_KEY=\"sim1\", TEMP=\"true\")"}`
resp, err = client.CreateStream(streamSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
ruleSql := `{
"id": "ruleSim1",
"sql": "SELECT * FROM sim1",
"temp":true,
"actions": [
{
"memory":{
Expand Down
128 changes: 128 additions & 0 deletions internal/pkg/store/memory/memoryKv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package memory

import (
"fmt"
"strings"
"sync"

"github.com/lf-edge/ekuiper/v2/pkg/kv"
)

type memoryKvStore struct {
data map[string]string
mu sync.RWMutex
}

func NewMemoryKV() kv.KeyValue {
return &memoryKvStore{
data: make(map[string]string),
}
}

func (m *memoryKvStore) Open() error {
return nil
}

func (m *memoryKvStore) Close() error {
return nil
}

func (m *memoryKvStore) Set(key string, value interface{}) error {
m.mu.Lock()
defer m.mu.Unlock()
if v, ok := value.(string); ok {
m.data[key] = v
return nil
}
return fmt.Errorf("value must be string")
}

func (m *memoryKvStore) Setnx(key string, value interface{}) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.data[key]; ok {
return fmt.Errorf("key %s already exists", key)
}
if v, ok := value.(string); ok {
m.data[key] = v
return nil
}
return fmt.Errorf("value must be string")
}

func (m *memoryKvStore) Get(key string, value interface{}) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if v, ok := m.data[key]; ok {
if ptr, ok := value.(*string); ok && ptr != nil {
*ptr = v
}
return true, nil
}
return false, nil
}

func (m *memoryKvStore) Delete(key string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.data, key)
return nil
}

func (m *memoryKvStore) Keys() ([]string, error) {
m.mu.RLock()
defer m.mu.RUnlock()
keys := make([]string, 0, len(m.data))
for k := range m.data {
keys = append(keys, k)
}
return keys, nil
}

func (m *memoryKvStore) All() (map[string]string, error) {
m.mu.RLock()
defer m.mu.RUnlock()
// Return a copy to avoid race conditions if the caller modifies the map (though signature returns map[string]string, usually it's better to copy)
// But matching the interface, let's just return a copy.
result := make(map[string]string, len(m.data))
for k, v := range m.data {
result[k] = v
}
return result, nil
}

func (m *memoryKvStore) Clean() error {
m.mu.Lock()
defer m.mu.Unlock()
m.data = make(map[string]string)
return nil
}

func (m *memoryKvStore) Drop() error {
return m.Clean()
}

func (m *memoryKvStore) SetKeyedState(key string, value interface{}) error {
return m.Set(key, value)
}

func (m *memoryKvStore) GetKeyedState(key string) (interface{}, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if v, ok := m.data[key]; ok {
return v, nil
}
return nil, nil
}

func (m *memoryKvStore) GetByPrefix(prefix string) (map[string][]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
result := make(map[string][]byte)
for k, v := range m.data {
if strings.HasPrefix(k, prefix) {
result[k] = []byte(v)
}
}
return result, nil
}
Loading