summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/result.json63
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out6
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out6
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out6
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt54
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json106
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt54
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json106
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt54
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json106
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt55
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json106
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt55
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json106
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt21
-rw-r--r--ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json93
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowAllKeys.sql39
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowPolicyBad.sqlx36
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowSizeLimit.sql37
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimit.sql37
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimitBad.sqlx36
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermark.sql37
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermarkNoRd.sql37
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/watermarks/bad-watermarks-no-rd.sqlx16
-rw-r--r--ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks-no-rd.sql16
-rw-r--r--ydb/tests/fq/streaming_optimize/test_sql_negative.py2
-rw-r--r--ydb/tests/fq/streaming_optimize/test_sql_streaming.py2
27 files changed, 1292 insertions, 0 deletions
diff --git a/ydb/tests/fq/streaming_optimize/canondata/result.json b/ydb/tests/fq/streaming_optimize/canondata/result.json
index bd426686daf..b02d10a09fb 100644
--- a/ydb/tests/fq/streaming_optimize/canondata/result.json
+++ b/ydb/tests/fq/streaming_optimize/canondata/result.json
@@ -1,4 +1,19 @@
{
+ "test_sql_negative.test[hopping_window-GroupByHoppingWindowPolicyBad-default.txt]": [
+ {
+ "uri": "file://test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out"
+ }
+ ],
+ "test_sql_negative.test[hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt]": [
+ {
+ "uri": "file://test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out"
+ }
+ ],
+ "test_sql_negative.test[watermarks-bad-watermarks-no-rd-default.txt]": [
+ {
+ "uri": "file://test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out"
+ }
+ ],
"test_sql_negative.test[watermarks-bad_column-default.txt]": [
{
"uri": "file://test_sql_negative.test_watermarks-bad_column-default.txt_/err_file.out"
@@ -81,6 +96,14 @@
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/plan.json"
}
},
+ "test_sql_streaming.test[hopping_window-GroupByHoppingWindowAllKeys-default.txt]": {
+ "Ast": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt"
+ },
+ "Plan": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json"
+ }
+ },
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowByStringKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/ast.txt"
@@ -121,6 +144,14 @@
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/plan.json"
}
},
+ "test_sql_streaming.test[hopping_window-GroupByHoppingWindowSizeLimit-default.txt]": {
+ "Ast": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt"
+ },
+ "Plan": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json"
+ }
+ },
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/ast.txt"
@@ -129,6 +160,30 @@
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/plan.json"
}
},
+ "test_sql_streaming.test[hopping_window-GroupByHoppingWindowTimeLimit-default.txt]": {
+ "Ast": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt"
+ },
+ "Plan": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json"
+ }
+ },
+ "test_sql_streaming.test[hopping_window-GroupByHoppingWindowWatermark-default.txt]": {
+ "Ast": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt"
+ },
+ "Plan": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json"
+ }
+ },
+ "test_sql_streaming.test[hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt]": {
+ "Ast": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt"
+ },
+ "Plan": {
+ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json"
+ }
+ },
"test_sql_streaming.test[pq-ReadTopic-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_pq-ReadTopic-default.txt_/ast.txt"
@@ -233,6 +288,14 @@
"uri": "file://test_sql_streaming.test_watermarks-watermarks-default.txt_/plan.json"
}
},
+ "test_sql_streaming.test[watermarks-watermarks-no-rd-default.txt]": {
+ "Ast": {
+ "uri": "file://test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt"
+ },
+ "Plan": {
+ "uri": "file://test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json"
+ }
+ },
"test_sql_streaming.test[watermarks-watermarks_adjust-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/ast.txt"
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out
new file mode 100644
index 00000000000..478da564231
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out
@@ -0,0 +1,6 @@
+Failed to execute query, invalid final status FAILED, issues:
+<main>: Error: Query failed with code ABORTED at ISOTIME
+ <main>: Error: Run query failed: Error
+ <main>: Error: Optimization, code: 1070
+ <main>:27:10: Error: HoppingWindow: EarlyPolicy: expected one of 'drop', 'adjust', 'close', but got foobar
+
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out
new file mode 100644
index 00000000000..9244747eaed
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out
@@ -0,0 +1,6 @@
+Failed to execute query, invalid final status FAILED, issues:
+<main>: Error: Query failed with code ABORTED at ISOTIME
+ <main>: Error: Failed to parse query
+ <main>: Error: Parse Sql
+ <main>:31:25: Error: Invalid value "barfoo" for type Interval
+
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out
new file mode 100644
index 00000000000..022933aabd3
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out
@@ -0,0 +1,6 @@
+Failed to execute query, invalid final status FAILED, issues:
+<main>: Error: Query failed with code ABORTED at ISOTIME
+ <main>: Error: Run query failed: Error
+ <main>: Error: Optimization, code: 1070
+ <main>:7:6: Error: Unrecognized watermark expression, flexible watermark expressions are only implemented in shared reading mode, please use WATERMARK = (SystemMetadata('write_time') - Interval('PT5S'))
+
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt
new file mode 100644
index 00000000000..29eca805544
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt
@@ -0,0 +1,54 @@
+(
+(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force"))
+(let $2 (DataSource '"dq" '"$all"))
+(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2"))
+(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default"))
+(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async"))
+(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client"))
+(let $7 (DataSource '"pq" '"pq"))
+(let $8 '('('"PartitionsCount" '"1")))
+(let $9 (DataType 'String))
+(let $10 '('"k" (OptionalType $9)))
+(let $11 (DataType 'Uint64))
+(let $12 (OptionalType $11))
+(let $13 '('"t" $12))
+(let $14 (StructType $10 $13 '('"v" $12)))
+(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14))
+(let $16 '('"k" '"t" '"v"))
+(let $17 '('"Endpoint" '"<pq_pq_endpoint>"))
+(let $18 '('"SharedReading" '"1"))
+(let $19 '('"UseSsl" '"1"))
+(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19))
+(let $21 (SecureParam '"cluster:default_pq"))
+(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '""))
+(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '(
+ (let $28 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($18))))
+ (let $29 (DqSourceWideWrap $27 $7 $14 $28))
+ (return (NarrowMap $29 (lambda '($30 $31 $32) (AsStruct '('"k" $30) '('"t" $31) '('"v" $32)))))
+))) '('('"_logical_id" '0))))
+(let $24 (DataSink '"pq" '"pq"))
+(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9))))
+(let $26 (DqPqTopicSink $25 '($17 $19) $21))
+(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($33) (block '(
+ (let $34 '('"strict"))
+ (let $35 (lambda '($46) $46))
+ (let $36 (MultiHoppingCore (FromFlow $33) (lambda '($37) (Member $37 '"k")) (lambda '($38) (FlatMap (Member (SafeCast $38 (StructType $13)) '"t") (lambda '($39) (block '(
+ (let $40 '($11 '"" '"1"))
+ (let $41 (CallableType '() '((OptionalType (DataType 'Timestamp))) $40))
+ (let $42 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $41 (VoidType) '"" '('('"blocks") $34)))
+ (return (Apply $42 $39))
+ ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($43) (AsStruct '('Sum0 (Member $43 '"v")))) (lambda '($44 $45) (AsStruct '('Sum0 (AggrAdd (Member $44 '"v") (Member $45 'Sum0))))) $35 $35 (lambda '($47 $48) (AsStruct '('Sum0 (AggrAdd (Member $47 'Sum0) (Member $48 'Sum0))))) (lambda '($49 $50 $51) (AsStruct '('Sum0 (Member $50 'Sum0)) '('"group0" $51) '('"k" $49))) '"1" '"group0" (Uint64 '12345678) (Interval '"3600000000") (Uint32 '0) (Uint32 '"1")))
+ (return (FlatMap (ExtractMembers $36 '('Sum0 '"k")) (lambda '($52) (block '(
+ (let $53 (ResourceType '"Yson2.Node"))
+ (let $54 '($53 '"" '"1"))
+ (let $55 (CallableType '() '((DataType 'Yson)) $54))
+ (let $56 '($34))
+ (let $57 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $55 (VoidType) '"" $56))
+ (let $58 (StructType $10 '('"sum" $12)))
+ (let $59 (TupleType (TupleType $58) (StructType) (TupleType)))
+ (let $60 (CallableType '() '($53) '($58)))
+ (let $61 (Udf '"Yson2.From" (Void) $59 '"" $60 (VoidType) '"" $56))
+ (return (Just (AsStruct '('"column0" (Apply $57 (Apply $61 (AsStruct '('"k" (Member $52 '"k")) '('"sum" (Member $52 'Sum0)))))))))
+ )))))
+))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24))
+)
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json
new file mode 100644
index 00000000000..48462f24e7c
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json
@@ -0,0 +1,106 @@
+{
+ "Detailed" : {
+ "Operations" : [
+ {
+ "Id" : 5,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "DqSourceWideWrap"
+ },
+ {
+ "Name" : "NarrowMap"
+ }
+ ]
+ }
+ },
+ {
+ "Id" : 3,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "FromFlow"
+ },
+ {
+ "Name" : "MultiHoppingCore"
+ },
+ {
+ "Name" : "ExtractMembers"
+ },
+ {
+ "Name" : "FlatMap"
+ }
+ ]
+ },
+ "DependsOn" : [
+ 5
+ ]
+ },
+ {
+ "Id" : 2,
+ "Name" : "DqQuery!",
+ "DependsOn" : [
+ 3
+ ]
+ },
+ {
+ "Id" : 1,
+ "Name" : "Commit!",
+ "DependsOn" : [
+ 2
+ ]
+ }
+ ],
+ "OperationRoot" : 1,
+ "Providers" : [ ],
+ "OperationStats" : {
+ "Commit!" : 1,
+ "DqQuery!" : 1,
+ "DqStage" : 2
+ }
+ },
+ "Basic" : {
+ "nodes" : [
+ {
+ "id" : 5,
+ "level" : 1,
+ "name" : "DqStage #5",
+ "type" : "op"
+ },
+ {
+ "id" : 3,
+ "level" : 2,
+ "name" : "DqStage #3",
+ "type" : "op"
+ },
+ {
+ "id" : 2,
+ "level" : 3,
+ "name" : "DqQuery!",
+ "type" : "op"
+ },
+ {
+ "id" : 1,
+ "level" : 4,
+ "name" : "Commit!",
+ "type" : "op"
+ }
+ ],
+ "links" : [
+ {
+ "source" : 5,
+ "target" : 3
+ },
+ {
+ "source" : 3,
+ "target" : 2
+ },
+ {
+ "source" : 2,
+ "target" : 1
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt
new file mode 100644
index 00000000000..1c0153bd55e
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt
@@ -0,0 +1,54 @@
+(
+(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force"))
+(let $2 (DataSource '"dq" '"$all"))
+(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2"))
+(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default"))
+(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async"))
+(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client"))
+(let $7 (DataSource '"pq" '"pq"))
+(let $8 '('('"PartitionsCount" '"1")))
+(let $9 (DataType 'String))
+(let $10 '('"k" (OptionalType $9)))
+(let $11 (DataType 'Uint64))
+(let $12 (OptionalType $11))
+(let $13 '('"t" $12))
+(let $14 (StructType $10 $13 '('"v" $12)))
+(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14))
+(let $16 '('"k" '"t" '"v"))
+(let $17 '('"Endpoint" '"<pq_pq_endpoint>"))
+(let $18 '('"SharedReading" '"1"))
+(let $19 '('"UseSsl" '"1"))
+(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19))
+(let $21 (SecureParam '"cluster:default_pq"))
+(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '""))
+(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '(
+ (let $28 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($18))))
+ (let $29 (DqSourceWideWrap $27 $7 $14 $28))
+ (return (NarrowMap $29 (lambda '($30 $31 $32) (AsStruct '('"k" $30) '('"t" $31) '('"v" $32)))))
+))) '('('"_logical_id" '0))))
+(let $24 (DataSink '"pq" '"pq"))
+(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9))))
+(let $26 (DqPqTopicSink $25 '($17 $19) $21))
+(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($33) (block '(
+ (let $34 '('"strict"))
+ (let $35 (lambda '($46) $46))
+ (let $36 (MultiHoppingCore (FromFlow $33) (lambda '($37) (Member $37 '"k")) (lambda '($38) (FlatMap (Member (SafeCast $38 (StructType $13)) '"t") (lambda '($39) (block '(
+ (let $40 '($11 '"" '"1"))
+ (let $41 (CallableType '() '((OptionalType (DataType 'Timestamp))) $40))
+ (let $42 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $41 (VoidType) '"" '('('"blocks") $34)))
+ (return (Apply $42 $39))
+ ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($43) (AsStruct '('Sum0 (Member $43 '"v")))) (lambda '($44 $45) (AsStruct '('Sum0 (AggrAdd (Member $44 '"v") (Member $45 'Sum0))))) $35 $35 (lambda '($47 $48) (AsStruct '('Sum0 (AggrAdd (Member $47 'Sum0) (Member $48 'Sum0))))) (lambda '($49 $50 $51) (AsStruct '('Sum0 (Member $50 'Sum0)) '('"group0" $51) '('"k" $49))) '"1" '"group0" (Uint64 '"18446744073709551615") (Void) (Void) (Uint32 '"1")))
+ (return (FlatMap (ExtractMembers $36 '('Sum0 '"k")) (lambda '($52) (block '(
+ (let $53 (ResourceType '"Yson2.Node"))
+ (let $54 '($53 '"" '"1"))
+ (let $55 (CallableType '() '((DataType 'Yson)) $54))
+ (let $56 '($34))
+ (let $57 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $55 (VoidType) '"" $56))
+ (let $58 (StructType $10 '('"sum" $12)))
+ (let $59 (TupleType (TupleType $58) (StructType) (TupleType)))
+ (let $60 (CallableType '() '($53) '($58)))
+ (let $61 (Udf '"Yson2.From" (Void) $59 '"" $60 (VoidType) '"" $56))
+ (return (Just (AsStruct '('"column0" (Apply $57 (Apply $61 (AsStruct '('"k" (Member $52 '"k")) '('"sum" (Member $52 'Sum0)))))))))
+ )))))
+))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24))
+)
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json
new file mode 100644
index 00000000000..48462f24e7c
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json
@@ -0,0 +1,106 @@
+{
+ "Detailed" : {
+ "Operations" : [
+ {
+ "Id" : 5,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "DqSourceWideWrap"
+ },
+ {
+ "Name" : "NarrowMap"
+ }
+ ]
+ }
+ },
+ {
+ "Id" : 3,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "FromFlow"
+ },
+ {
+ "Name" : "MultiHoppingCore"
+ },
+ {
+ "Name" : "ExtractMembers"
+ },
+ {
+ "Name" : "FlatMap"
+ }
+ ]
+ },
+ "DependsOn" : [
+ 5
+ ]
+ },
+ {
+ "Id" : 2,
+ "Name" : "DqQuery!",
+ "DependsOn" : [
+ 3
+ ]
+ },
+ {
+ "Id" : 1,
+ "Name" : "Commit!",
+ "DependsOn" : [
+ 2
+ ]
+ }
+ ],
+ "OperationRoot" : 1,
+ "Providers" : [ ],
+ "OperationStats" : {
+ "Commit!" : 1,
+ "DqQuery!" : 1,
+ "DqStage" : 2
+ }
+ },
+ "Basic" : {
+ "nodes" : [
+ {
+ "id" : 5,
+ "level" : 1,
+ "name" : "DqStage #5",
+ "type" : "op"
+ },
+ {
+ "id" : 3,
+ "level" : 2,
+ "name" : "DqStage #3",
+ "type" : "op"
+ },
+ {
+ "id" : 2,
+ "level" : 3,
+ "name" : "DqQuery!",
+ "type" : "op"
+ },
+ {
+ "id" : 1,
+ "level" : 4,
+ "name" : "Commit!",
+ "type" : "op"
+ }
+ ],
+ "links" : [
+ {
+ "source" : 5,
+ "target" : 3
+ },
+ {
+ "source" : 3,
+ "target" : 2
+ },
+ {
+ "source" : 2,
+ "target" : 1
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt
new file mode 100644
index 00000000000..8a325fdd46c
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt
@@ -0,0 +1,54 @@
+(
+(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force"))
+(let $2 (DataSource '"dq" '"$all"))
+(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2"))
+(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default"))
+(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async"))
+(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client"))
+(let $7 (DataSource '"pq" '"pq"))
+(let $8 '('('"PartitionsCount" '"1")))
+(let $9 (DataType 'String))
+(let $10 '('"k" (OptionalType $9)))
+(let $11 (DataType 'Uint64))
+(let $12 (OptionalType $11))
+(let $13 '('"t" $12))
+(let $14 (StructType $10 $13 '('"v" $12)))
+(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14))
+(let $16 '('"k" '"t" '"v"))
+(let $17 '('"Endpoint" '"<pq_pq_endpoint>"))
+(let $18 '('"SharedReading" '"1"))
+(let $19 '('"UseSsl" '"1"))
+(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19))
+(let $21 (SecureParam '"cluster:default_pq"))
+(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '""))
+(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '(
+ (let $28 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($18))))
+ (let $29 (DqSourceWideWrap $27 $7 $14 $28))
+ (return (NarrowMap $29 (lambda '($30 $31 $32) (AsStruct '('"k" $30) '('"t" $31) '('"v" $32)))))
+))) '('('"_logical_id" '0))))
+(let $24 (DataSink '"pq" '"pq"))
+(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9))))
+(let $26 (DqPqTopicSink $25 '($17 $19) $21))
+(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($33) (block '(
+ (let $34 '('"strict"))
+ (let $35 (lambda '($46) $46))
+ (let $36 (MultiHoppingCore (FromFlow $33) (lambda '($37) (Member $37 '"k")) (lambda '($38) (FlatMap (Member (SafeCast $38 (StructType $13)) '"t") (lambda '($39) (block '(
+ (let $40 '($11 '"" '"1"))
+ (let $41 (CallableType '() '((OptionalType (DataType 'Timestamp))) $40))
+ (let $42 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $41 (VoidType) '"" '('('"blocks") $34)))
+ (return (Apply $42 $39))
+ ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($43) (AsStruct '('Sum0 (Member $43 '"v")))) (lambda '($44 $45) (AsStruct '('Sum0 (AggrAdd (Member $44 '"v") (Member $45 'Sum0))))) $35 $35 (lambda '($47 $48) (AsStruct '('Sum0 (AggrAdd (Member $47 'Sum0) (Member $48 'Sum0))))) (lambda '($49 $50 $51) (AsStruct '('Sum0 (Member $50 'Sum0)) '('"group0" $51) '('"k" $49))) '"1" '"group0" (Void) (Interval '"4291747199999999") (Uint32 '0) (Void)))
+ (return (FlatMap (ExtractMembers $36 '('Sum0 '"k")) (lambda '($52) (block '(
+ (let $53 (ResourceType '"Yson2.Node"))
+ (let $54 '($53 '"" '"1"))
+ (let $55 (CallableType '() '((DataType 'Yson)) $54))
+ (let $56 '($34))
+ (let $57 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $55 (VoidType) '"" $56))
+ (let $58 (StructType $10 '('"sum" $12)))
+ (let $59 (TupleType (TupleType $58) (StructType) (TupleType)))
+ (let $60 (CallableType '() '($53) '($58)))
+ (let $61 (Udf '"Yson2.From" (Void) $59 '"" $60 (VoidType) '"" $56))
+ (return (Just (AsStruct '('"column0" (Apply $57 (Apply $61 (AsStruct '('"k" (Member $52 '"k")) '('"sum" (Member $52 'Sum0)))))))))
+ )))))
+))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24))
+)
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json
new file mode 100644
index 00000000000..48462f24e7c
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json
@@ -0,0 +1,106 @@
+{
+ "Detailed" : {
+ "Operations" : [
+ {
+ "Id" : 5,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "DqSourceWideWrap"
+ },
+ {
+ "Name" : "NarrowMap"
+ }
+ ]
+ }
+ },
+ {
+ "Id" : 3,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "FromFlow"
+ },
+ {
+ "Name" : "MultiHoppingCore"
+ },
+ {
+ "Name" : "ExtractMembers"
+ },
+ {
+ "Name" : "FlatMap"
+ }
+ ]
+ },
+ "DependsOn" : [
+ 5
+ ]
+ },
+ {
+ "Id" : 2,
+ "Name" : "DqQuery!",
+ "DependsOn" : [
+ 3
+ ]
+ },
+ {
+ "Id" : 1,
+ "Name" : "Commit!",
+ "DependsOn" : [
+ 2
+ ]
+ }
+ ],
+ "OperationRoot" : 1,
+ "Providers" : [ ],
+ "OperationStats" : {
+ "Commit!" : 1,
+ "DqQuery!" : 1,
+ "DqStage" : 2
+ }
+ },
+ "Basic" : {
+ "nodes" : [
+ {
+ "id" : 5,
+ "level" : 1,
+ "name" : "DqStage #5",
+ "type" : "op"
+ },
+ {
+ "id" : 3,
+ "level" : 2,
+ "name" : "DqStage #3",
+ "type" : "op"
+ },
+ {
+ "id" : 2,
+ "level" : 3,
+ "name" : "DqQuery!",
+ "type" : "op"
+ },
+ {
+ "id" : 1,
+ "level" : 4,
+ "name" : "Commit!",
+ "type" : "op"
+ }
+ ],
+ "links" : [
+ {
+ "source" : 5,
+ "target" : 3
+ },
+ {
+ "source" : 3,
+ "target" : 2
+ },
+ {
+ "source" : 2,
+ "target" : 1
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt
new file mode 100644
index 00000000000..d15debecefc
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt
@@ -0,0 +1,55 @@
+(
+(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force"))
+(let $2 (DataSource '"dq" '"$all"))
+(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2"))
+(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default"))
+(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async"))
+(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client"))
+(let $7 (DataSource '"pq" '"pq"))
+(let $8 '('('"PartitionsCount" '"1")))
+(let $9 (DataType 'String))
+(let $10 '('"k" (OptionalType $9)))
+(let $11 (DataType 'Uint64))
+(let $12 (OptionalType $11))
+(let $13 '('"t" $12))
+(let $14 (StructType $10 $13 '('"v" $12)))
+(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14))
+(let $16 '('"k" '"t" '"v"))
+(let $17 '('"Endpoint" '"<pq_pq_endpoint>"))
+(let $18 '('"SharedReading" '0))
+(let $19 '('"UseSsl" '"1"))
+(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"11000000") '('"WatermarksLateArrivalDelayUs" '"8000000") '('"WatermarksLateEventsPolicy" '"adjust")))
+(let $21 (SecureParam '"cluster:default_pq"))
+(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"\x1A0\b\x03\x12\x19\x12\x17_yql_sys_tsp_write_time\x1A\x11\n\x0F\n\x02\b3\x12\t!\x00\x12z\x00\x00\x00\x00\x00"))
+(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '(
+ (let $28 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkgranularity" '"PT11S")))
+ (let $29 '('('"format" '"json_each_row") '('"formatSettings" $28) '('"settings" '($18))))
+ (let $30 (DqSourceWideWrap $27 $7 $14 $29))
+ (return (NarrowMap $30 (lambda '($31 $32 $33) (AsStruct '('"k" $31) '('"t" $32) '('"v" $33)))))
+))) '('('"_logical_id" '0))))
+(let $24 (DataSink '"pq" '"pq"))
+(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9))))
+(let $26 (DqPqTopicSink $25 '($17 $19) $21))
+(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($34) (block '(
+ (let $35 '('"strict"))
+ (let $36 (lambda '($47) $47))
+ (let $37 (MultiHoppingCore (FromFlow $34) (lambda '($38) (Member $38 '"k")) (lambda '($39) (FlatMap (Member (SafeCast $39 (StructType $13)) '"t") (lambda '($40) (block '(
+ (let $41 '($11 '"" '"1"))
+ (let $42 (CallableType '() '((OptionalType (DataType 'Timestamp))) $41))
+ (let $43 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $42 (VoidType) '"" '('('"blocks") $35)))
+ (return (Apply $43 $40))
+ ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($44) (AsStruct '('Sum0 (Member $44 '"v")))) (lambda '($45 $46) (AsStruct '('Sum0 (AggrAdd (Member $45 '"v") (Member $46 'Sum0))))) $36 $36 (lambda '($48 $49) (AsStruct '('Sum0 (AggrAdd (Member $48 'Sum0) (Member $49 'Sum0))))) (lambda '($50 $51 $52) (AsStruct '('Sum0 (Member $51 'Sum0)) '('"group0" $52) '('"k" $50))) '"1" '"group0"))
+ (return (FlatMap (ExtractMembers $37 '('Sum0 '"k")) (lambda '($53) (block '(
+ (let $54 (ResourceType '"Yson2.Node"))
+ (let $55 '($54 '"" '"1"))
+ (let $56 (CallableType '() '((DataType 'Yson)) $55))
+ (let $57 '($35))
+ (let $58 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $56 (VoidType) '"" $57))
+ (let $59 (StructType $10 '('"sum" $12)))
+ (let $60 (TupleType (TupleType $59) (StructType) (TupleType)))
+ (let $61 (CallableType '() '($54) '($59)))
+ (let $62 (Udf '"Yson2.From" (Void) $60 '"" $61 (VoidType) '"" $57))
+ (return (Just (AsStruct '('"column0" (Apply $58 (Apply $62 (AsStruct '('"k" (Member $53 '"k")) '('"sum" (Member $53 'Sum0)))))))))
+ )))))
+))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24))
+)
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json
new file mode 100644
index 00000000000..48462f24e7c
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json
@@ -0,0 +1,106 @@
+{
+ "Detailed" : {
+ "Operations" : [
+ {
+ "Id" : 5,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "DqSourceWideWrap"
+ },
+ {
+ "Name" : "NarrowMap"
+ }
+ ]
+ }
+ },
+ {
+ "Id" : 3,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "FromFlow"
+ },
+ {
+ "Name" : "MultiHoppingCore"
+ },
+ {
+ "Name" : "ExtractMembers"
+ },
+ {
+ "Name" : "FlatMap"
+ }
+ ]
+ },
+ "DependsOn" : [
+ 5
+ ]
+ },
+ {
+ "Id" : 2,
+ "Name" : "DqQuery!",
+ "DependsOn" : [
+ 3
+ ]
+ },
+ {
+ "Id" : 1,
+ "Name" : "Commit!",
+ "DependsOn" : [
+ 2
+ ]
+ }
+ ],
+ "OperationRoot" : 1,
+ "Providers" : [ ],
+ "OperationStats" : {
+ "Commit!" : 1,
+ "DqQuery!" : 1,
+ "DqStage" : 2
+ }
+ },
+ "Basic" : {
+ "nodes" : [
+ {
+ "id" : 5,
+ "level" : 1,
+ "name" : "DqStage #5",
+ "type" : "op"
+ },
+ {
+ "id" : 3,
+ "level" : 2,
+ "name" : "DqStage #3",
+ "type" : "op"
+ },
+ {
+ "id" : 2,
+ "level" : 3,
+ "name" : "DqQuery!",
+ "type" : "op"
+ },
+ {
+ "id" : 1,
+ "level" : 4,
+ "name" : "Commit!",
+ "type" : "op"
+ }
+ ],
+ "links" : [
+ {
+ "source" : 5,
+ "target" : 3
+ },
+ {
+ "source" : 3,
+ "target" : 2
+ },
+ {
+ "source" : 2,
+ "target" : 1
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt
new file mode 100644
index 00000000000..5a34818a473
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt
@@ -0,0 +1,55 @@
+(
+(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force"))
+(let $2 (DataSource '"dq" '"$all"))
+(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2"))
+(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default"))
+(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"sync"))
+(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client"))
+(let $7 (DataSource '"pq" '"pq"))
+(let $8 '('('"PartitionsCount" '"1")))
+(let $9 (DataType 'String))
+(let $10 '('"k" (OptionalType $9)))
+(let $11 (DataType 'Uint64))
+(let $12 (OptionalType $11))
+(let $13 '('"t" $12))
+(let $14 (StructType $10 $13 '('"v" $12)))
+(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14))
+(let $16 '('"k" '"t" '"v"))
+(let $17 '('"Endpoint" '"<pq_pq_endpoint>"))
+(let $18 '('"SharedReading" '0))
+(let $19 '('"UseSsl" '"1"))
+(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"11000000") '('"WatermarksLateArrivalDelayUs" '"8000000") '('"WatermarksLateEventsPolicy" '"adjust")))
+(let $21 (SecureParam '"cluster:default_pq"))
+(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"\x1A0\b\x03\x12\x19\x12\x17_yql_sys_tsp_write_time\x1A\x11\n\x0F\n\x02\b3\x12\t!\x00\x12z\x00\x00\x00\x00\x00"))
+(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '(
+ (let $28 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkgranularity" '"PT11S")))
+ (let $29 '('('"format" '"json_each_row") '('"formatSettings" $28) '('"settings" '($18))))
+ (let $30 (DqSourceWideWrap $27 $7 $14 $29))
+ (return (NarrowMap $30 (lambda '($31 $32 $33) (AsStruct '('"k" $31) '('"t" $32) '('"v" $33)))))
+))) '('('"_logical_id" '0))))
+(let $24 (DataSink '"pq" '"pq"))
+(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9))))
+(let $26 (DqPqTopicSink $25 '($17 $19) $21))
+(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($34) (block '(
+ (let $35 '('"strict"))
+ (let $36 (lambda '($47) $47))
+ (let $37 (MultiHoppingCore (FromFlow $34) (lambda '($38) (Member $38 '"k")) (lambda '($39) (FlatMap (Member (SafeCast $39 (StructType $13)) '"t") (lambda '($40) (block '(
+ (let $41 '($11 '"" '"1"))
+ (let $42 (CallableType '() '((OptionalType (DataType 'Timestamp))) $41))
+ (let $43 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $42 (VoidType) '"" '('('"blocks") $35)))
+ (return (Apply $43 $40))
+ ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($44) (AsStruct '('Sum0 (Member $44 '"v")))) (lambda '($45 $46) (AsStruct '('Sum0 (AggrAdd (Member $45 '"v") (Member $46 'Sum0))))) $36 $36 (lambda '($48 $49) (AsStruct '('Sum0 (AggrAdd (Member $48 'Sum0) (Member $49 'Sum0))))) (lambda '($50 $51 $52) (AsStruct '('Sum0 (Member $51 'Sum0)) '('"group0" $52) '('"k" $50))) '"1" '"group0"))
+ (return (FlatMap (ExtractMembers $37 '('Sum0 '"k")) (lambda '($53) (block '(
+ (let $54 (ResourceType '"Yson2.Node"))
+ (let $55 '($54 '"" '"1"))
+ (let $56 (CallableType '() '((DataType 'Yson)) $55))
+ (let $57 '($35))
+ (let $58 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $56 (VoidType) '"" $57))
+ (let $59 (StructType $10 '('"sum" $12)))
+ (let $60 (TupleType (TupleType $59) (StructType) (TupleType)))
+ (let $61 (CallableType '() '($54) '($59)))
+ (let $62 (Udf '"Yson2.From" (Void) $60 '"" $61 (VoidType) '"" $57))
+ (return (Just (AsStruct '('"column0" (Apply $58 (Apply $62 (AsStruct '('"k" (Member $53 '"k")) '('"sum" (Member $53 'Sum0)))))))))
+ )))))
+))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24))
+)
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json
new file mode 100644
index 00000000000..48462f24e7c
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json
@@ -0,0 +1,106 @@
+{
+ "Detailed" : {
+ "Operations" : [
+ {
+ "Id" : 5,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "DqSourceWideWrap"
+ },
+ {
+ "Name" : "NarrowMap"
+ }
+ ]
+ }
+ },
+ {
+ "Id" : 3,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "FromFlow"
+ },
+ {
+ "Name" : "MultiHoppingCore"
+ },
+ {
+ "Name" : "ExtractMembers"
+ },
+ {
+ "Name" : "FlatMap"
+ }
+ ]
+ },
+ "DependsOn" : [
+ 5
+ ]
+ },
+ {
+ "Id" : 2,
+ "Name" : "DqQuery!",
+ "DependsOn" : [
+ 3
+ ]
+ },
+ {
+ "Id" : 1,
+ "Name" : "Commit!",
+ "DependsOn" : [
+ 2
+ ]
+ }
+ ],
+ "OperationRoot" : 1,
+ "Providers" : [ ],
+ "OperationStats" : {
+ "Commit!" : 1,
+ "DqQuery!" : 1,
+ "DqStage" : 2
+ }
+ },
+ "Basic" : {
+ "nodes" : [
+ {
+ "id" : 5,
+ "level" : 1,
+ "name" : "DqStage #5",
+ "type" : "op"
+ },
+ {
+ "id" : 3,
+ "level" : 2,
+ "name" : "DqStage #3",
+ "type" : "op"
+ },
+ {
+ "id" : 2,
+ "level" : 3,
+ "name" : "DqQuery!",
+ "type" : "op"
+ },
+ {
+ "id" : 1,
+ "level" : 4,
+ "name" : "Commit!",
+ "type" : "op"
+ }
+ ],
+ "links" : [
+ {
+ "source" : 5,
+ "target" : 3
+ },
+ {
+ "source" : 3,
+ "target" : 2
+ },
+ {
+ "source" : 2,
+ "target" : 1
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt
new file mode 100644
index 00000000000..1f3b029631a
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt
@@ -0,0 +1,21 @@
+(
+(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force"))
+(let $2 (Configure! $1 (DataSource '"dq" '"$all") '"Attr" '"watermarksmode" '"default"))
+(let $3 (Configure! $2 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client"))
+(let $4 (DataSink 'result))
+(let $5 (DataSource '"pq" '"pq"))
+(let $6 (StructType '('"ts" (DataType 'Timestamp))))
+(let $7 (PqTopic '"pq" '"local" '"test_topic_input" '('('"PartitionsCount" '"1")) '() $6))
+(let $8 '('"SharedReading" '"0"))
+(let $9 '('('"Consumer" '"test_client") '('"Endpoint" '"<pq_pq_endpoint>") $8 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") '('"UseSsl" '"1") '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"2000000") '('"WatermarksLateArrivalDelayUs" '"7000000") '('"WatermarksLateEventsPolicy" '"adjust") '('"WatermarksIdlePartitions" '"1") '('"WatermarksIdleTimeoutUs" '"3000000")))
+(let $10 (DqPqTopicSource world $7 '('"ts") $9 (SecureParam '"cluster:default_pq") '"" $6 '"\x1A0\b\x03\x12\x19\x12\x17_yql_sys_tsp_write_time\x1A\x11\n\x0F\n\x02\b3\x12\t!\xC0\xCFj\x00\x00\x00\x00\x00"))
+(let $11 (DqStage '((DqSource $5 $10)) (lambda '($14) (block '(
+ (let $15 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkgranularity" '"PT2S") '('"watermarkidletimeout" '"PT3S")))
+ (let $16 '('('"format" '"json_each_row") '('"formatSettings" $15) '('"settings" '($8))))
+ (let $17 (DqSourceWideWrap $14 $5 $6 $16))
+ (return (NarrowMap $17 (lambda '($18) (AsStruct '('"ts" $18)))))
+))) '('('"_logical_id" '0))))
+(let $12 (DqStage '((DqCnUnionAll (TDqOutput $11 '"0"))) (lambda '($19) $19) '('('"_logical_id" '0))))
+(let $13 (ResPull! $3 $4 (Key) (DqCnResult (TDqOutput $12 '"0") '()) '('('type) '('autoref)) '"dq"))
+(return (Commit! (Commit! $13 $4) (DataSink '"pq" '"pq")))
+)
diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json
new file mode 100644
index 00000000000..a0919980b58
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json
@@ -0,0 +1,93 @@
+{
+ "Detailed" : {
+ "Operations" : [
+ {
+ "Id" : 10,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [
+ {
+ "Name" : "DqSourceWideWrap"
+ },
+ {
+ "Name" : "NarrowMap"
+ }
+ ]
+ }
+ },
+ {
+ "Id" : 8,
+ "Name" : "DqStage",
+ "Streams" : {
+ "Program" : [ ]
+ },
+ "DependsOn" : [
+ 10
+ ]
+ },
+ {
+ "Id" : 3,
+ "Name" : "ResPull!",
+ "DependsOn" : [
+ 8
+ ]
+ },
+ {
+ "Id" : 1,
+ "Name" : "Commit!",
+ "DependsOn" : [
+ 3
+ ]
+ }
+ ],
+ "OperationRoot" : 1,
+ "Providers" : [ ],
+ "OperationStats" : {
+ "Commit!" : 1,
+ "DqStage" : 2,
+ "ResPull!" : 1
+ }
+ },
+ "Basic" : {
+ "nodes" : [
+ {
+ "id" : 10,
+ "level" : 1,
+ "name" : "DqStage #10",
+ "type" : "op"
+ },
+ {
+ "id" : 8,
+ "level" : 2,
+ "name" : "DqStage #8",
+ "type" : "op"
+ },
+ {
+ "id" : 3,
+ "level" : 3,
+ "name" : "ResPull!",
+ "type" : "op"
+ },
+ {
+ "id" : 1,
+ "level" : 4,
+ "name" : "Commit!",
+ "type" : "op"
+ }
+ ],
+ "links" : [
+ {
+ "source" : 10,
+ "target" : 8
+ },
+ {
+ "source" : 8,
+ "target" : 3
+ },
+ {
+ "source" : 3,
+ "target" : 1
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowAllKeys.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowAllKeys.sql
new file mode 100644
index 00000000000..e24a0b8bf3c
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowAllKeys.sql
@@ -0,0 +1,39 @@
+/* syntax version 1 */
+/* dq can not */
+
+PRAGMA dq.MaxTasksPerStage="2";
+PRAGMA dq.WatermarksMode="default";
+PRAGMA dq.ComputeActorType="async";
+
+PRAGMA pq.Consumer="test_client";
+
+$input =
+ SELECT
+ *
+ FROM pq.test_topic_input
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA(
+ t Uint64,
+ k String,
+ v Uint64
+ )
+ );
+
+$output =
+ SELECT
+ k,
+ Sum(v) AS sum
+ FROM $input
+ GROUP BY
+ k,
+ HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S"
+ , "PT1H" AS TimeLimit
+ , 12345678 AS SizeLimit
+ , "drop" AS EarlyPolicy
+ , "adjust" AS LatePolicy
+ );
+
+INSERT INTO pq.test_topic_output
+SELECT Yson::SerializeText(Yson::From(TableRow()))
+FROM $output;
diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowPolicyBad.sqlx b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowPolicyBad.sqlx
new file mode 100644
index 00000000000..9ab15dfce83
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowPolicyBad.sqlx
@@ -0,0 +1,36 @@
+/* syntax version 1 */
+/* dq can not */
+
+PRAGMA dq.MaxTasksPerStage="2";
+PRAGMA dq.WatermarksMode="default";
+PRAGMA dq.ComputeActorType="async";
+
+PRAGMA pq.Consumer="test_client";
+
+$input =
+ SELECT
+ *
+ FROM pq.test_topic_input
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA(
+ t Uint64,
+ k String,
+ v Uint64
+ )
+ );
+
+$output =
+ SELECT
+ k,
+ Sum(v) AS sum
+ FROM $input
+ GROUP BY
+ k,
+ HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S"
+ , "foobar" AS EarlyPolicy
+ );
+
+INSERT INTO pq.test_topic_output
+SELECT Yson::SerializeText(Yson::From(TableRow()))
+FROM $output;
diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowSizeLimit.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowSizeLimit.sql
new file mode 100644
index 00000000000..01b38ea55ad
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowSizeLimit.sql
@@ -0,0 +1,37 @@
+/* syntax version 1 */
+/* dq can not */
+
+PRAGMA dq.MaxTasksPerStage="2";
+PRAGMA dq.WatermarksMode="default";
+PRAGMA dq.ComputeActorType="async";
+
+PRAGMA pq.Consumer="test_client";
+
+$input =
+ SELECT
+ *
+ FROM pq.test_topic_input
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA(
+ t Uint64,
+ k String,
+ v Uint64
+ )
+ );
+
+$output =
+ SELECT
+ k,
+ Sum(v) AS sum
+ FROM $input
+ GROUP BY
+ k,
+ HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S"
+ , "adjust" AS LatePolicy
+ , "max" AS SizeLimit
+ );
+
+INSERT INTO pq.test_topic_output
+SELECT Yson::SerializeText(Yson::From(TableRow()))
+FROM $output;
diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimit.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimit.sql
new file mode 100644
index 00000000000..adbfe10fba7
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimit.sql
@@ -0,0 +1,37 @@
+/* syntax version 1 */
+/* dq can not */
+
+PRAGMA dq.MaxTasksPerStage="2";
+PRAGMA dq.WatermarksMode="default";
+PRAGMA dq.ComputeActorType="async";
+
+PRAGMA pq.Consumer="test_client";
+
+$input =
+ SELECT
+ *
+ FROM pq.test_topic_input
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA(
+ t Uint64,
+ k String,
+ v Uint64
+ )
+ );
+
+$output =
+ SELECT
+ k,
+ Sum(v) AS sum
+ FROM $input
+ GROUP BY
+ k,
+ HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S"
+ , "drop" AS EarlyPolicy
+ , "max" AS TimeLimit
+ );
+
+INSERT INTO pq.test_topic_output
+SELECT Yson::SerializeText(Yson::From(TableRow()))
+FROM $output;
diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimitBad.sqlx b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimitBad.sqlx
new file mode 100644
index 00000000000..8655c027ee2
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimitBad.sqlx
@@ -0,0 +1,36 @@
+/* syntax version 1 */
+/* dq can not */
+
+PRAGMA dq.MaxTasksPerStage="2";
+PRAGMA dq.WatermarksMode="default";
+PRAGMA dq.ComputeActorType="async";
+
+PRAGMA pq.Consumer="test_client";
+
+$input =
+ SELECT
+ *
+ FROM pq.test_topic_input
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA(
+ t Uint64,
+ k String,
+ v Uint64
+ )
+ );
+
+$output =
+ SELECT
+ k,
+ Sum(v) AS sum
+ FROM $input
+ GROUP BY
+ k,
+ HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S"
+ , "barfoo" AS TimeLimit
+ );
+
+INSERT INTO pq.test_topic_output
+SELECT Yson::SerializeText(Yson::From(TableRow()))
+FROM $output;
diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermark.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermark.sql
new file mode 100644
index 00000000000..9811b944a8e
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermark.sql
@@ -0,0 +1,37 @@
+/* syntax version 1 */
+/* dq can not */
+
+PRAGMA dq.MaxTasksPerStage="2";
+PRAGMA dq.WatermarksMode="default";
+PRAGMA dq.ComputeActorType="async";
+
+PRAGMA pq.Consumer="test_client";
+-- TAG: pq-no-shared
+
+$input =
+ SELECT
+ *
+ FROM pq.test_topic_input
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA(
+ t Uint64,
+ k String,
+ v Uint64
+ )
+ , WATERMARK AS (SystemMetadata("write_time") - Interval("PT8S"))
+ , WATERMARK_GRANULARITY = "PT11S"
+ );
+
+$output =
+ SELECT
+ k,
+ Sum(v) AS sum
+ FROM $input
+ GROUP BY
+ k,
+ HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S");
+
+INSERT INTO pq.test_topic_output
+SELECT Yson::SerializeText(Yson::From(TableRow()))
+FROM $output;
diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermarkNoRd.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermarkNoRd.sql
new file mode 100644
index 00000000000..8d8d2a3bbc1
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermarkNoRd.sql
@@ -0,0 +1,37 @@
+/* syntax version 1 */
+/* dq can not */
+-- TAG: pq-no-shared
+
+PRAGMA dq.MaxTasksPerStage="2";
+PRAGMA dq.WatermarksMode="default";
+PRAGMA dq.ComputeActorType="sync";
+
+PRAGMA pq.Consumer="test_client";
+
+$input =
+ SELECT
+ *
+ FROM pq.test_topic_input
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA(
+ t Uint64,
+ k String,
+ v Uint64
+ )
+ , WATERMARK AS (SystemMetadata("write_time") - Interval("PT8S"))
+ , WATERMARK_GRANULARITY = "PT11S"
+ );
+
+$output =
+ SELECT
+ k,
+ Sum(v) AS sum
+ FROM $input
+ GROUP BY
+ k,
+ HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S");
+
+INSERT INTO pq.test_topic_output
+SELECT Yson::SerializeText(Yson::From(TableRow()))
+FROM $output;
diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/bad-watermarks-no-rd.sqlx b/ydb/tests/fq/streaming_optimize/suites/watermarks/bad-watermarks-no-rd.sqlx
new file mode 100644
index 00000000000..646ca4b0942
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/bad-watermarks-no-rd.sqlx
@@ -0,0 +1,16 @@
+PRAGMA dq.WatermarksMode="default";
+PRAGMA pq.Consumer="test_client";
+-- TAG: pq-no-shared
+
+SELECT
+ *
+FROM pq.test_topic_input
+WITH(
+ FORMAT=json_each_row,
+ SCHEMA(
+ ts Timestamp NOT NULL
+ ),
+ WATERMARK = ts - Interval("PT7S"),
+ WATERMARK_GRANULARITY="PT2S",
+ WATERMARK_IDLE_TIMEOUT="PT3S"
+);
diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks-no-rd.sql b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks-no-rd.sql
new file mode 100644
index 00000000000..81d3d96116b
--- /dev/null
+++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks-no-rd.sql
@@ -0,0 +1,16 @@
+PRAGMA dq.WatermarksMode="default";
+PRAGMA pq.Consumer="test_client";
+-- TAG: pq-no-shared
+
+SELECT
+ *
+FROM pq.test_topic_input
+WITH(
+ FORMAT=json_each_row,
+ SCHEMA(
+ ts Timestamp NOT NULL
+ ),
+ WATERMARK = SystemMetadata('write_time') - Interval("PT7S"),
+ WATERMARK_GRANULARITY="PT2S",
+ WATERMARK_IDLE_TIMEOUT="PT3S"
+);
diff --git a/ydb/tests/fq/streaming_optimize/test_sql_negative.py b/ydb/tests/fq/streaming_optimize/test_sql_negative.py
index 5fcd74e0088..9bfd714636d 100644
--- a/ydb/tests/fq/streaming_optimize/test_sql_negative.py
+++ b/ydb/tests/fq/streaming_optimize/test_sql_negative.py
@@ -51,6 +51,8 @@ def test(suite, case, cfg, tmpdir, fq_run):
with open(program_sql, encoding="utf-8") as f:
sql_query = f.read()
+ if sql_query.find('-- TAG: pq-no-shared\n') >= 0:
+ fq_run.replace_config(lambda config: config.replace('SharedReading: true', 'SharedReading: false'))
fq_run.add_query(sql_query)
result = fq_run.yql_exec(check_error=False, action="explain")
diff --git a/ydb/tests/fq/streaming_optimize/test_sql_streaming.py b/ydb/tests/fq/streaming_optimize/test_sql_streaming.py
index a2c0b304a8c..a8867bb0684 100644
--- a/ydb/tests/fq/streaming_optimize/test_sql_streaming.py
+++ b/ydb/tests/fq/streaming_optimize/test_sql_streaming.py
@@ -33,6 +33,8 @@ def test(suite, case, cfg, fq_run):
with open(program_sql, encoding="utf-8") as f:
sql_query = f.read()
+ if sql_query.find('-- TAG: pq-no-shared\n') >= 0:
+ fq_run.replace_config(lambda config: config.replace('SharedReading: true', 'SharedReading: false'))
fq_run.add_query(sql_query)
result = fq_run.yql_exec(action="explain")