diff options
author | Vitalii Gridnev <[email protected]> | 2025-08-27 17:56:20 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-08-27 14:56:20 +0000 |
commit | 163f3124855db79f016cbe8f24d5173777acee47 (patch) | |
tree | c9bd61759d61ff84e5e731d8f54a4a4d1e9921fc | |
parent | bc5e0001e00ac6770da34981d8408cfa6044ab36 (diff) |
fix stream lookup join chains: keep seqno for input row (#23642)
-rw-r--r-- | ydb/core/kqp/opt/kqp_type_ann.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 174 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp | 2 |
6 files changed, 144 insertions, 66 deletions
diff --git a/ydb/core/kqp/opt/kqp_type_ann.cpp b/ydb/core/kqp/opt/kqp_type_ann.cpp index 64e02617399..a7b0c83da88 100644 --- a/ydb/core/kqp/opt/kqp_type_ann.cpp +++ b/ydb/core/kqp/opt/kqp_type_ann.cpp @@ -514,11 +514,14 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons return TStatus::Error; } - if (!EnsureTupleTypeSize(node->Pos(), lookupType, 2, ctx)) { + auto tupleType = lookupType->Cast<TTupleExprType>(); + if (tupleType->GetSize() < 2 || tupleType->GetSize() > 3) { + ctx.AddError( + TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Table stream lookup has unexpected input tuple, expected tuple size 2 or 3, but found %s" + << tupleType->GetSize())); return TStatus::Error; } - auto tupleType = lookupType->Cast<TTupleExprType>(); if (!EnsureOptionalType(node->Pos(), *tupleType->GetItems()[0], ctx)) { return TStatus::Error; } @@ -1871,11 +1874,14 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext return TStatus::Error; } - if (!EnsureTupleTypeSize(node->Pos(), inputItemType, 2, ctx)) { + auto inputTupleType = inputItemType->Cast<TTupleExprType>(); + if (inputTupleType->GetSize() < 2 || inputTupleType->GetSize() > 3) { + ctx.AddError( + TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Table stream lookup has unexpected input tuple, expected tuple size 2 or 3, but found %s" + << inputTupleType->GetSize())); return TStatus::Error; } - auto inputTupleType = inputItemType->Cast<TTupleExprType>(); if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) { return TStatus::Error; } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index f13a1ec7737..3bec61e0832 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -836,9 +836,9 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, TMaybeNode<TExprBase> lookupKeys; if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) { - // Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>>>, - // expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row>>, - // so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row>> + // Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>, rowMeta>>, + // expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row, rowMeta>>, + // so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row, rowMeta>> lookupKeys = Build<TCoMap>(ctx, node.Pos()) .Input(lookupIndexTable) .Lambda() @@ -852,6 +852,10 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, .Tuple("tuple") .Index().Value("0").Build() .Build() + .Add<TCoNth>() + .Tuple("tuple") + .Index().Value("2").Build() + .Build() .Build() .Build() .Done(); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 8ed157c5ff9..beacf18a747 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1660,7 +1660,7 @@ private: case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: { YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple); const auto inputTupleType = inputItemType->Cast<TTupleExprType>(); - YQL_ENSURE(inputTupleType->GetSize() == 2); + YQL_ENSURE(inputTupleType->GetSize() == 2 || inputTupleType->GetSize() == 3); YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Optional); const auto joinKeyType = inputTupleType->GetItems()[0]->Cast<TOptionalExprType>()->GetItemType(); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index b472dcac73d..c095a088dae 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -497,11 +497,36 @@ public: } } + struct TUnprocessedLeftRow { + TOwnedCellVec JoinKey; + NUdf::TUnboxedValue InputLeftRow; + ui64 RowSeqNo; + + explicit TUnprocessedLeftRow(TOwnedCellVec joinKey, NUdf::TUnboxedValue inputLeftRow, ui64 rowSeqNo) + : JoinKey(std::move(joinKey)) + , InputLeftRow(std::move(inputLeftRow)) + , RowSeqNo(rowSeqNo) + {} + }; + void AddInputRow(NUdf::TUnboxedValue inputRow) final { auto joinKey = inputRow.GetElement(0); std::vector<TCell> joinKeyCells(Settings.LookupKeyColumns.size()); NMiniKQL::TStringProviderBackend backend; + ui64 rowSeqNo; + bool firstRow = true; + bool lastRow = true; + if (IsInputTriplet()) { + auto value = inputRow.GetElement(2).Get<ui64>(); + auto cookie = TStreamLookupJoinRowCookie::Decode(value); + rowSeqNo = cookie.RowSeqNo; + firstRow = cookie.FirstRow; + lastRow = cookie.LastRow; + } else { + rowSeqNo = InputRowSeqNo++; + } + if (joinKey.HasValue()) { for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) { const auto* joinKeyColumn = Settings.LookupKeyColumns[colId]; @@ -513,7 +538,14 @@ public: } } - UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1)))); + ResultRowsBySeqNo[rowSeqNo].AcceptLeftRow(firstRow, lastRow); + UnprocessedRows.emplace_back( + TUnprocessedLeftRow( + std::move(TOwnedCellVec(joinKeyCells)), + std::move(inputRow.GetElement(1)), + rowSeqNo + ) + ); } void AddInputRow(TConstArrayRef<TCell>) final { @@ -545,12 +577,6 @@ public: auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(ranges[i])); YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); leftRowIt->second.PendingReads.erase(prevReadId); - - if (leftRowIt->second.Completed()) { - YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); - ResultRowsBySeqNo[leftRowIt->second.SeqNo].Completed = true; - PendingLeftRowsByKey.erase(leftRowIt); - } } if (lastProcessedKey) { @@ -645,9 +671,9 @@ public: } while (!UnprocessedRows.empty()) { - auto [joinKey, leftData] = UnprocessedRows.front(); + const auto& unprocessedRow = UnprocessedRows.front(); - if (PendingLeftRowsByKey.contains(joinKey)) { + if (PendingLeftRowsByKey.contains(unprocessedRow.JoinKey)) { // TODO: skip key duplicate break; } @@ -669,22 +695,21 @@ public: return true; }; - UnprocessedRows.pop_front(); - if (isKeyAllowed(joinKey)) { + if (isKeyAllowed(unprocessedRow.JoinKey)) { std::vector <std::pair<ui64, TOwnedTableRange>> partitions; - if (joinKey.size() < Settings.KeyColumns.size()) { + if (unprocessedRow.JoinKey.size() < Settings.KeyColumns.size()) { // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) - std::vector <TCell> fromCells(Settings.KeyColumns.size() - joinKey.size()); - fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end()); + std::vector <TCell> fromCells(Settings.KeyColumns.size() - unprocessedRow.JoinKey.size()); + fromCells.insert(fromCells.begin(), unprocessedRow.JoinKey.begin(), unprocessedRow.JoinKey.end()); bool fromInclusive = true; bool toInclusive = false; partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), - TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive) + TOwnedTableRange(fromCells, fromInclusive, unprocessedRow.JoinKey, toInclusive) ); } else { // full pk, build point - partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(joinKey)); + partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(unprocessedRow.JoinKey)); } for (auto[shardId, range] : partitions) { @@ -696,7 +721,10 @@ public: } } - PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo(std::move(leftData), InputRowSeqNo++, GetLeftRowType()))); + PendingLeftRowsByKey.insert( + std::make_pair(std::move(unprocessedRow.JoinKey), TLeftRowInfo( + std::move(unprocessedRow.InputLeftRow), unprocessedRow.RowSeqNo, GetLeftRowType()))); + UnprocessedRows.pop_front(); } std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>> requests; @@ -764,10 +792,12 @@ public: YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); TReadResultStats rowStats; - auto resultRow = leftRowIt->second.TryBuildResultRow(HolderFactory, Settings, ReadColumns, row, rowStats, false, result.ShardId); + auto& resultRows = ResultRowsBySeqNo[leftRowIt->second.SeqNo]; + auto resultRow = leftRowIt->second.TryBuildResultRow( + HolderFactory, Settings, ReadColumns, row, rowStats, resultRows.FirstRowInBatch(), false, result.ShardId); if (resultRow.HasValue()) { + resultRows.AddRow(std::move(rowStats), std::move(resultRow)); YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); - ResultRowsBySeqNo[leftRowIt->second.SeqNo].Rows.emplace_back(std::move(resultRow), std::move(rowStats)); } } @@ -776,14 +806,6 @@ public: auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(key)); YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); leftRowIt->second.PendingReads.erase(record.GetReadId()); - - // row is considered processed when all reads are finished - // and at least one right row is found - if (leftRowIt->second.Completed()) { - YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); - ResultRowsBySeqNo[leftRowIt->second.SeqNo].Completed = true; - PendingLeftRowsByKey.erase(leftRowIt); - } } ReadStateByReadId.erase(pendingKeysIt); @@ -846,11 +868,11 @@ public: for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) { if (leftRowIt->second.PendingReads.empty()) { TReadResultStats rowStats; - auto resultRow = leftRowIt->second.TryBuildResultRow(HolderFactory, Settings, ReadColumns, {}, rowStats, true); - YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); auto& result = ResultRowsBySeqNo[leftRowIt->second.SeqNo]; - result.Rows.emplace_back(std::move(resultRow), std::move(rowStats)); - result.Completed = true; + result.FinishLeftRow(); + auto resultRow = leftRowIt->second.TryBuildResultRow(HolderFactory, Settings, ReadColumns, {}, rowStats, result.FirstRowInBatch(), result.ProcessedAllLeftRows()); + YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); + result.AddRow(std::move(rowStats), std::move(resultRow)); PendingLeftRowsByKey.erase(leftRowIt++); } else { ++leftRowIt; @@ -872,8 +894,9 @@ public: } auto& result = resultIt->second; - for (; result.FirstUnprocessedRow < result.Rows.size(); ++result.FirstUnprocessedRow) { - auto& row = result.Rows[result.FirstUnprocessedRow]; + + while(!result.Rows.empty()) { + TResultBatch::TResultRow& row = result.Rows.front(); if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) { resultStats.SizeLimitExceeded = true; @@ -881,21 +904,15 @@ public: } batch.emplace_back(std::move(row.Data)); + result.Rows.pop_front(); resultStats.Add(row.Stats); } - if (result.FirstUnprocessedRow == result.Rows.size()) { - if (ShoulKeepRowsOrder()) { - // we can increment seqNo only if current result is completed - if (result.Completed) { - ResultRowsBySeqNo.erase(resultIt); - ++CurrentResultSeqNo; - } else { - break; - } - } else { - ResultRowsBySeqNo.erase(resultIt); - } + if (result.Completed()) { + ResultRowsBySeqNo.erase(resultIt); + ++CurrentResultSeqNo; + } else { + break; } } @@ -926,7 +943,6 @@ private: bool RightRowExist = false; bool LastRowReceived = false; const ui64 SeqNo; - ui64 MatchedRows = 0; NMiniKQL::TStructType* LeftRowType = nullptr; ui64 LeftRowSize = 0; @@ -936,7 +952,7 @@ private: NUdf::TUnboxedValue TryBuildResultRow(const NMiniKQL::THolderFactory& HolderFactory, const TLookupSettings& Settings, const std::map<std::string, TSysTables::TTableColumnInfo>& ReadColumns, TConstArrayRef<TCell> rightRow, - TReadResultStats& rowStats, bool lastRow, TMaybe<ui64> shardId = {}) + TReadResultStats& rowStats, bool firstRow, bool lastRow, TMaybe<ui64> shardId = {}) { if (lastRow) { LastRowReceived = lastRow; @@ -950,8 +966,7 @@ private: resultRow = HolderFactory.CreateDirectArrayHolder(3, resultRowItems); resultRowItems[0] = Row; resultRowItems[1] = std::move(RightRow); - MatchedRows++; - auto rowCookie = TStreamLookupJoinRowCookie{.RowSeqNo=SeqNo, .LastRow=lastRow, .FirstRow=(MatchedRows == 1)}; + auto rowCookie = TStreamLookupJoinRowCookie{.RowSeqNo=SeqNo, .LastRow=lastRow, .FirstRow=firstRow}; resultRowItems[2] = NUdf::TUnboxedValuePod(rowCookie.Encode()); rowStats.ReadRowsCount += (hasValue ? 1 : 0); rowStats.ReadBytesCount += StorageReadBytes; @@ -1006,9 +1021,48 @@ private: TReadResultStats Stats; }; - std::vector<TResultRow> Rows; + std::deque<TResultRow> Rows; ui32 FirstUnprocessedRow = 0; - bool Completed = false; + ui64 ProcessedRows = 0; + ui64 ProcessingLeftRows = 0; + bool FirstRow = false; + bool LastRow = false; + + bool Finished() const { + return Rows.empty(); + } + + bool ProcessedAllLeftRows() const { + return ProcessingLeftRows == 0; + } + + void FinishLeftRow() { + --ProcessingLeftRows; + } + + void AcceptLeftRow(bool firstRow, bool lastRow) { + ++ProcessingLeftRows; + if (firstRow) + FirstRow = true; + + if (lastRow) + LastRow = true; + } + + bool Completed() const { + return Rows.empty() && FirstRow && LastRow && ProcessingLeftRows == 0; + } + + void AddRow(TReadResultStats&& stats, NUdf::TUnboxedValue&& data) { + if (data.HasValue()) { + Rows.emplace_back(std::move(data), std::move(stats)); + ++ProcessedRows; + } + } + + bool FirstRowInBatch() const { + return ProcessedRows == 0; + } }; bool ShoulKeepRowsOrder() const { @@ -1068,6 +1122,21 @@ private: return range.From.subspan(0, Settings.LookupKeyColumns.size()); } + bool IsInputTriplet() { + if (InputTupleType) { + return InputTupleType->GetElementsCount() == 3; + } + + auto inputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{InputDesc.GetTransform().GetInputType()}, TypeEnv); + YQL_ENSURE(inputTypeNode, "Failed to deserialize stream lookup transform output type"); + + auto inputType = static_cast<NMiniKQL::TType*>(inputTypeNode); + YQL_ENSURE(inputType->GetKind() == NMiniKQL::TType::EKind::Tuple, "Unexpected stream lookup output type"); + + InputTupleType = AS_TYPE(NMiniKQL::TTupleType, inputType); + return InputTupleType->GetElementsCount() == 3; + } + NMiniKQL::TStructType* GetLeftRowType() { if (LeftRowType) { // KIKIMR-23296: avoid allocating separate type structure for each lookup @@ -1094,7 +1163,7 @@ private: private: const NYql::NDqProto::TTaskInput& InputDesc; std::map<std::string, TSysTables::TTableColumnInfo> ReadColumns; - std::deque<std::pair<TOwnedCellVec, NUdf::TUnboxedValue>> UnprocessedRows; + std::deque<TUnprocessedLeftRow> UnprocessedRows; std::deque<TOwnedTableRange> UnprocessedKeys; std::unordered_map<ui64, TReadState> ReadStateByReadId; absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey; @@ -1102,6 +1171,7 @@ private: ui64 InputRowSeqNo = 0; ui64 CurrentResultSeqNo = 0; NMiniKQL::TStructType* LeftRowType = nullptr; + NKikimr::NMiniKQL::TTupleType* InputTupleType = nullptr; }; std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 7e8731f9137..abd90ce5a1b 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -5504,16 +5504,12 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda } Y_UNIT_TEST_TWIN(JoinWithNonPKColumnsInPredicate, UseStreamJoin) { - if (UseStreamJoin) { - return; - } - auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings().SetKqpSettings({setting}); serverSettings.AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(UseStreamJoin); TKikimrRunner kikimr(serverSettings); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + //kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE); + //kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 737dabea4fc..135e415b93c 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -1066,6 +1066,7 @@ Y_UNIT_TEST_TWIN(JoinLeftJoinPostJoinFilterTest, StreamLookupJoin) { select A.a, A.b, B.a, B.b from A left join (select * from B where a > 2 and a < 3) as B on A.b = B.b + ORDER BY A.a, A.b )", .Answer=R"([ [[1];[2];#;#];[[2];[2];#;#];[[3];[2];#;#];[[4];[2];#;#] @@ -1082,6 +1083,7 @@ Y_UNIT_TEST_TWIN(JoinInclusionTestSemiJoin, StreamLookupJoin) { select A.a, A.b, from A left semi join (select * from B where a > 1 and a < 3) as B ON A.b = B.b + ORDER BY A.a, A.b )", .Answer=R"([ [[1];[2]];[[2];[2]];[[3];[2]];[[4];[2]] |