diff options
| author | Yuriy Kaminskiy <[email protected]> | 2026-03-06 13:13:15 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-03-06 13:13:15 +0300 |
| commit | a2feaa941f2122ad73b2550230cc3e107fc3c53b (patch) | |
| tree | 1fd36eaca5b12a0fbbe56e663a86527e13207540 | |
| parent | 870eb55d84ac05a43ec62657640817b860dadb01 (diff) | |
streaming_optimize: canonized ast for watermark and HoppingWindow (#33525)meta-1.0.4
27 files changed, 1292 insertions, 0 deletions
diff --git a/ydb/tests/fq/streaming_optimize/canondata/result.json b/ydb/tests/fq/streaming_optimize/canondata/result.json index bd426686daf..b02d10a09fb 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/result.json +++ b/ydb/tests/fq/streaming_optimize/canondata/result.json @@ -1,4 +1,19 @@ { + "test_sql_negative.test[hopping_window-GroupByHoppingWindowPolicyBad-default.txt]": [ + { + "uri": "file://test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out" + } + ], + "test_sql_negative.test[hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt]": [ + { + "uri": "file://test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out" + } + ], + "test_sql_negative.test[watermarks-bad-watermarks-no-rd-default.txt]": [ + { + "uri": "file://test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out" + } + ], "test_sql_negative.test[watermarks-bad_column-default.txt]": [ { "uri": "file://test_sql_negative.test_watermarks-bad_column-default.txt_/err_file.out" @@ -81,6 +96,14 @@ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/plan.json" } }, + "test_sql_streaming.test[hopping_window-GroupByHoppingWindowAllKeys-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json" + } + }, "test_sql_streaming.test[hopping_window-GroupByHoppingWindowByStringKey-default.txt]": { "Ast": { "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/ast.txt" @@ -121,6 +144,14 @@ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/plan.json" } }, + "test_sql_streaming.test[hopping_window-GroupByHoppingWindowSizeLimit-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json" + } + }, "test_sql_streaming.test[hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt]": { "Ast": { "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/ast.txt" @@ -129,6 +160,30 @@ "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/plan.json" } }, + "test_sql_streaming.test[hopping_window-GroupByHoppingWindowTimeLimit-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json" + } + }, + "test_sql_streaming.test[hopping_window-GroupByHoppingWindowWatermark-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json" + } + }, + "test_sql_streaming.test[hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json" + } + }, "test_sql_streaming.test[pq-ReadTopic-default.txt]": { "Ast": { "uri": "file://test_sql_streaming.test_pq-ReadTopic-default.txt_/ast.txt" @@ -233,6 +288,14 @@ "uri": "file://test_sql_streaming.test_watermarks-watermarks-default.txt_/plan.json" } }, + "test_sql_streaming.test[watermarks-watermarks-no-rd-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json" + } + }, "test_sql_streaming.test[watermarks-watermarks_adjust-default.txt]": { "Ast": { "uri": "file://test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/ast.txt" diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out new file mode 100644 index 00000000000..478da564231 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowPolicyBad-default.txt_/err_file.out @@ -0,0 +1,6 @@ +Failed to execute query, invalid final status FAILED, issues: +<main>: Error: Query failed with code ABORTED at ISOTIME + <main>: Error: Run query failed: Error + <main>: Error: Optimization, code: 1070 + <main>:27:10: Error: HoppingWindow: EarlyPolicy: expected one of 'drop', 'adjust', 'close', but got foobar + diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out new file mode 100644 index 00000000000..9244747eaed --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_hopping_window-GroupByHoppingWindowTimeLimitBad-default.txt_/err_file.out @@ -0,0 +1,6 @@ +Failed to execute query, invalid final status FAILED, issues: +<main>: Error: Query failed with code ABORTED at ISOTIME + <main>: Error: Failed to parse query + <main>: Error: Parse Sql + <main>:31:25: Error: Invalid value "barfoo" for type Interval + diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out new file mode 100644 index 00000000000..022933aabd3 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_negative.test_watermarks-bad-watermarks-no-rd-default.txt_/err_file.out @@ -0,0 +1,6 @@ +Failed to execute query, invalid final status FAILED, issues: +<main>: Error: Query failed with code ABORTED at ISOTIME + <main>: Error: Run query failed: Error + <main>: Error: Optimization, code: 1070 + <main>:7:6: Error: Unrecognized watermark expression, flexible watermark expressions are only implemented in shared reading mode, please use WATERMARK = (SystemMetadata('write_time') - Interval('PT5S')) + diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt new file mode 100644 index 00000000000..29eca805544 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/ast.txt @@ -0,0 +1,54 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default")) +(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async")) +(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $7 (DataSource '"pq" '"pq")) +(let $8 '('('"PartitionsCount" '"1"))) +(let $9 (DataType 'String)) +(let $10 '('"k" (OptionalType $9))) +(let $11 (DataType 'Uint64)) +(let $12 (OptionalType $11)) +(let $13 '('"t" $12)) +(let $14 (StructType $10 $13 '('"v" $12))) +(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14)) +(let $16 '('"k" '"t" '"v")) +(let $17 '('"Endpoint" '"<pq_pq_endpoint>")) +(let $18 '('"SharedReading" '"1")) +(let $19 '('"UseSsl" '"1")) +(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19)) +(let $21 (SecureParam '"cluster:default_pq")) +(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"")) +(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '( + (let $28 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($18)))) + (let $29 (DqSourceWideWrap $27 $7 $14 $28)) + (return (NarrowMap $29 (lambda '($30 $31 $32) (AsStruct '('"k" $30) '('"t" $31) '('"v" $32))))) +))) '('('"_logical_id" '0)))) +(let $24 (DataSink '"pq" '"pq")) +(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9)))) +(let $26 (DqPqTopicSink $25 '($17 $19) $21)) +(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($33) (block '( + (let $34 '('"strict")) + (let $35 (lambda '($46) $46)) + (let $36 (MultiHoppingCore (FromFlow $33) (lambda '($37) (Member $37 '"k")) (lambda '($38) (FlatMap (Member (SafeCast $38 (StructType $13)) '"t") (lambda '($39) (block '( + (let $40 '($11 '"" '"1")) + (let $41 (CallableType '() '((OptionalType (DataType 'Timestamp))) $40)) + (let $42 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $41 (VoidType) '"" '('('"blocks") $34))) + (return (Apply $42 $39)) + ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($43) (AsStruct '('Sum0 (Member $43 '"v")))) (lambda '($44 $45) (AsStruct '('Sum0 (AggrAdd (Member $44 '"v") (Member $45 'Sum0))))) $35 $35 (lambda '($47 $48) (AsStruct '('Sum0 (AggrAdd (Member $47 'Sum0) (Member $48 'Sum0))))) (lambda '($49 $50 $51) (AsStruct '('Sum0 (Member $50 'Sum0)) '('"group0" $51) '('"k" $49))) '"1" '"group0" (Uint64 '12345678) (Interval '"3600000000") (Uint32 '0) (Uint32 '"1"))) + (return (FlatMap (ExtractMembers $36 '('Sum0 '"k")) (lambda '($52) (block '( + (let $53 (ResourceType '"Yson2.Node")) + (let $54 '($53 '"" '"1")) + (let $55 (CallableType '() '((DataType 'Yson)) $54)) + (let $56 '($34)) + (let $57 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $55 (VoidType) '"" $56)) + (let $58 (StructType $10 '('"sum" $12))) + (let $59 (TupleType (TupleType $58) (StructType) (TupleType))) + (let $60 (CallableType '() '($53) '($58))) + (let $61 (Udf '"Yson2.From" (Void) $59 '"" $60 (VoidType) '"" $56)) + (return (Just (AsStruct '('"column0" (Apply $57 (Apply $61 (AsStruct '('"k" (Member $52 '"k")) '('"sum" (Member $52 'Sum0))))))))) + ))))) +))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24)) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json new file mode 100644 index 00000000000..48462f24e7c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowAllKeys-default.txt_/plan.json @@ -0,0 +1,106 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 5, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 3, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "FromFlow" + }, + { + "Name" : "MultiHoppingCore" + }, + { + "Name" : "ExtractMembers" + }, + { + "Name" : "FlatMap" + } + ] + }, + "DependsOn" : [ + 5 + ] + }, + { + "Id" : 2, + "Name" : "DqQuery!", + "DependsOn" : [ + 3 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 2 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqQuery!" : 1, + "DqStage" : 2 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 5, + "level" : 1, + "name" : "DqStage #5", + "type" : "op" + }, + { + "id" : 3, + "level" : 2, + "name" : "DqStage #3", + "type" : "op" + }, + { + "id" : 2, + "level" : 3, + "name" : "DqQuery!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 5, + "target" : 3 + }, + { + "source" : 3, + "target" : 2 + }, + { + "source" : 2, + "target" : 1 + } + ] + } +}
\ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt new file mode 100644 index 00000000000..1c0153bd55e --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/ast.txt @@ -0,0 +1,54 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default")) +(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async")) +(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $7 (DataSource '"pq" '"pq")) +(let $8 '('('"PartitionsCount" '"1"))) +(let $9 (DataType 'String)) +(let $10 '('"k" (OptionalType $9))) +(let $11 (DataType 'Uint64)) +(let $12 (OptionalType $11)) +(let $13 '('"t" $12)) +(let $14 (StructType $10 $13 '('"v" $12))) +(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14)) +(let $16 '('"k" '"t" '"v")) +(let $17 '('"Endpoint" '"<pq_pq_endpoint>")) +(let $18 '('"SharedReading" '"1")) +(let $19 '('"UseSsl" '"1")) +(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19)) +(let $21 (SecureParam '"cluster:default_pq")) +(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"")) +(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '( + (let $28 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($18)))) + (let $29 (DqSourceWideWrap $27 $7 $14 $28)) + (return (NarrowMap $29 (lambda '($30 $31 $32) (AsStruct '('"k" $30) '('"t" $31) '('"v" $32))))) +))) '('('"_logical_id" '0)))) +(let $24 (DataSink '"pq" '"pq")) +(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9)))) +(let $26 (DqPqTopicSink $25 '($17 $19) $21)) +(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($33) (block '( + (let $34 '('"strict")) + (let $35 (lambda '($46) $46)) + (let $36 (MultiHoppingCore (FromFlow $33) (lambda '($37) (Member $37 '"k")) (lambda '($38) (FlatMap (Member (SafeCast $38 (StructType $13)) '"t") (lambda '($39) (block '( + (let $40 '($11 '"" '"1")) + (let $41 (CallableType '() '((OptionalType (DataType 'Timestamp))) $40)) + (let $42 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $41 (VoidType) '"" '('('"blocks") $34))) + (return (Apply $42 $39)) + ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($43) (AsStruct '('Sum0 (Member $43 '"v")))) (lambda '($44 $45) (AsStruct '('Sum0 (AggrAdd (Member $44 '"v") (Member $45 'Sum0))))) $35 $35 (lambda '($47 $48) (AsStruct '('Sum0 (AggrAdd (Member $47 'Sum0) (Member $48 'Sum0))))) (lambda '($49 $50 $51) (AsStruct '('Sum0 (Member $50 'Sum0)) '('"group0" $51) '('"k" $49))) '"1" '"group0" (Uint64 '"18446744073709551615") (Void) (Void) (Uint32 '"1"))) + (return (FlatMap (ExtractMembers $36 '('Sum0 '"k")) (lambda '($52) (block '( + (let $53 (ResourceType '"Yson2.Node")) + (let $54 '($53 '"" '"1")) + (let $55 (CallableType '() '((DataType 'Yson)) $54)) + (let $56 '($34)) + (let $57 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $55 (VoidType) '"" $56)) + (let $58 (StructType $10 '('"sum" $12))) + (let $59 (TupleType (TupleType $58) (StructType) (TupleType))) + (let $60 (CallableType '() '($53) '($58))) + (let $61 (Udf '"Yson2.From" (Void) $59 '"" $60 (VoidType) '"" $56)) + (return (Just (AsStruct '('"column0" (Apply $57 (Apply $61 (AsStruct '('"k" (Member $52 '"k")) '('"sum" (Member $52 'Sum0))))))))) + ))))) +))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24)) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json new file mode 100644 index 00000000000..48462f24e7c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowSizeLimit-default.txt_/plan.json @@ -0,0 +1,106 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 5, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 3, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "FromFlow" + }, + { + "Name" : "MultiHoppingCore" + }, + { + "Name" : "ExtractMembers" + }, + { + "Name" : "FlatMap" + } + ] + }, + "DependsOn" : [ + 5 + ] + }, + { + "Id" : 2, + "Name" : "DqQuery!", + "DependsOn" : [ + 3 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 2 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqQuery!" : 1, + "DqStage" : 2 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 5, + "level" : 1, + "name" : "DqStage #5", + "type" : "op" + }, + { + "id" : 3, + "level" : 2, + "name" : "DqStage #3", + "type" : "op" + }, + { + "id" : 2, + "level" : 3, + "name" : "DqQuery!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 5, + "target" : 3 + }, + { + "source" : 3, + "target" : 2 + }, + { + "source" : 2, + "target" : 1 + } + ] + } +}
\ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt new file mode 100644 index 00000000000..8a325fdd46c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/ast.txt @@ -0,0 +1,54 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default")) +(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async")) +(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $7 (DataSource '"pq" '"pq")) +(let $8 '('('"PartitionsCount" '"1"))) +(let $9 (DataType 'String)) +(let $10 '('"k" (OptionalType $9))) +(let $11 (DataType 'Uint64)) +(let $12 (OptionalType $11)) +(let $13 '('"t" $12)) +(let $14 (StructType $10 $13 '('"v" $12))) +(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14)) +(let $16 '('"k" '"t" '"v")) +(let $17 '('"Endpoint" '"<pq_pq_endpoint>")) +(let $18 '('"SharedReading" '"1")) +(let $19 '('"UseSsl" '"1")) +(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19)) +(let $21 (SecureParam '"cluster:default_pq")) +(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"")) +(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '( + (let $28 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($18)))) + (let $29 (DqSourceWideWrap $27 $7 $14 $28)) + (return (NarrowMap $29 (lambda '($30 $31 $32) (AsStruct '('"k" $30) '('"t" $31) '('"v" $32))))) +))) '('('"_logical_id" '0)))) +(let $24 (DataSink '"pq" '"pq")) +(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9)))) +(let $26 (DqPqTopicSink $25 '($17 $19) $21)) +(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($33) (block '( + (let $34 '('"strict")) + (let $35 (lambda '($46) $46)) + (let $36 (MultiHoppingCore (FromFlow $33) (lambda '($37) (Member $37 '"k")) (lambda '($38) (FlatMap (Member (SafeCast $38 (StructType $13)) '"t") (lambda '($39) (block '( + (let $40 '($11 '"" '"1")) + (let $41 (CallableType '() '((OptionalType (DataType 'Timestamp))) $40)) + (let $42 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $41 (VoidType) '"" '('('"blocks") $34))) + (return (Apply $42 $39)) + ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($43) (AsStruct '('Sum0 (Member $43 '"v")))) (lambda '($44 $45) (AsStruct '('Sum0 (AggrAdd (Member $44 '"v") (Member $45 'Sum0))))) $35 $35 (lambda '($47 $48) (AsStruct '('Sum0 (AggrAdd (Member $47 'Sum0) (Member $48 'Sum0))))) (lambda '($49 $50 $51) (AsStruct '('Sum0 (Member $50 'Sum0)) '('"group0" $51) '('"k" $49))) '"1" '"group0" (Void) (Interval '"4291747199999999") (Uint32 '0) (Void))) + (return (FlatMap (ExtractMembers $36 '('Sum0 '"k")) (lambda '($52) (block '( + (let $53 (ResourceType '"Yson2.Node")) + (let $54 '($53 '"" '"1")) + (let $55 (CallableType '() '((DataType 'Yson)) $54)) + (let $56 '($34)) + (let $57 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $55 (VoidType) '"" $56)) + (let $58 (StructType $10 '('"sum" $12))) + (let $59 (TupleType (TupleType $58) (StructType) (TupleType))) + (let $60 (CallableType '() '($53) '($58))) + (let $61 (Udf '"Yson2.From" (Void) $59 '"" $60 (VoidType) '"" $56)) + (return (Just (AsStruct '('"column0" (Apply $57 (Apply $61 (AsStruct '('"k" (Member $52 '"k")) '('"sum" (Member $52 'Sum0))))))))) + ))))) +))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24)) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json new file mode 100644 index 00000000000..48462f24e7c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeLimit-default.txt_/plan.json @@ -0,0 +1,106 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 5, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 3, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "FromFlow" + }, + { + "Name" : "MultiHoppingCore" + }, + { + "Name" : "ExtractMembers" + }, + { + "Name" : "FlatMap" + } + ] + }, + "DependsOn" : [ + 5 + ] + }, + { + "Id" : 2, + "Name" : "DqQuery!", + "DependsOn" : [ + 3 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 2 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqQuery!" : 1, + "DqStage" : 2 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 5, + "level" : 1, + "name" : "DqStage #5", + "type" : "op" + }, + { + "id" : 3, + "level" : 2, + "name" : "DqStage #3", + "type" : "op" + }, + { + "id" : 2, + "level" : 3, + "name" : "DqQuery!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 5, + "target" : 3 + }, + { + "source" : 3, + "target" : 2 + }, + { + "source" : 2, + "target" : 1 + } + ] + } +}
\ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt new file mode 100644 index 00000000000..d15debecefc --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/ast.txt @@ -0,0 +1,55 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default")) +(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"async")) +(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $7 (DataSource '"pq" '"pq")) +(let $8 '('('"PartitionsCount" '"1"))) +(let $9 (DataType 'String)) +(let $10 '('"k" (OptionalType $9))) +(let $11 (DataType 'Uint64)) +(let $12 (OptionalType $11)) +(let $13 '('"t" $12)) +(let $14 (StructType $10 $13 '('"v" $12))) +(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14)) +(let $16 '('"k" '"t" '"v")) +(let $17 '('"Endpoint" '"<pq_pq_endpoint>")) +(let $18 '('"SharedReading" '0)) +(let $19 '('"UseSsl" '"1")) +(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"11000000") '('"WatermarksLateArrivalDelayUs" '"8000000") '('"WatermarksLateEventsPolicy" '"adjust"))) +(let $21 (SecureParam '"cluster:default_pq")) +(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"\x1A0\b\x03\x12\x19\x12\x17_yql_sys_tsp_write_time\x1A\x11\n\x0F\n\x02\b3\x12\t!\x00\x12z\x00\x00\x00\x00\x00")) +(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '( + (let $28 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkgranularity" '"PT11S"))) + (let $29 '('('"format" '"json_each_row") '('"formatSettings" $28) '('"settings" '($18)))) + (let $30 (DqSourceWideWrap $27 $7 $14 $29)) + (return (NarrowMap $30 (lambda '($31 $32 $33) (AsStruct '('"k" $31) '('"t" $32) '('"v" $33))))) +))) '('('"_logical_id" '0)))) +(let $24 (DataSink '"pq" '"pq")) +(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9)))) +(let $26 (DqPqTopicSink $25 '($17 $19) $21)) +(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($34) (block '( + (let $35 '('"strict")) + (let $36 (lambda '($47) $47)) + (let $37 (MultiHoppingCore (FromFlow $34) (lambda '($38) (Member $38 '"k")) (lambda '($39) (FlatMap (Member (SafeCast $39 (StructType $13)) '"t") (lambda '($40) (block '( + (let $41 '($11 '"" '"1")) + (let $42 (CallableType '() '((OptionalType (DataType 'Timestamp))) $41)) + (let $43 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $42 (VoidType) '"" '('('"blocks") $35))) + (return (Apply $43 $40)) + ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($44) (AsStruct '('Sum0 (Member $44 '"v")))) (lambda '($45 $46) (AsStruct '('Sum0 (AggrAdd (Member $45 '"v") (Member $46 'Sum0))))) $36 $36 (lambda '($48 $49) (AsStruct '('Sum0 (AggrAdd (Member $48 'Sum0) (Member $49 'Sum0))))) (lambda '($50 $51 $52) (AsStruct '('Sum0 (Member $51 'Sum0)) '('"group0" $52) '('"k" $50))) '"1" '"group0")) + (return (FlatMap (ExtractMembers $37 '('Sum0 '"k")) (lambda '($53) (block '( + (let $54 (ResourceType '"Yson2.Node")) + (let $55 '($54 '"" '"1")) + (let $56 (CallableType '() '((DataType 'Yson)) $55)) + (let $57 '($35)) + (let $58 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $56 (VoidType) '"" $57)) + (let $59 (StructType $10 '('"sum" $12))) + (let $60 (TupleType (TupleType $59) (StructType) (TupleType))) + (let $61 (CallableType '() '($54) '($59))) + (let $62 (Udf '"Yson2.From" (Void) $60 '"" $61 (VoidType) '"" $57)) + (return (Just (AsStruct '('"column0" (Apply $58 (Apply $62 (AsStruct '('"k" (Member $53 '"k")) '('"sum" (Member $53 'Sum0))))))))) + ))))) +))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24)) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json new file mode 100644 index 00000000000..48462f24e7c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermark-default.txt_/plan.json @@ -0,0 +1,106 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 5, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 3, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "FromFlow" + }, + { + "Name" : "MultiHoppingCore" + }, + { + "Name" : "ExtractMembers" + }, + { + "Name" : "FlatMap" + } + ] + }, + "DependsOn" : [ + 5 + ] + }, + { + "Id" : 2, + "Name" : "DqQuery!", + "DependsOn" : [ + 3 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 2 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqQuery!" : 1, + "DqStage" : 2 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 5, + "level" : 1, + "name" : "DqStage #5", + "type" : "op" + }, + { + "id" : 3, + "level" : 2, + "name" : "DqStage #3", + "type" : "op" + }, + { + "id" : 2, + "level" : 3, + "name" : "DqQuery!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 5, + "target" : 3 + }, + { + "source" : 3, + "target" : 2 + }, + { + "source" : 2, + "target" : 1 + } + ] + } +}
\ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt new file mode 100644 index 00000000000..5a34818a473 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/ast.txt @@ -0,0 +1,55 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"maxtasksperstage" '"2")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksmode" '"default")) +(let $5 (Configure! $4 $2 '"Attr" '"computeactortype" '"sync")) +(let $6 (Configure! $5 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $7 (DataSource '"pq" '"pq")) +(let $8 '('('"PartitionsCount" '"1"))) +(let $9 (DataType 'String)) +(let $10 '('"k" (OptionalType $9))) +(let $11 (DataType 'Uint64)) +(let $12 (OptionalType $11)) +(let $13 '('"t" $12)) +(let $14 (StructType $10 $13 '('"v" $12))) +(let $15 (PqTopic '"pq" '"local" '"test_topic_input" $8 '() $14)) +(let $16 '('"k" '"t" '"v")) +(let $17 '('"Endpoint" '"<pq_pq_endpoint>")) +(let $18 '('"SharedReading" '0)) +(let $19 '('"UseSsl" '"1")) +(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"11000000") '('"WatermarksLateArrivalDelayUs" '"8000000") '('"WatermarksLateEventsPolicy" '"adjust"))) +(let $21 (SecureParam '"cluster:default_pq")) +(let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"\x1A0\b\x03\x12\x19\x12\x17_yql_sys_tsp_write_time\x1A\x11\n\x0F\n\x02\b3\x12\t!\x00\x12z\x00\x00\x00\x00\x00")) +(let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '( + (let $28 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkgranularity" '"PT11S"))) + (let $29 '('('"format" '"json_each_row") '('"formatSettings" $28) '('"settings" '($18)))) + (let $30 (DqSourceWideWrap $27 $7 $14 $29)) + (return (NarrowMap $30 (lambda '($31 $32 $33) (AsStruct '('"k" $31) '('"t" $32) '('"v" $33))))) +))) '('('"_logical_id" '0)))) +(let $24 (DataSink '"pq" '"pq")) +(let $25 (PqTopic '"pq" '"local" '"test_topic_output" $8 '() (StructType '('"Data" $9)))) +(let $26 (DqPqTopicSink $25 '($17 $19) $21)) +(return (Commit! (DqQuery! $6 '((DqStage '((DqCnHashShuffle (TDqOutput $23 '0) '('"k"))) (lambda '($34) (block '( + (let $35 '('"strict")) + (let $36 (lambda '($47) $47)) + (let $37 (MultiHoppingCore (FromFlow $34) (lambda '($38) (Member $38 '"k")) (lambda '($39) (FlatMap (Member (SafeCast $39 (StructType $13)) '"t") (lambda '($40) (block '( + (let $41 '($11 '"" '"1")) + (let $42 (CallableType '() '((OptionalType (DataType 'Timestamp))) $41)) + (let $43 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $42 (VoidType) '"" '('('"blocks") $35))) + (return (Apply $43 $40)) + ))))) (Interval '"5000") (Interval '"10000") (Interval '"5000000") 'true (lambda '($44) (AsStruct '('Sum0 (Member $44 '"v")))) (lambda '($45 $46) (AsStruct '('Sum0 (AggrAdd (Member $45 '"v") (Member $46 'Sum0))))) $36 $36 (lambda '($48 $49) (AsStruct '('Sum0 (AggrAdd (Member $48 'Sum0) (Member $49 'Sum0))))) (lambda '($50 $51 $52) (AsStruct '('Sum0 (Member $51 'Sum0)) '('"group0" $52) '('"k" $50))) '"1" '"group0")) + (return (FlatMap (ExtractMembers $37 '('Sum0 '"k")) (lambda '($53) (block '( + (let $54 (ResourceType '"Yson2.Node")) + (let $55 '($54 '"" '"1")) + (let $56 (CallableType '() '((DataType 'Yson)) $55)) + (let $57 '($35)) + (let $58 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $56 (VoidType) '"" $57)) + (let $59 (StructType $10 '('"sum" $12))) + (let $60 (TupleType (TupleType $59) (StructType) (TupleType))) + (let $61 (CallableType '() '($54) '($59))) + (let $62 (Udf '"Yson2.From" (Void) $60 '"" $61 (VoidType) '"" $57)) + (return (Just (AsStruct '('"column0" (Apply $58 (Apply $62 (AsStruct '('"k" (Member $53 '"k")) '('"sum" (Member $53 'Sum0))))))))) + ))))) +))) '('('"_logical_id" '0)) '((DqSink '0 $24 $26))))) $24)) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json new file mode 100644 index 00000000000..48462f24e7c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowWatermarkNoRd-default.txt_/plan.json @@ -0,0 +1,106 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 5, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 3, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "FromFlow" + }, + { + "Name" : "MultiHoppingCore" + }, + { + "Name" : "ExtractMembers" + }, + { + "Name" : "FlatMap" + } + ] + }, + "DependsOn" : [ + 5 + ] + }, + { + "Id" : 2, + "Name" : "DqQuery!", + "DependsOn" : [ + 3 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 2 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqQuery!" : 1, + "DqStage" : 2 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 5, + "level" : 1, + "name" : "DqStage #5", + "type" : "op" + }, + { + "id" : 3, + "level" : 2, + "name" : "DqStage #3", + "type" : "op" + }, + { + "id" : 2, + "level" : 3, + "name" : "DqQuery!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 5, + "target" : 3 + }, + { + "source" : 3, + "target" : 2 + }, + { + "source" : 2, + "target" : 1 + } + ] + } +}
\ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt new file mode 100644 index 00000000000..1f3b029631a --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/ast.txt @@ -0,0 +1,21 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (Configure! $1 (DataSource '"dq" '"$all") '"Attr" '"watermarksmode" '"default")) +(let $3 (Configure! $2 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $4 (DataSink 'result)) +(let $5 (DataSource '"pq" '"pq")) +(let $6 (StructType '('"ts" (DataType 'Timestamp)))) +(let $7 (PqTopic '"pq" '"local" '"test_topic_input" '('('"PartitionsCount" '"1")) '() $6)) +(let $8 '('"SharedReading" '"0")) +(let $9 '('('"Consumer" '"test_client") '('"Endpoint" '"<pq_pq_endpoint>") $8 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") '('"UseSsl" '"1") '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"2000000") '('"WatermarksLateArrivalDelayUs" '"7000000") '('"WatermarksLateEventsPolicy" '"adjust") '('"WatermarksIdlePartitions" '"1") '('"WatermarksIdleTimeoutUs" '"3000000"))) +(let $10 (DqPqTopicSource world $7 '('"ts") $9 (SecureParam '"cluster:default_pq") '"" $6 '"\x1A0\b\x03\x12\x19\x12\x17_yql_sys_tsp_write_time\x1A\x11\n\x0F\n\x02\b3\x12\t!\xC0\xCFj\x00\x00\x00\x00\x00")) +(let $11 (DqStage '((DqSource $5 $10)) (lambda '($14) (block '( + (let $15 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkgranularity" '"PT2S") '('"watermarkidletimeout" '"PT3S"))) + (let $16 '('('"format" '"json_each_row") '('"formatSettings" $15) '('"settings" '($8)))) + (let $17 (DqSourceWideWrap $14 $5 $6 $16)) + (return (NarrowMap $17 (lambda '($18) (AsStruct '('"ts" $18))))) +))) '('('"_logical_id" '0)))) +(let $12 (DqStage '((DqCnUnionAll (TDqOutput $11 '"0"))) (lambda '($19) $19) '('('"_logical_id" '0)))) +(let $13 (ResPull! $3 $4 (Key) (DqCnResult (TDqOutput $12 '"0") '()) '('('type) '('autoref)) '"dq")) +(return (Commit! (Commit! $13 $4) (DataSink '"pq" '"pq"))) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json new file mode 100644 index 00000000000..a0919980b58 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-no-rd-default.txt_/plan.json @@ -0,0 +1,93 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 10, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 8, + "Name" : "DqStage", + "Streams" : { + "Program" : [ ] + }, + "DependsOn" : [ + 10 + ] + }, + { + "Id" : 3, + "Name" : "ResPull!", + "DependsOn" : [ + 8 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 3 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqStage" : 2, + "ResPull!" : 1 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 10, + "level" : 1, + "name" : "DqStage #10", + "type" : "op" + }, + { + "id" : 8, + "level" : 2, + "name" : "DqStage #8", + "type" : "op" + }, + { + "id" : 3, + "level" : 3, + "name" : "ResPull!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 10, + "target" : 8 + }, + { + "source" : 8, + "target" : 3 + }, + { + "source" : 3, + "target" : 1 + } + ] + } +}
\ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowAllKeys.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowAllKeys.sql new file mode 100644 index 00000000000..e24a0b8bf3c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowAllKeys.sql @@ -0,0 +1,39 @@ +/* syntax version 1 */ +/* dq can not */ + +PRAGMA dq.MaxTasksPerStage="2"; +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.ComputeActorType="async"; + +PRAGMA pq.Consumer="test_client"; + +$input = + SELECT + * + FROM pq.test_topic_input + WITH ( + FORMAT=json_each_row, + SCHEMA( + t Uint64, + k String, + v Uint64 + ) + ); + +$output = + SELECT + k, + Sum(v) AS sum + FROM $input + GROUP BY + k, + HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S" + , "PT1H" AS TimeLimit + , 12345678 AS SizeLimit + , "drop" AS EarlyPolicy + , "adjust" AS LatePolicy + ); + +INSERT INTO pq.test_topic_output +SELECT Yson::SerializeText(Yson::From(TableRow())) +FROM $output; diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowPolicyBad.sqlx b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowPolicyBad.sqlx new file mode 100644 index 00000000000..9ab15dfce83 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowPolicyBad.sqlx @@ -0,0 +1,36 @@ +/* syntax version 1 */ +/* dq can not */ + +PRAGMA dq.MaxTasksPerStage="2"; +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.ComputeActorType="async"; + +PRAGMA pq.Consumer="test_client"; + +$input = + SELECT + * + FROM pq.test_topic_input + WITH ( + FORMAT=json_each_row, + SCHEMA( + t Uint64, + k String, + v Uint64 + ) + ); + +$output = + SELECT + k, + Sum(v) AS sum + FROM $input + GROUP BY + k, + HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S" + , "foobar" AS EarlyPolicy + ); + +INSERT INTO pq.test_topic_output +SELECT Yson::SerializeText(Yson::From(TableRow())) +FROM $output; diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowSizeLimit.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowSizeLimit.sql new file mode 100644 index 00000000000..01b38ea55ad --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowSizeLimit.sql @@ -0,0 +1,37 @@ +/* syntax version 1 */ +/* dq can not */ + +PRAGMA dq.MaxTasksPerStage="2"; +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.ComputeActorType="async"; + +PRAGMA pq.Consumer="test_client"; + +$input = + SELECT + * + FROM pq.test_topic_input + WITH ( + FORMAT=json_each_row, + SCHEMA( + t Uint64, + k String, + v Uint64 + ) + ); + +$output = + SELECT + k, + Sum(v) AS sum + FROM $input + GROUP BY + k, + HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S" + , "adjust" AS LatePolicy + , "max" AS SizeLimit + ); + +INSERT INTO pq.test_topic_output +SELECT Yson::SerializeText(Yson::From(TableRow())) +FROM $output; diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimit.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimit.sql new file mode 100644 index 00000000000..adbfe10fba7 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimit.sql @@ -0,0 +1,37 @@ +/* syntax version 1 */ +/* dq can not */ + +PRAGMA dq.MaxTasksPerStage="2"; +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.ComputeActorType="async"; + +PRAGMA pq.Consumer="test_client"; + +$input = + SELECT + * + FROM pq.test_topic_input + WITH ( + FORMAT=json_each_row, + SCHEMA( + t Uint64, + k String, + v Uint64 + ) + ); + +$output = + SELECT + k, + Sum(v) AS sum + FROM $input + GROUP BY + k, + HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S" + , "drop" AS EarlyPolicy + , "max" AS TimeLimit + ); + +INSERT INTO pq.test_topic_output +SELECT Yson::SerializeText(Yson::From(TableRow())) +FROM $output; diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimitBad.sqlx b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimitBad.sqlx new file mode 100644 index 00000000000..8655c027ee2 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowTimeLimitBad.sqlx @@ -0,0 +1,36 @@ +/* syntax version 1 */ +/* dq can not */ + +PRAGMA dq.MaxTasksPerStage="2"; +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.ComputeActorType="async"; + +PRAGMA pq.Consumer="test_client"; + +$input = + SELECT + * + FROM pq.test_topic_input + WITH ( + FORMAT=json_each_row, + SCHEMA( + t Uint64, + k String, + v Uint64 + ) + ); + +$output = + SELECT + k, + Sum(v) AS sum + FROM $input + GROUP BY + k, + HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S" + , "barfoo" AS TimeLimit + ); + +INSERT INTO pq.test_topic_output +SELECT Yson::SerializeText(Yson::From(TableRow())) +FROM $output; diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermark.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermark.sql new file mode 100644 index 00000000000..9811b944a8e --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermark.sql @@ -0,0 +1,37 @@ +/* syntax version 1 */ +/* dq can not */ + +PRAGMA dq.MaxTasksPerStage="2"; +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.ComputeActorType="async"; + +PRAGMA pq.Consumer="test_client"; +-- TAG: pq-no-shared + +$input = + SELECT + * + FROM pq.test_topic_input + WITH ( + FORMAT=json_each_row, + SCHEMA( + t Uint64, + k String, + v Uint64 + ) + , WATERMARK AS (SystemMetadata("write_time") - Interval("PT8S")) + , WATERMARK_GRANULARITY = "PT11S" + ); + +$output = + SELECT + k, + Sum(v) AS sum + FROM $input + GROUP BY + k, + HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S"); + +INSERT INTO pq.test_topic_output +SELECT Yson::SerializeText(Yson::From(TableRow())) +FROM $output; diff --git a/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermarkNoRd.sql b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermarkNoRd.sql new file mode 100644 index 00000000000..8d8d2a3bbc1 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/hopping_window/GroupByHoppingWindowWatermarkNoRd.sql @@ -0,0 +1,37 @@ +/* syntax version 1 */ +/* dq can not */ +-- TAG: pq-no-shared + +PRAGMA dq.MaxTasksPerStage="2"; +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.ComputeActorType="sync"; + +PRAGMA pq.Consumer="test_client"; + +$input = + SELECT + * + FROM pq.test_topic_input + WITH ( + FORMAT=json_each_row, + SCHEMA( + t Uint64, + k String, + v Uint64 + ) + , WATERMARK AS (SystemMetadata("write_time") - Interval("PT8S")) + , WATERMARK_GRANULARITY = "PT11S" + ); + +$output = + SELECT + k, + Sum(v) AS sum + FROM $input + GROUP BY + k, + HoppingWindow(DateTime::FromMilliseconds(t), "PT0.005S", "PT0.01S"); + +INSERT INTO pq.test_topic_output +SELECT Yson::SerializeText(Yson::From(TableRow())) +FROM $output; diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/bad-watermarks-no-rd.sqlx b/ydb/tests/fq/streaming_optimize/suites/watermarks/bad-watermarks-no-rd.sqlx new file mode 100644 index 00000000000..646ca4b0942 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/bad-watermarks-no-rd.sqlx @@ -0,0 +1,16 @@ +PRAGMA dq.WatermarksMode="default"; +PRAGMA pq.Consumer="test_client"; +-- TAG: pq-no-shared + +SELECT + * +FROM pq.test_topic_input +WITH( + FORMAT=json_each_row, + SCHEMA( + ts Timestamp NOT NULL + ), + WATERMARK = ts - Interval("PT7S"), + WATERMARK_GRANULARITY="PT2S", + WATERMARK_IDLE_TIMEOUT="PT3S" +); diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks-no-rd.sql b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks-no-rd.sql new file mode 100644 index 00000000000..81d3d96116b --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks-no-rd.sql @@ -0,0 +1,16 @@ +PRAGMA dq.WatermarksMode="default"; +PRAGMA pq.Consumer="test_client"; +-- TAG: pq-no-shared + +SELECT + * +FROM pq.test_topic_input +WITH( + FORMAT=json_each_row, + SCHEMA( + ts Timestamp NOT NULL + ), + WATERMARK = SystemMetadata('write_time') - Interval("PT7S"), + WATERMARK_GRANULARITY="PT2S", + WATERMARK_IDLE_TIMEOUT="PT3S" +); diff --git a/ydb/tests/fq/streaming_optimize/test_sql_negative.py b/ydb/tests/fq/streaming_optimize/test_sql_negative.py index 5fcd74e0088..9bfd714636d 100644 --- a/ydb/tests/fq/streaming_optimize/test_sql_negative.py +++ b/ydb/tests/fq/streaming_optimize/test_sql_negative.py @@ -51,6 +51,8 @@ def test(suite, case, cfg, tmpdir, fq_run): with open(program_sql, encoding="utf-8") as f: sql_query = f.read() + if sql_query.find('-- TAG: pq-no-shared\n') >= 0: + fq_run.replace_config(lambda config: config.replace('SharedReading: true', 'SharedReading: false')) fq_run.add_query(sql_query) result = fq_run.yql_exec(check_error=False, action="explain") diff --git a/ydb/tests/fq/streaming_optimize/test_sql_streaming.py b/ydb/tests/fq/streaming_optimize/test_sql_streaming.py index a2c0b304a8c..a8867bb0684 100644 --- a/ydb/tests/fq/streaming_optimize/test_sql_streaming.py +++ b/ydb/tests/fq/streaming_optimize/test_sql_streaming.py @@ -33,6 +33,8 @@ def test(suite, case, cfg, fq_run): with open(program_sql, encoding="utf-8") as f: sql_query = f.read() + if sql_query.find('-- TAG: pq-no-shared\n') >= 0: + fq_run.replace_config(lambda config: config.replace('SharedReading: true', 'SharedReading: false')) fq_run.add_query(sql_query) result = fq_run.yql_exec(action="explain") |
