diff options
author | Mikhail Surin <ssmike@ydb.tech> | 2024-12-03 13:37:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-03 13:37:02 +0300 |
commit | a349c79bba9bd87a88b25535d090244811bf70a6 (patch) | |
tree | 6eb06c2fc1878e621da051bdcfefca186bf11bca | |
parent | 9a920b4c8acf65115fc284af9ff8df7428e22af9 (diff) | |
download | ydb-a349c79bba9bd87a88b25535d090244811bf70a6.tar.gz |
Allow preserving row order for stream lookup join (#12233)
Co-authored-by: ulya-sidorina <yulia@ydb.tech>
Co-authored-by: Pavel Velikhov <pavelvelikhov@ydb.tech>
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 222 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_join_order_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 |
10 files changed, 147 insertions, 92 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index fbef481428..ec171c1c0b 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -419,6 +419,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf } settings->SetLookupStrategy(streamLookup.GetLookupStrategy()); + settings->SetKeepRowsOrder(streamLookup.GetKeepRowsOrder()); TTransform streamLookupTransform; streamLookupTransform.Type = "StreamLookupInputTransformer"; diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index 65d960ee45..da4f7733d6 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -87,6 +87,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, OptimizerHints).Parser([](const TString& v) { return NYql::TOptimizerHints::Parse(v); }); REGISTER_SETTING(*this, OverridePlanner); REGISTER_SETTING(*this, UseGraceJoinCoreForMap); + REGISTER_SETTING(*this, EnableOrderPreservingLookupJoin); REGISTER_SETTING(*this, OptUseFinalizeByKey); REGISTER_SETTING(*this, CostBasedOptimizationLevel); @@ -121,6 +122,10 @@ bool TKikimrSettings::SpillingEnabled() const { return GetFlagValue(_KqpEnableSpilling.Get()); } +bool TKikimrSettings::OrderPreservingLookupJoinEnabled() const { + return GetFlagValue(EnableOrderPreservingLookupJoin.Get()); +} + bool TKikimrSettings::DisableLlvmForUdfStages() const { return GetFlagValue(_KqpDisableLlvmForUdfStages.Get()); } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 28bab1b62a..9d81e3f2fa 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -55,6 +55,7 @@ struct TKikimrSettings { NCommon::TConfSetting<ui64, false> EnableSpillingNodes; NCommon::TConfSetting<TString, false> 
OverridePlanner; NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap; + NCommon::TConfSetting<bool, false> EnableOrderPreservingLookupJoin; NCommon::TConfSetting<TString, false> OptOverrideStatistics; NCommon::TConfSetting<NYql::TOptimizerHints, false> OptimizerHints; @@ -91,7 +92,7 @@ struct TKikimrSettings { bool HasOptEnableOlapProvideComputeSharding() const; bool HasOptUseFinalizeByKey() const; bool HasMaxSequentialReadsInFlight() const; - + bool OrderPreservingLookupJoinEnabled() const; EOptionalFlag GetOptPredicateExtract() const; EOptionalFlag GetUseLlvm() const; NDq::EHashJoinMode GetHashJoinMode() const; diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index b0354be230..cc69404c02 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1383,6 +1383,7 @@ private: YQL_ENSURE(streamLookup.LookupStrategy().Maybe<TCoAtom>()); TString lookupStrategy = streamLookup.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue(); streamLookupProto.SetLookupStrategy(GetStreamLookupStrategy(lookupStrategy)); + streamLookupProto.SetKeepRowsOrder(Config->OrderPreservingLookupJoinEnabled()); switch (streamLookupProto.GetLookupStrategy()) { case NKqpProto::EStreamLookupStrategy::LOOKUP: { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index bf917fbb17..30f7d86d50 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -400,6 +400,7 @@ private: } } + auto guard = BindAllocator(); StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{ read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()) }); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 0a5eae5940..54b88c2f97 100644 --- 
a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -131,16 +131,16 @@ struct TKeyEq { TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) - : TypeEnv(typeEnv) + : Settings(std::move(settings)) + , TypeEnv(typeEnv) , HolderFactory(holderFactory) , InputDesc(inputDesc) - , TablePath(settings.GetTable().GetPath()) - , TableId(MakeTableId(settings.GetTable())) - , Strategy(settings.GetLookupStrategy()) { + , TablePath(Settings.GetTable().GetPath()) + , TableId(MakeTableId(Settings.GetTable())) { - KeyColumns.reserve(settings.GetKeyColumns().size()); + KeyColumns.reserve(Settings.GetKeyColumns().size()); i32 keyOrder = 0; - for (const auto& keyColumn : settings.GetKeyColumns()) { + for (const auto& keyColumn : Settings.GetKeyColumns()) { NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(keyColumn.GetTypeId(), keyColumn.GetTypeInfo()); KeyColumns.emplace( @@ -155,15 +155,15 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti ); } - LookupKeyColumns.reserve(settings.GetLookupKeyColumns().size()); - for (const auto& lookupKey : settings.GetLookupKeyColumns()) { + LookupKeyColumns.reserve(Settings.GetLookupKeyColumns().size()); + for (const auto& lookupKey : Settings.GetLookupKeyColumns()) { auto columnIt = KeyColumns.find(lookupKey); YQL_ENSURE(columnIt != KeyColumns.end()); LookupKeyColumns.push_back(&columnIt->second); } - Columns.reserve(settings.GetColumns().size()); - for (const auto& column : settings.GetColumns()) { + Columns.reserve(Settings.GetColumns().size()); + for (const auto& column : Settings.GetColumns()) { NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(column.GetTypeId(), column.GetTypeInfo()); Columns.emplace_back(TSysTables::TTableColumnInfo{ @@ -634,7 +634,7 @@ public: } 
} - PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo{std::move(leftData)})); + PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo{std::move(leftData), InputRowSeqNo++})); } std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>> requests; @@ -675,17 +675,62 @@ public: const auto& record = result.ReadResult->Get()->Record; YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS); - auto it = PendingKeysByReadId.find(record.GetReadId()); - YQL_ENSURE(it != PendingKeysByReadId.end()); + auto pendingKeysIt = PendingKeysByReadId.find(record.GetReadId()); + YQL_ENSURE(pendingKeysIt != PendingKeysByReadId.end()); - ReadResults.emplace_back(std::move(result)); + for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { + const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); + // result can contain fewer columns because of system columns + YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch"); + + std::vector<TCell> joinKeyCells(LookupKeyColumns.size()); + for (size_t joinKeyColumn = 0; joinKeyColumn < LookupKeyColumns.size(); ++joinKeyColumn) { + auto columnIt = ReadColumns.find(LookupKeyColumns[joinKeyColumn]->Name); + YQL_ENSURE(columnIt != ReadColumns.end()); + joinKeyCells[LookupKeyColumns[joinKeyColumn]->KeyOrder] = row[std::distance(ReadColumns.begin(), columnIt)]; + } + + auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells); + YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); + + if (Settings.GetLookupStrategy() == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) { + // semi join should return one result row per key + continue; + } + + TReadResultStats rowStats; + auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, result.ShardId); + YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); + 
ResultRowsBySeqNo[leftRowIt->second.SeqNo].Rows.emplace_back(std::move(resultRow), std::move(rowStats)); + } + + if (record.GetFinished()) { + for (const auto& key : pendingKeysIt->second) { + auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(key)); + if (leftRowIt != PendingLeftRowsByKey.end()) { + leftRowIt->second.PendingReads.erase(record.GetReadId()); + + // row is considered processed when all reads are finished + // and at least one right row is found + const bool leftRowProcessed = leftRowIt->second.PendingReads.empty() + && leftRowIt->second.RightRowExist; + if (leftRowProcessed) { + YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); + ResultRowsBySeqNo[leftRowIt->second.SeqNo].Completed = true; + PendingLeftRowsByKey.erase(leftRowIt); + } + } + } + + PendingKeysByReadId.erase(pendingKeysIt); + } } bool AllRowsProcessed() final { return UnprocessedRows.empty() && UnprocessedKeys.empty() && PendingKeysByReadId.empty() - && ReadResults.empty() + && ResultRowsBySeqNo.empty() && PendingLeftRowsByKey.empty(); } @@ -724,85 +769,62 @@ public: bool sizeLimitExceeded = false; batch.clear(); - while (!ReadResults.empty() && !sizeLimitExceeded) { - auto& result = ReadResults.front(); - - for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { - const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); - YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch"); - - std::vector<TCell> joinKeyCells(LookupKeyColumns.size()); - for (size_t joinKeyIdx = 0; joinKeyIdx < LookupKeyColumns.size(); ++joinKeyIdx) { - auto it = ReadColumns.find(LookupKeyColumns[joinKeyIdx]->Name); - YQL_ENSURE(it != ReadColumns.end()); - joinKeyCells[LookupKeyColumns[joinKeyIdx]->KeyOrder] = row[std::distance(ReadColumns.begin(), it)]; - } - - auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells); - YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); - - if (Strategy == 
NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) { - // Semi join should return one result row per key - continue; - } + // we should process left rows that haven't matches on the right + for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) { + const bool leftRowShouldBeProcessed = leftRowIt->second.PendingReads.empty() + && !leftRowIt->second.RightRowExist; + if (leftRowShouldBeProcessed) { TReadResultStats rowStats; - i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount; - auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId); + auto resultRow = TryBuildResultRow(leftRowIt->second, {}, rowStats); + YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo)); + auto& result = ResultRowsBySeqNo[leftRowIt->second.SeqNo]; + result.Rows.emplace_back(std::move(resultRow), std::move(rowStats)); + result.Completed = true; + PendingLeftRowsByKey.erase(leftRowIt++); + } else { + ++leftRowIt; + } + } - if (!resultRow.HasValue()) { - sizeLimitExceeded = true; - break; - } + auto getNextResult = [&]() { + if (!ShoulKeepRowsOrder()) { + return ResultRowsBySeqNo.begin(); + } - batch.push_back(std::move(resultRow)); - resultStats.Add(rowStats); + return ResultRowsBySeqNo.find(CurrentResultSeqNo); + }; + + while (!sizeLimitExceeded) { + auto resultIt = getNextResult(); + if (resultIt == ResultRowsBySeqNo.end()) { + break; } - if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { - if (result.ReadResult->Get()->Record.GetFinished()) { - auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId()); - YQL_ENSURE(it != PendingKeysByReadId.end()); - - for (const auto& range : it->second) { - auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range)); - if (leftRowIt != PendingLeftRowsByKey.end()) { - leftRowIt->second.PendingReads.erase(result.ReadResult->Get()->Record.GetReadId()); - - const bool 
leftRowCanBeDeleted = leftRowIt->second.PendingReads.empty() - && leftRowIt->second.RightRowExist; - if (leftRowCanBeDeleted) { - PendingLeftRowsByKey.erase(leftRowIt); - } - } - } + auto& result = resultIt->second; + for (; result.FirstUnprocessedRow < result.Rows.size(); ++result.FirstUnprocessedRow) { + auto& row = result.Rows[result.FirstUnprocessedRow]; - PendingKeysByReadId.erase(it); + if (resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) { + sizeLimitExceeded = true; + break; } - ReadResults.pop_front(); + batch.emplace_back(std::move(row.Data)); + resultStats.Add(row.Stats); } - } - if (!sizeLimitExceeded) { - for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) { - const bool leftRowCanBeSent = leftRowIt->second.PendingReads.empty() - && !leftRowIt->second.RightRowExist; - - if (leftRowCanBeSent) { - TReadResultStats rowStats; - i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount; - auto resultRow = TryBuildResultRow(leftRowIt->second, {}, rowStats, availableSpace); - - if (!resultRow.HasValue()) { + if (result.FirstUnprocessedRow == result.Rows.size()) { + if (ShoulKeepRowsOrder()) { + // we can increment seqNo only if current result is completed + if (result.Completed) { + ResultRowsBySeqNo.erase(resultIt); + ++CurrentResultSeqNo; + } else { break; } - - batch.push_back(std::move(resultRow)); - resultStats.Add(rowStats); - PendingLeftRowsByKey.erase(leftRowIt++); } else { - ++leftRowIt; + ResultRowsBySeqNo.erase(resultIt); } } } @@ -813,17 +835,43 @@ public: ~TKqpJoinRows() { UnprocessedRows.clear(); PendingLeftRowsByKey.clear(); + ResultRowsBySeqNo.clear(); } private: struct TLeftRowInfo { - TLeftRowInfo(NUdf::TUnboxedValue row) : Row(std::move(row)) { + TLeftRowInfo(NUdf::TUnboxedValue row, ui64 seqNo) : Row(std::move(row)), SeqNo(seqNo) { } NUdf::TUnboxedValue Row; std::unordered_set<ui64> PendingReads; bool RightRowExist = false; + const ui64 SeqNo; + }; + + struct 
TResultBatch { + struct TResultRow { + NUdf::TUnboxedValue Data; + TReadResultStats Stats; + }; + + std::vector<TResultRow> Rows; + ui32 FirstUnprocessedRow = 0; + bool Completed = false; }; + bool ShoulKeepRowsOrder() const { + return Settings.HasKeepRowsOrder() && Settings.GetKeepRowsOrder(); + } + + bool IsRowSeqNoValid(const ui64& seqNo) const { + if (!ShoulKeepRowsOrder()) { + return true; + } + + // we should check row seqNo only if we need to keep the order + return seqNo >= CurrentResultSeqNo; + } + void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) { auto& record = request->Record; @@ -887,7 +935,7 @@ private: } NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow, - TReadResultStats& rowStats, i64 freeSpace, TMaybe<ui64> shardId = {}) { + TReadResultStats& rowStats, TMaybe<ui64> shardId = {}) { NUdf::TUnboxedValue* resultRowItems = nullptr; auto resultRow = HolderFactory.CreateDirectArrayHolder(2, resultRowItems); @@ -938,12 +986,6 @@ private: rowStats.ResultRowsCount += 1; rowStats.ResultBytesCount += leftRowSize + rightRowSize; - if (rowStats.ResultBytesCount > (ui64)freeSpace) { - resultRow.DeleteUnreferenced(); - rowStats.Clear(); - return NUdf::TUnboxedValuePod(); - } - return resultRow; } @@ -953,7 +995,9 @@ private: std::deque<TOwnedTableRange> UnprocessedKeys; std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId; absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, TKeyHash, TKeyEq> PendingLeftRowsByKey; - std::deque<TShardReadResult> ReadResults; + std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo; + ui64 InputRowSeqNo = 0; + ui64 CurrentResultSeqNo = 0; }; std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index e7129f2e04..e4aad8652e 
100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -63,6 +63,7 @@ public: virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0; protected: + const NKikimrKqp::TKqpStreamLookupSettings Settings; const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; const NYql::NDqProto::TTaskInput& InputDesc; @@ -71,7 +72,6 @@ protected: std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; std::vector<TSysTables::TTableColumnInfo> Columns; - const NKqpProto::EStreamLookupStrategy Strategy; }; std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, diff --git a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp index fb60e4508d..4c7f59dd50 100644 --- a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp @@ -736,4 +736,4 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } -}
\ No newline at end of file +} diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index fe1293c094..2ce495241b 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -748,6 +748,7 @@ message TKqpStreamLookupSettings { optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8; optional bool AllowInconsistentReads = 9 [default = false]; optional uint32 LockNodeId = 10; + optional bool KeepRowsOrder = 11; } message TKqpSequencerSettings { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 8c237161a2..5a52ed188a 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -287,6 +287,7 @@ message TKqpPhyCnStreamLookup { bytes LookupKeysType = 4; bytes ResultType = 5; EStreamLookupStrategy LookupStrategy = 6; + bool KeepRowsOrder = 7; } message TKqpPhyCnSequencer { |