diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 8cc9a28bad90..5eff82f30ce5 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11229,7 +11229,7 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq, bo TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); if (pReq->pStRtFuncInfo) { TAOS_CHECK_EXIT(tEncodeI32(&encoder, 1)); - TAOS_CHECK_EXIT(tSerializeStRtFuncInfo(&encoder, pReq->pStRtFuncInfo, pReq->reset && needStreamPesudoFuncVals)); + TAOS_CHECK_EXIT(tSerializeStRtFuncInfo(&encoder, pReq->pStRtFuncInfo, /* pReq->reset && */ needStreamPesudoFuncVals)); } else { TAOS_CHECK_EXIT(tEncodeI32(&encoder, 0)); } diff --git a/source/libs/executor/src/externalwindowoperator.c b/source/libs/executor/src/externalwindowoperator.c index 309373a6b26d..147b28a7f643 100644 --- a/source/libs/executor/src/externalwindowoperator.c +++ b/source/libs/executor/src/externalwindowoperator.c @@ -1786,7 +1786,7 @@ static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) blockDataCleanup(pBlock); taosArrayClear(pExtW->pWinRowIdx); - for (; pExtW->outputWinId < pExtW->pWins->size; pExtW->outputWinId += 1) { + for (; pExtW->outputWinId < pExtW->pWins->size; ++pExtW->outputWinId) { SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->outputWinId); int32_t winIdx = pWin->winOutIdx; if (winIdx < 0) { @@ -1817,6 +1817,7 @@ static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows)); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { + ++pExtW->outputWinId; break; } } diff --git a/test/cases/18-StreamProcessing/02-Stream/stream_fetch.py b/test/cases/18-StreamProcessing/02-Stream/stream_fetch.py new file mode 100644 index 000000000000..4a60e44928da --- /dev/null +++ b/test/cases/18-StreamProcessing/02-Stream/stream_fetch.py @@ -0,0 +1,156 @@ +import time +import math +import random +from new_test_framework.utils import tdLog, tdSql, tdStream, streamUtil,StreamTableType, StreamTable, cluster +from random import randint +import os +import subprocess + +class TestStreamFetch: + def setup_class(cls): + tdLog.debug(f"start to execute {__file__}") + + def test_stream_fetch(self): + """Stream fetch data from runner + + 1. Check stream interval result + + + Since: v3.3.8.11 + + Labels: common,ci + + Jira: None + + History: + - 2026-01-06 Stephen Jin Created + + """ + + + self.prepareData() + # create the interval stream + self.createIntervalStream() + # check the interval stream result + self.checkIntervalResult() + self.prepareCountData() + self.createCountStream() + self.checkCountResult() + + def prepareData(self): + tdLog.info(f"prepare data") + + sqls = [ + "alter dnode 1 'debugflag 135';", + "create snode on dnode 1;", + "create database db vgroups 4;", + "create table db.meters (ts timestamp, current int) tags(`groupid` int);" + ] + + ts = 1767196800000 # int(time.time())*1000 + for t in range(0, 4): + sqls.append(f"create table db.d{t} using db.meters tags({t})") + for i in range(0, 12000000, 1000): + sqls.append(f"insert into db.d{t} values({ts+i},{i})") + + tdSql.executes(sqls) + tdLog.info(f"create successfully.") + + def createIntervalStream(self): + tdLog.info(f"create stream:") + sql = ( + f"create stream db.meters_stream_interval_3 interval(3s) sliding(3s) from db.meters partition by tbname ,groupid stream_options (fill_history ('2026-01-01 00:00:00') |event_type (window_close) |force_output |pre_filter (tbname in ('d0','d1','d2'))) into db.stream_meters_interval output_subtable (concat('run_s_3#', tbname)) tags (point_name varchar(128) as '全椒2#窑运行状态', point_id varchar(128) as 'run_state_2#') as select _twend as ts, last(case when tbname = 'd0' then current end) as kilnrun, last(case when tbname = 'd1' then current end) as flowfeedback, last(case when tbname = 'd2' then current end) as fanrun, cast(case when last(case when tbname = 'd0' then current end) = 1.0 and last(case when tbname = 'd1' then current end) > 120.0 and last(case when tbname = 'd2' then current end) = 1.0 then 1 else 0 end as int) as run_state from db.meters where tbname in ('d0','d1','d2') and ts >= _twstart and ts < _twend;" + ) + + tdLog.info(f"create stream:{sql}") + + try: + tdSql.execute(sql) + except Exception as e: + if "No stream available snode now" not in str(e): + raise Exception(f" user cant create stream no snode ,but create success") + + while True: + tdSql.query(f"select status from information_schema.ins_streams") + if tdSql.getData(0,0) == "Running": + tdLog.info("Stream is running!") + break + + tdLog.debug(f"current stream status: {tdSql.getData(0,0)}") + time.sleep(1) + + def checkIntervalResult(self): + tdLog.info(f"checkIntervalResult start") + + while True: + tdSql.query(f"select count(*) from db.`run_s_3#d0`") + if tdSql.getData(0,0) == 4000: + tdLog.info(f"get {tdSql.getData(0,0)} rows") + break + + tdLog.debug(f"current row count: {tdSql.getData(0,0)}") + time.sleep(1) + + tdSql.query(f"select * from db.`run_s_3#d0` order by ts limit 3065,10") + tdSql.checkData(7, 1, 9218000) + + tdLog.info(f"check stream result successfully.") + + def prepareCountData(self): + tdLog.info(f"prepare count data") + + sqls = [ + "alter dnode 1 'debugflag 135';", + "drop database db", + "create database db vgroups 4;", + "create table db.meters (ts timestamp, current int) tags(`groupid` int);" + ] + + ts = 1767196800000 # int(time.time())*1000 + for t in range(0, 4): + sqls.append(f"create table db.d{t} using db.meters tags({t})") + for i in range(0, 4000000, 1000): + sqls.append(f"insert into db.d{t} values({ts+i},{i})") + + tdSql.executes(sqls) + tdLog.info(f"create successfully.") + + def createCountStream(self): + tdLog.info(f"create stream:") + sql = ( + f"create stream db.meters_stream_count count_window(2,1) from db.meters partition by tbname ,groupid stream_options (fill_history ('2026-01-01 00:00:00') |low_latency_calc|watermark(10s)) into db.stream_meters_count output_subtable (concat('run_c_3#', tbname)) tags (groupid int as groupid) as select _twstart as ts, last(current) as now_batch, first(current) as previous_batch, _twstart as starttime, _twend as endtime from %%tbname where ts >= _twstart and ts<=_twend " + ) + + tdLog.info(f"create stream:{sql}") + + try: + tdSql.execute(sql) + except Exception as e: + if "No stream available snode now" not in str(e): + raise Exception(f" user cant create stream no snode ,but create success") + + while True: + tdSql.query(f"select status from information_schema.ins_streams") + if tdSql.getData(0,0) == "Running": + tdLog.info("Stream is running!") + break + + tdLog.debug(f"current stream status: {tdSql.getData(0,0)}") + time.sleep(1) + + def checkCountResult(self): + tdLog.info(f"checkIntervalResult start") + + while True: + tdSql.query(f"select count(*) from db.`run_c_3#d0`") + if tdSql.getData(0,0) == 3999: + tdLog.info(f"get {tdSql.getData(0,0)} rows") + break + + tdLog.debug(f"current row count: {tdSql.getData(0,0)}") + time.sleep(1) + + tdSql.query(f"select * from db.`run_c_3#d0` order by ts limit 3065,10") + tdSql.checkData(7, 1, 3073000) + + tdLog.info(f"check stream result successfully.") diff --git a/test/ci/cases.task b/test/ci/cases.task index 14cb149ac9f4..cd5aeb2d2b91 100644 --- a/test/ci/cases.task +++ b/test/ci/cases.task @@ -654,6 +654,7 @@ ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_no_snode.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_same_name.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_output_table.py +,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/stream_fetch.py ## 03-TriggerMode #,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_count_new.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_count.py