about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author yumkam <yumkam7@ydb.tech> 2024-08-30 16:51:53 +0300
committer GitHub <noreply@github.com> 2024-08-30 16:51:53 +0300
commit e923efd87ac5cd87ded5e3eaa67af2ee0961d73f (patch)
tree 3f4849f2f4ae97d8df486a0d70a59a90dafa67d5
parent 1b2baabb7d669e82cf0bd84b7b548509fd8831c4 (diff)
download ydb-e923efd87ac5cd87ded5e3eaa67af2ee0961d73f.tar.gz
fix streamlookup keys join on unmatched rows (#8422)
-rw-r--r-- ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp 2
-rw-r--r-- ydb/tests/fq/generic/test_streaming_join.py 68
2 files changed, 59 insertions, 11 deletions
diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
index 646f491305..9f2f52f17b 100644
--- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
+++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
@@ -111,7 +111,7 @@ private: //events
outputRowItems[i] = wideInputRow[index];
break;
case EOutputRowItemSource::LookupKey:
- outputRowItems[i] = lookupKey.GetElement(index);
+ outputRowItems[i] = lookupPayload && *lookupPayload ? lookupKey.GetElement(index) : NUdf::TUnboxedValue {};
break;
case EOutputRowItemSource::LookupOther:
if (lookupPayload && *lookupPayload) {
diff --git a/ydb/tests/fq/generic/test_streaming_join.py b/ydb/tests/fq/generic/test_streaming_join.py
index 3840ceec3b..dc5e54d85a 100644
--- a/ydb/tests/fq/generic/test_streaming_join.py
+++ b/ydb/tests/fq/generic/test_streaming_join.py
@@ -10,6 +10,7 @@ from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.fq.generic.utils.settings import Settings
+DEBUG = 0
TESTCASES = [
# 0
(
@@ -187,6 +188,7 @@ TESTCASES = [
$enriched = select e.id as id,
$formatTime(DateTime::ParseIso8601(e.ts)) as ts,
e.user as user_id,
+ u.id as uid,
u.name as name,
u.age as age
from
@@ -201,35 +203,80 @@ TESTCASES = [
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
- '{"id":1,"ts":"11:33:44","user_id":2,"name":"Petr","age":25}',
+ '{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
- '{"id":2,"ts":"11:22:33","user_id":1,"name":"Anya","age":15}',
+ '{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
- '{"id":3,"ts":"11:33:55","user_id":100,"name":null,"age":null}',
+ '{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
- '{"id":4,"ts":"11:33:56","user_id":3,"name":"Masha","age":17}',
+ '{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
- '{"id":5,"ts":"11:33:57","user_id":3,"name":"Masha","age":17}',
+ '{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
- '{"id":6,"ts":"11:22:38","user_id":1,"name":"Anya","age":15}',
+ '{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
- '{"id":7,"ts":"11:33:49","user_id":2,"name":"Petr","age":25}',
+ '{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000,
),
+ # 5
+ (
+ R'''
+ $input = SELECT * FROM myyds.`{input_topic}`
+ WITH (
+ FORMAT=json_each_row,
+ SCHEMA (
+ id Int32,
+ ts String,
+ ev_type String,
+ user Int32,
+ )
+ ) ;
+
+ $enriched = select e.id as id,
+ e.user as user_id,
+ u.id as uid
+ from
+ $input as e
+ left join {streamlookup} ydb_conn_{table_name}.`users` as u
+ on(e.user = u.id)
+ ;
+
+ insert into myyds.`{output_topic}`
+ select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
+ ''',
+ [
+ (
+ '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
+ '{"id":1,"uid":2,"user_id":2}',
+ ),
+ (
+ '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
+ '{"id":2,"uid":1,"user_id":1}',
+ ),
+ (
+ '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
+ '{"id":3,"uid":null,"user_id":100}',
+ ),
+ (
+ '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
+ '{"id":4,"uid":3,"user_id":3}',
+ ),
+ ],
+ ),
]
@@ -324,9 +371,10 @@ class TestStreamingJoin(TestYdsBase):
offset += 500
read_data = self.read_stream(len(messages))
- print(streamlookup, testcase, file=sys.stderr)
- print(sql, file=sys.stderr)
- print(*zip(messages, read_data), file=sys.stderr, sep="\n")
+ if DEBUG:
+ print(streamlookup, testcase, file=sys.stderr)
+ print(sql, file=sys.stderr)
+ print(*zip(messages, read_data), file=sys.stderr, sep="\n")
for r, exp in zip(read_data, messages):
r = json.loads(r)
exp = json.loads(exp[1])