diff options
author | yumkam <yumkam7@ydb.tech> | 2024-08-30 16:51:53 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-30 16:51:53 +0300 |
commit | e923efd87ac5cd87ded5e3eaa67af2ee0961d73f (patch) | |
tree | 3f4849f2f4ae97d8df486a0d70a59a90dafa67d5 | |
parent | 1b2baabb7d669e82cf0bd84b7b548509fd8831c4 (diff) | |
download | ydb-e923efd87ac5cd87ded5e3eaa67af2ee0961d73f.tar.gz |
fix streamlookup keys join on unmatched rows (#8422)
-rw-r--r-- | ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp | 2 | ||||
-rw-r--r-- | ydb/tests/fq/generic/test_streaming_join.py | 68 |
2 files changed, 59 insertions, 11 deletions
diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 646f491305..9f2f52f17b 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -111,7 +111,7 @@ private: //events outputRowItems[i] = wideInputRow[index]; break; case EOutputRowItemSource::LookupKey: - outputRowItems[i] = lookupKey.GetElement(index); + outputRowItems[i] = lookupPayload && *lookupPayload ? lookupKey.GetElement(index) : NUdf::TUnboxedValue {}; break; case EOutputRowItemSource::LookupOther: if (lookupPayload && *lookupPayload) { diff --git a/ydb/tests/fq/generic/test_streaming_join.py b/ydb/tests/fq/generic/test_streaming_join.py index 3840ceec3b..dc5e54d85a 100644 --- a/ydb/tests/fq/generic/test_streaming_join.py +++ b/ydb/tests/fq/generic/test_streaming_join.py @@ -10,6 +10,7 @@ from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase from ydb.tests.fq.generic.utils.settings import Settings +DEBUG = 0 TESTCASES = [ # 0 ( @@ -187,6 +188,7 @@ TESTCASES = [ $enriched = select e.id as id, $formatTime(DateTime::ParseIso8601(e.ts)) as ts, e.user as user_id, + u.id as uid, u.name as name, u.age as age from @@ -201,35 +203,80 @@ TESTCASES = [ [ ( '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}', - '{"id":1,"ts":"11:33:44","user_id":2,"name":"Petr","age":25}', + '{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}', ), ( '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}', - '{"id":2,"ts":"11:22:33","user_id":1,"name":"Anya","age":15}', + '{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}', ), ( '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}', - '{"id":3,"ts":"11:33:55","user_id":100,"name":null,"age":null}', + '{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}', ), ( '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', - '{"id":4,"ts":"11:33:56","user_id":3,"name":"Masha","age":17}', + '{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}', ), ( '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}', - '{"id":5,"ts":"11:33:57","user_id":3,"name":"Masha","age":17}', + '{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}', ), ( '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}', - '{"id":6,"ts":"11:22:38","user_id":1,"name":"Anya","age":15}', + '{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}', ), ( '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}', - '{"id":7,"ts":"11:33:49","user_id":2,"name":"Petr","age":25}', + '{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}', ), ] * 1000, ), + # 5 + ( + R''' + $input = SELECT * FROM myyds.`{input_topic}` + WITH ( + FORMAT=json_each_row, + SCHEMA ( + id Int32, + ts String, + ev_type String, + user Int32, + ) + ) ; + + $enriched = select e.id as id, + e.user as user_id, + u.id as uid + from + $input as e + left join {streamlookup} ydb_conn_{table_name}.`users` as u + on(e.user = u.id) + ; + + insert into myyds.`{output_topic}` + select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched; + ''', + [ + ( + '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}', + '{"id":1,"uid":2,"user_id":2}', + ), + ( + '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}', + '{"id":2,"uid":1,"user_id":1}', + ), + ( + '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}', + '{"id":3,"uid":null,"user_id":100}', + ), + ( + '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', + '{"id":4,"uid":3,"user_id":3}', + ), + ], + ), ] @@ -324,9 +371,10 @@ class TestStreamingJoin(TestYdsBase): offset += 500 read_data = self.read_stream(len(messages)) - print(streamlookup, testcase, file=sys.stderr) - print(sql, file=sys.stderr) - print(*zip(messages, read_data), file=sys.stderr, sep="\n") + if DEBUG: + print(streamlookup, testcase, file=sys.stderr) + print(sql, file=sys.stderr) + print(*zip(messages, read_data), file=sys.stderr, sep="\n") for r, exp in zip(read_data, messages): r = json.loads(r) exp = json.loads(exp[1]) |