summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2025-08-27 17:56:20 +0300
committerGitHub <[email protected]>2025-08-27 14:56:20 +0000
commit163f3124855db79f016cbe8f24d5173777acee47 (patch)
treec9bd61759d61ff84e5e731d8f54a4a4d1e9921fc
parentbc5e0001e00ac6770da34981d8408cfa6044ab36 (diff)
fix stream lookup join chains: keep seqno for input row (#23642)
-rw-r--r--ydb/core/kqp/opt/kqp_type_ann.cpp14
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp10
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp174
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp8
-rw-r--r--ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp2
6 files changed, 144 insertions, 66 deletions
diff --git a/ydb/core/kqp/opt/kqp_type_ann.cpp b/ydb/core/kqp/opt/kqp_type_ann.cpp
index 64e02617399..a7b0c83da88 100644
--- a/ydb/core/kqp/opt/kqp_type_ann.cpp
+++ b/ydb/core/kqp/opt/kqp_type_ann.cpp
@@ -514,11 +514,14 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
return TStatus::Error;
}
- if (!EnsureTupleTypeSize(node->Pos(), lookupType, 2, ctx)) {
+ auto tupleType = lookupType->Cast<TTupleExprType>();
+ if (tupleType->GetSize() < 2 || tupleType->GetSize() > 3) {
+ ctx.AddError(
+ TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Table stream lookup has unexpected input tuple, expected tuple size 2 or 3, but found %s"
+ << tupleType->GetSize()));
return TStatus::Error;
}
- auto tupleType = lookupType->Cast<TTupleExprType>();
if (!EnsureOptionalType(node->Pos(), *tupleType->GetItems()[0], ctx)) {
return TStatus::Error;
}
@@ -1871,11 +1874,14 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
return TStatus::Error;
}
- if (!EnsureTupleTypeSize(node->Pos(), inputItemType, 2, ctx)) {
+ auto inputTupleType = inputItemType->Cast<TTupleExprType>();
+ if (inputTupleType->GetSize() < 2 || inputTupleType->GetSize() > 3) {
+ ctx.AddError(
+ TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Table stream lookup has unexpected input tuple, expected tuple size 2 or 3, but found %s"
+ << inputTupleType->GetSize()));
return TStatus::Error;
}
- auto inputTupleType = inputItemType->Cast<TTupleExprType>();
if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) {
return TStatus::Error;
}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
index f13a1ec7737..3bec61e0832 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
@@ -836,9 +836,9 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
TMaybeNode<TExprBase> lookupKeys;
if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {
- // Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>>>,
- // expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row>>,
- // so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row>>
+ // Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>, rowMeta>>,
+ // expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row, rowMeta>>,
+ // so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row, rowMeta>>
lookupKeys = Build<TCoMap>(ctx, node.Pos())
.Input(lookupIndexTable)
.Lambda()
@@ -852,6 +852,10 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Tuple("tuple")
.Index().Value("0").Build()
.Build()
+ .Add<TCoNth>()
+ .Tuple("tuple")
+ .Index().Value("2").Build()
+ .Build()
.Build()
.Build()
.Done();
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 8ed157c5ff9..beacf18a747 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -1660,7 +1660,7 @@ private:
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: {
YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple);
const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
- YQL_ENSURE(inputTupleType->GetSize() == 2);
+ YQL_ENSURE(inputTupleType->GetSize() == 2 || inputTupleType->GetSize() == 3);
YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Optional);
const auto joinKeyType = inputTupleType->GetItems()[0]->Cast<TOptionalExprType>()->GetItemType();
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index b472dcac73d..c095a088dae 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -497,11 +497,36 @@ public:
}
}
+ struct TUnprocessedLeftRow {
+ TOwnedCellVec JoinKey;
+ NUdf::TUnboxedValue InputLeftRow;
+ ui64 RowSeqNo;
+
+ explicit TUnprocessedLeftRow(TOwnedCellVec joinKey, NUdf::TUnboxedValue inputLeftRow, ui64 rowSeqNo)
+ : JoinKey(std::move(joinKey))
+ , InputLeftRow(std::move(inputLeftRow))
+ , RowSeqNo(rowSeqNo)
+ {}
+ };
+
void AddInputRow(NUdf::TUnboxedValue inputRow) final {
auto joinKey = inputRow.GetElement(0);
std::vector<TCell> joinKeyCells(Settings.LookupKeyColumns.size());
NMiniKQL::TStringProviderBackend backend;
+ ui64 rowSeqNo;
+ bool firstRow = true;
+ bool lastRow = true;
+ if (IsInputTriplet()) {
+ auto value = inputRow.GetElement(2).Get<ui64>();
+ auto cookie = TStreamLookupJoinRowCookie::Decode(value);
+ rowSeqNo = cookie.RowSeqNo;
+ firstRow = cookie.FirstRow;
+ lastRow = cookie.LastRow;
+ } else {
+ rowSeqNo = InputRowSeqNo++;
+ }
+
if (joinKey.HasValue()) {
for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) {
const auto* joinKeyColumn = Settings.LookupKeyColumns[colId];
@@ -513,7 +538,14 @@ public:
}
}
- UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1))));
+ ResultRowsBySeqNo[rowSeqNo].AcceptLeftRow(firstRow, lastRow);
+ UnprocessedRows.emplace_back(
+ TUnprocessedLeftRow(
+ std::move(TOwnedCellVec(joinKeyCells)),
+ std::move(inputRow.GetElement(1)),
+ rowSeqNo
+ )
+ );
}
void AddInputRow(TConstArrayRef<TCell>) final {
@@ -545,12 +577,6 @@ public:
auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(ranges[i]));
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
leftRowIt->second.PendingReads.erase(prevReadId);
-
- if (leftRowIt->second.Completed()) {
- YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
- ResultRowsBySeqNo[leftRowIt->second.SeqNo].Completed = true;
- PendingLeftRowsByKey.erase(leftRowIt);
- }
}
if (lastProcessedKey) {
@@ -645,9 +671,9 @@ public:
}
while (!UnprocessedRows.empty()) {
- auto [joinKey, leftData] = UnprocessedRows.front();
+ const auto& unprocessedRow = UnprocessedRows.front();
- if (PendingLeftRowsByKey.contains(joinKey)) {
+ if (PendingLeftRowsByKey.contains(unprocessedRow.JoinKey)) {
// TODO: skip key duplicate
break;
}
@@ -669,22 +695,21 @@ public:
return true;
};
- UnprocessedRows.pop_front();
- if (isKeyAllowed(joinKey)) {
+ if (isKeyAllowed(unprocessedRow.JoinKey)) {
std::vector <std::pair<ui64, TOwnedTableRange>> partitions;
- if (joinKey.size() < Settings.KeyColumns.size()) {
+ if (unprocessedRow.JoinKey.size() < Settings.KeyColumns.size()) {
// build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
- std::vector <TCell> fromCells(Settings.KeyColumns.size() - joinKey.size());
- fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end());
+ std::vector <TCell> fromCells(Settings.KeyColumns.size() - unprocessedRow.JoinKey.size());
+ fromCells.insert(fromCells.begin(), unprocessedRow.JoinKey.begin(), unprocessedRow.JoinKey.end());
bool fromInclusive = true;
bool toInclusive = false;
partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(),
- TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive)
+ TOwnedTableRange(fromCells, fromInclusive, unprocessedRow.JoinKey, toInclusive)
);
} else {
// full pk, build point
- partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(joinKey));
+ partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(unprocessedRow.JoinKey));
}
for (auto[shardId, range] : partitions) {
@@ -696,7 +721,10 @@ public:
}
}
- PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo(std::move(leftData), InputRowSeqNo++, GetLeftRowType())));
+ PendingLeftRowsByKey.insert(
+ std::make_pair(std::move(unprocessedRow.JoinKey), TLeftRowInfo(
+ std::move(unprocessedRow.InputLeftRow), unprocessedRow.RowSeqNo, GetLeftRowType())));
+ UnprocessedRows.pop_front();
}
std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>> requests;
@@ -764,10 +792,12 @@ public:
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
TReadResultStats rowStats;
- auto resultRow = leftRowIt->second.TryBuildResultRow(HolderFactory, Settings, ReadColumns, row, rowStats, false, result.ShardId);
+ auto& resultRows = ResultRowsBySeqNo[leftRowIt->second.SeqNo];
+ auto resultRow = leftRowIt->second.TryBuildResultRow(
+ HolderFactory, Settings, ReadColumns, row, rowStats, resultRows.FirstRowInBatch(), false, result.ShardId);
if (resultRow.HasValue()) {
+ resultRows.AddRow(std::move(rowStats), std::move(resultRow));
YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
- ResultRowsBySeqNo[leftRowIt->second.SeqNo].Rows.emplace_back(std::move(resultRow), std::move(rowStats));
}
}
@@ -776,14 +806,6 @@ public:
auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(key));
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
leftRowIt->second.PendingReads.erase(record.GetReadId());
-
- // row is considered processed when all reads are finished
- // and at least one right row is found
- if (leftRowIt->second.Completed()) {
- YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
- ResultRowsBySeqNo[leftRowIt->second.SeqNo].Completed = true;
- PendingLeftRowsByKey.erase(leftRowIt);
- }
}
ReadStateByReadId.erase(pendingKeysIt);
@@ -846,11 +868,11 @@ public:
for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) {
if (leftRowIt->second.PendingReads.empty()) {
TReadResultStats rowStats;
- auto resultRow = leftRowIt->second.TryBuildResultRow(HolderFactory, Settings, ReadColumns, {}, rowStats, true);
- YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
auto& result = ResultRowsBySeqNo[leftRowIt->second.SeqNo];
- result.Rows.emplace_back(std::move(resultRow), std::move(rowStats));
- result.Completed = true;
+ result.FinishLeftRow();
+ auto resultRow = leftRowIt->second.TryBuildResultRow(HolderFactory, Settings, ReadColumns, {}, rowStats, result.FirstRowInBatch(), result.ProcessedAllLeftRows());
+ YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
+ result.AddRow(std::move(rowStats), std::move(resultRow));
PendingLeftRowsByKey.erase(leftRowIt++);
} else {
++leftRowIt;
@@ -872,8 +894,9 @@ public:
}
auto& result = resultIt->second;
- for (; result.FirstUnprocessedRow < result.Rows.size(); ++result.FirstUnprocessedRow) {
- auto& row = result.Rows[result.FirstUnprocessedRow];
+
+ while(!result.Rows.empty()) {
+ TResultBatch::TResultRow& row = result.Rows.front();
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
resultStats.SizeLimitExceeded = true;
@@ -881,21 +904,15 @@ public:
}
batch.emplace_back(std::move(row.Data));
+ result.Rows.pop_front();
resultStats.Add(row.Stats);
}
- if (result.FirstUnprocessedRow == result.Rows.size()) {
- if (ShoulKeepRowsOrder()) {
- // we can increment seqNo only if current result is completed
- if (result.Completed) {
- ResultRowsBySeqNo.erase(resultIt);
- ++CurrentResultSeqNo;
- } else {
- break;
- }
- } else {
- ResultRowsBySeqNo.erase(resultIt);
- }
+ if (result.Completed()) {
+ ResultRowsBySeqNo.erase(resultIt);
+ ++CurrentResultSeqNo;
+ } else {
+ break;
}
}
@@ -926,7 +943,6 @@ private:
bool RightRowExist = false;
bool LastRowReceived = false;
const ui64 SeqNo;
- ui64 MatchedRows = 0;
NMiniKQL::TStructType* LeftRowType = nullptr;
ui64 LeftRowSize = 0;
@@ -936,7 +952,7 @@ private:
NUdf::TUnboxedValue TryBuildResultRow(const NMiniKQL::THolderFactory& HolderFactory, const TLookupSettings& Settings,
const std::map<std::string, TSysTables::TTableColumnInfo>& ReadColumns, TConstArrayRef<TCell> rightRow,
- TReadResultStats& rowStats, bool lastRow, TMaybe<ui64> shardId = {})
+ TReadResultStats& rowStats, bool firstRow, bool lastRow, TMaybe<ui64> shardId = {})
{
if (lastRow) {
LastRowReceived = lastRow;
@@ -950,8 +966,7 @@ private:
resultRow = HolderFactory.CreateDirectArrayHolder(3, resultRowItems);
resultRowItems[0] = Row;
resultRowItems[1] = std::move(RightRow);
- MatchedRows++;
- auto rowCookie = TStreamLookupJoinRowCookie{.RowSeqNo=SeqNo, .LastRow=lastRow, .FirstRow=(MatchedRows == 1)};
+ auto rowCookie = TStreamLookupJoinRowCookie{.RowSeqNo=SeqNo, .LastRow=lastRow, .FirstRow=firstRow};
resultRowItems[2] = NUdf::TUnboxedValuePod(rowCookie.Encode());
rowStats.ReadRowsCount += (hasValue ? 1 : 0);
rowStats.ReadBytesCount += StorageReadBytes;
@@ -1006,9 +1021,48 @@ private:
TReadResultStats Stats;
};
- std::vector<TResultRow> Rows;
+ std::deque<TResultRow> Rows;
ui32 FirstUnprocessedRow = 0;
- bool Completed = false;
+ ui64 ProcessedRows = 0;
+ ui64 ProcessingLeftRows = 0;
+ bool FirstRow = false;
+ bool LastRow = false;
+
+ bool Finished() const {
+ return Rows.empty();
+ }
+
+ bool ProcessedAllLeftRows() const {
+ return ProcessingLeftRows == 0;
+ }
+
+ void FinishLeftRow() {
+ --ProcessingLeftRows;
+ }
+
+ void AcceptLeftRow(bool firstRow, bool lastRow) {
+ ++ProcessingLeftRows;
+ if (firstRow)
+ FirstRow = true;
+
+ if (lastRow)
+ LastRow = true;
+ }
+
+ bool Completed() const {
+ return Rows.empty() && FirstRow && LastRow && ProcessingLeftRows == 0;
+ }
+
+ void AddRow(TReadResultStats&& stats, NUdf::TUnboxedValue&& data) {
+ if (data.HasValue()) {
+ Rows.emplace_back(std::move(data), std::move(stats));
+ ++ProcessedRows;
+ }
+ }
+
+ bool FirstRowInBatch() const {
+ return ProcessedRows == 0;
+ }
};
bool ShoulKeepRowsOrder() const {
@@ -1068,6 +1122,21 @@ private:
return range.From.subspan(0, Settings.LookupKeyColumns.size());
}
+ bool IsInputTriplet() {
+ if (InputTupleType) {
+ return InputTupleType->GetElementsCount() == 3;
+ }
+
+ auto inputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{InputDesc.GetTransform().GetInputType()}, TypeEnv);
+ YQL_ENSURE(inputTypeNode, "Failed to deserialize stream lookup transform output type");
+
+ auto inputType = static_cast<NMiniKQL::TType*>(inputTypeNode);
+ YQL_ENSURE(inputType->GetKind() == NMiniKQL::TType::EKind::Tuple, "Unexpected stream lookup output type");
+
+ InputTupleType = AS_TYPE(NMiniKQL::TTupleType, inputType);
+ return InputTupleType->GetElementsCount() == 3;
+ }
+
NMiniKQL::TStructType* GetLeftRowType() {
if (LeftRowType) {
// KIKIMR-23296: avoid allocating separate type structure for each lookup
@@ -1094,7 +1163,7 @@ private:
private:
const NYql::NDqProto::TTaskInput& InputDesc;
std::map<std::string, TSysTables::TTableColumnInfo> ReadColumns;
- std::deque<std::pair<TOwnedCellVec, NUdf::TUnboxedValue>> UnprocessedRows;
+ std::deque<TUnprocessedLeftRow> UnprocessedRows;
std::deque<TOwnedTableRange> UnprocessedKeys;
std::unordered_map<ui64, TReadState> ReadStateByReadId;
absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
@@ -1102,6 +1171,7 @@ private:
ui64 InputRowSeqNo = 0;
ui64 CurrentResultSeqNo = 0;
NMiniKQL::TStructType* LeftRowType = nullptr;
+ NKikimr::NMiniKQL::TTupleType* InputTupleType = nullptr;
};
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 7e8731f9137..abd90ce5a1b 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -5504,16 +5504,12 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
}
Y_UNIT_TEST_TWIN(JoinWithNonPKColumnsInPredicate, UseStreamJoin) {
- if (UseStreamJoin) {
- return;
- }
-
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings().SetKqpSettings({setting});
serverSettings.AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(UseStreamJoin);
TKikimrRunner kikimr(serverSettings);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+ //kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
+ //kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 737dabea4fc..135e415b93c 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -1066,6 +1066,7 @@ Y_UNIT_TEST_TWIN(JoinLeftJoinPostJoinFilterTest, StreamLookupJoin) {
select A.a, A.b, B.a, B.b from A
left join (select * from B where a > 2 and a < 3) as B
on A.b = B.b
+ ORDER BY A.a, A.b
)",
.Answer=R"([
[[1];[2];#;#];[[2];[2];#;#];[[3];[2];#;#];[[4];[2];#;#]
@@ -1082,6 +1083,7 @@ Y_UNIT_TEST_TWIN(JoinInclusionTestSemiJoin, StreamLookupJoin) {
select A.a, A.b, from A
left semi join (select * from B where a > 1 and a < 3) as B
ON A.b = B.b
+ ORDER BY A.a, A.b
)",
.Answer=R"([
[[1];[2]];[[2];[2]];[[3];[2]];[[4];[2]]