aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikhail Surin <ssmike@ydb.tech>2024-12-03 13:37:02 +0300
committerGitHub <noreply@github.com>2024-12-03 13:37:02 +0300
commita349c79bba9bd87a88b25535d090244811bf70a6 (patch)
tree6eb06c2fc1878e621da051bdcfefca186bf11bca
parent9a920b4c8acf65115fc284af9ff8df7428e22af9 (diff)
downloadydb-a349c79bba9bd87a88b25535d090244811bf70a6.tar.gz
allow to keep rows order for stream lookup join (#12233)
Co-authored-by: ulya-sidorina <yulia@ydb.tech> Co-authored-by: Pavel Velikhov <pavelvelikhov@ydb.tech>
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.cpp5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h3
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp1
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp1
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp222
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.h2
-rw-r--r--ydb/core/kqp/ut/join/kqp_join_order_ut.cpp2
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/core/protos/kqp_physical.proto1
10 files changed, 147 insertions, 92 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index fbef481428..ec171c1c0b 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -419,6 +419,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
}
settings->SetLookupStrategy(streamLookup.GetLookupStrategy());
+ settings->SetKeepRowsOrder(streamLookup.GetKeepRowsOrder());
TTransform streamLookupTransform;
streamLookupTransform.Type = "StreamLookupInputTransformer";
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp
index 65d960ee45..da4f7733d6 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp
@@ -87,6 +87,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
REGISTER_SETTING(*this, OptimizerHints).Parser([](const TString& v) { return NYql::TOptimizerHints::Parse(v); });
REGISTER_SETTING(*this, OverridePlanner);
REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
+ REGISTER_SETTING(*this, EnableOrderPreservingLookupJoin);
REGISTER_SETTING(*this, OptUseFinalizeByKey);
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
@@ -121,6 +122,10 @@ bool TKikimrSettings::SpillingEnabled() const {
return GetFlagValue(_KqpEnableSpilling.Get());
}
+bool TKikimrSettings::OrderPreservingLookupJoinEnabled() const {
+ return GetFlagValue(EnableOrderPreservingLookupJoin.Get());
+}
+
bool TKikimrSettings::DisableLlvmForUdfStages() const {
return GetFlagValue(_KqpDisableLlvmForUdfStages.Get());
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index 28bab1b62a..9d81e3f2fa 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -55,6 +55,7 @@ struct TKikimrSettings {
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
NCommon::TConfSetting<TString, false> OverridePlanner;
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;
+ NCommon::TConfSetting<bool, false> EnableOrderPreservingLookupJoin;
NCommon::TConfSetting<TString, false> OptOverrideStatistics;
NCommon::TConfSetting<NYql::TOptimizerHints, false> OptimizerHints;
@@ -91,7 +92,7 @@ struct TKikimrSettings {
bool HasOptEnableOlapProvideComputeSharding() const;
bool HasOptUseFinalizeByKey() const;
bool HasMaxSequentialReadsInFlight() const;
-
+ bool OrderPreservingLookupJoinEnabled() const;
EOptionalFlag GetOptPredicateExtract() const;
EOptionalFlag GetUseLlvm() const;
NDq::EHashJoinMode GetHashJoinMode() const;
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index b0354be230..cc69404c02 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -1383,6 +1383,7 @@ private:
YQL_ENSURE(streamLookup.LookupStrategy().Maybe<TCoAtom>());
TString lookupStrategy = streamLookup.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
streamLookupProto.SetLookupStrategy(GetStreamLookupStrategy(lookupStrategy));
+ streamLookupProto.SetKeepRowsOrder(Config->OrderPreservingLookupJoinEnabled());
switch (streamLookupProto.GetLookupStrategy()) {
case NKqpProto::EStreamLookupStrategy::LOOKUP: {
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index bf917fbb17..30f7d86d50 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -400,6 +400,7 @@ private:
}
}
+ auto guard = BindAllocator();
StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{
read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
});
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index 0a5eae5940..54b88c2f97 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -131,16 +131,16 @@ struct TKeyEq {
TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
const NYql::NDqProto::TTaskInput& inputDesc)
- : TypeEnv(typeEnv)
+ : Settings(std::move(settings))
+ , TypeEnv(typeEnv)
, HolderFactory(holderFactory)
, InputDesc(inputDesc)
- , TablePath(settings.GetTable().GetPath())
- , TableId(MakeTableId(settings.GetTable()))
- , Strategy(settings.GetLookupStrategy()) {
+ , TablePath(Settings.GetTable().GetPath())
+ , TableId(MakeTableId(Settings.GetTable())) {
- KeyColumns.reserve(settings.GetKeyColumns().size());
+ KeyColumns.reserve(Settings.GetKeyColumns().size());
i32 keyOrder = 0;
- for (const auto& keyColumn : settings.GetKeyColumns()) {
+ for (const auto& keyColumn : Settings.GetKeyColumns()) {
NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(keyColumn.GetTypeId(), keyColumn.GetTypeInfo());
KeyColumns.emplace(
@@ -155,15 +155,15 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
);
}
- LookupKeyColumns.reserve(settings.GetLookupKeyColumns().size());
- for (const auto& lookupKey : settings.GetLookupKeyColumns()) {
+ LookupKeyColumns.reserve(Settings.GetLookupKeyColumns().size());
+ for (const auto& lookupKey : Settings.GetLookupKeyColumns()) {
auto columnIt = KeyColumns.find(lookupKey);
YQL_ENSURE(columnIt != KeyColumns.end());
LookupKeyColumns.push_back(&columnIt->second);
}
- Columns.reserve(settings.GetColumns().size());
- for (const auto& column : settings.GetColumns()) {
+ Columns.reserve(Settings.GetColumns().size());
+ for (const auto& column : Settings.GetColumns()) {
NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(column.GetTypeId(), column.GetTypeInfo());
Columns.emplace_back(TSysTables::TTableColumnInfo{
@@ -634,7 +634,7 @@ public:
}
}
- PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo{std::move(leftData)}));
+ PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo{std::move(leftData), InputRowSeqNo++}));
}
std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>> requests;
@@ -675,17 +675,62 @@ public:
const auto& record = result.ReadResult->Get()->Record;
YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
- auto it = PendingKeysByReadId.find(record.GetReadId());
- YQL_ENSURE(it != PendingKeysByReadId.end());
+ auto pendingKeysIt = PendingKeysByReadId.find(record.GetReadId());
+ YQL_ENSURE(pendingKeysIt != PendingKeysByReadId.end());
- ReadResults.emplace_back(std::move(result));
+ for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
+ const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
+ // result can contain fewer columns because of system columns
+ YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch");
+
+ std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
+ for (size_t joinKeyColumn = 0; joinKeyColumn < LookupKeyColumns.size(); ++joinKeyColumn) {
+ auto columnIt = ReadColumns.find(LookupKeyColumns[joinKeyColumn]->Name);
+ YQL_ENSURE(columnIt != ReadColumns.end());
+ joinKeyCells[LookupKeyColumns[joinKeyColumn]->KeyOrder] = row[std::distance(ReadColumns.begin(), columnIt)];
+ }
+
+ auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
+ YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
+
+ if (Settings.GetLookupStrategy() == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
+ // semi join should return one result row per key
+ continue;
+ }
+
+ TReadResultStats rowStats;
+ auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, result.ShardId);
+ YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
+ ResultRowsBySeqNo[leftRowIt->second.SeqNo].Rows.emplace_back(std::move(resultRow), std::move(rowStats));
+ }
+
+ if (record.GetFinished()) {
+ for (const auto& key : pendingKeysIt->second) {
+ auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(key));
+ if (leftRowIt != PendingLeftRowsByKey.end()) {
+ leftRowIt->second.PendingReads.erase(record.GetReadId());
+
+ // row is considered processed when all reads are finished
+ // and at least one right row is found
+ const bool leftRowProcessed = leftRowIt->second.PendingReads.empty()
+ && leftRowIt->second.RightRowExist;
+ if (leftRowProcessed) {
+ YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
+ ResultRowsBySeqNo[leftRowIt->second.SeqNo].Completed = true;
+ PendingLeftRowsByKey.erase(leftRowIt);
+ }
+ }
+ }
+
+ PendingKeysByReadId.erase(pendingKeysIt);
+ }
}
bool AllRowsProcessed() final {
return UnprocessedRows.empty()
&& UnprocessedKeys.empty()
&& PendingKeysByReadId.empty()
- && ReadResults.empty()
+ && ResultRowsBySeqNo.empty()
&& PendingLeftRowsByKey.empty();
}
@@ -724,85 +769,62 @@ public:
bool sizeLimitExceeded = false;
batch.clear();
- while (!ReadResults.empty() && !sizeLimitExceeded) {
- auto& result = ReadResults.front();
-
- for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
- const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
- YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch");
-
- std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
- for (size_t joinKeyIdx = 0; joinKeyIdx < LookupKeyColumns.size(); ++joinKeyIdx) {
- auto it = ReadColumns.find(LookupKeyColumns[joinKeyIdx]->Name);
- YQL_ENSURE(it != ReadColumns.end());
- joinKeyCells[LookupKeyColumns[joinKeyIdx]->KeyOrder] = row[std::distance(ReadColumns.begin(), it)];
- }
-
- auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
- YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
-
- if (Strategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
- // Semi join should return one result row per key
- continue;
- }
+ // we should process left rows that haven't matches on the right
+ for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) {
+ const bool leftRowShouldBeProcessed = leftRowIt->second.PendingReads.empty()
+ && !leftRowIt->second.RightRowExist;
+ if (leftRowShouldBeProcessed) {
TReadResultStats rowStats;
- i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
- auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId);
+ auto resultRow = TryBuildResultRow(leftRowIt->second, {}, rowStats);
+ YQL_ENSURE(IsRowSeqNoValid(leftRowIt->second.SeqNo));
+ auto& result = ResultRowsBySeqNo[leftRowIt->second.SeqNo];
+ result.Rows.emplace_back(std::move(resultRow), std::move(rowStats));
+ result.Completed = true;
+ PendingLeftRowsByKey.erase(leftRowIt++);
+ } else {
+ ++leftRowIt;
+ }
+ }
- if (!resultRow.HasValue()) {
- sizeLimitExceeded = true;
- break;
- }
+ auto getNextResult = [&]() {
+ if (!ShoulKeepRowsOrder()) {
+ return ResultRowsBySeqNo.begin();
+ }
- batch.push_back(std::move(resultRow));
- resultStats.Add(rowStats);
+ return ResultRowsBySeqNo.find(CurrentResultSeqNo);
+ };
+
+ while (!sizeLimitExceeded) {
+ auto resultIt = getNextResult();
+ if (resultIt == ResultRowsBySeqNo.end()) {
+ break;
}
- if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
- if (result.ReadResult->Get()->Record.GetFinished()) {
- auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId());
- YQL_ENSURE(it != PendingKeysByReadId.end());
-
- for (const auto& range : it->second) {
- auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range));
- if (leftRowIt != PendingLeftRowsByKey.end()) {
- leftRowIt->second.PendingReads.erase(result.ReadResult->Get()->Record.GetReadId());
-
- const bool leftRowCanBeDeleted = leftRowIt->second.PendingReads.empty()
- && leftRowIt->second.RightRowExist;
- if (leftRowCanBeDeleted) {
- PendingLeftRowsByKey.erase(leftRowIt);
- }
- }
- }
+ auto& result = resultIt->second;
+ for (; result.FirstUnprocessedRow < result.Rows.size(); ++result.FirstUnprocessedRow) {
+ auto& row = result.Rows[result.FirstUnprocessedRow];
- PendingKeysByReadId.erase(it);
+ if (resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
+ sizeLimitExceeded = true;
+ break;
}
- ReadResults.pop_front();
+ batch.emplace_back(std::move(row.Data));
+ resultStats.Add(row.Stats);
}
- }
- if (!sizeLimitExceeded) {
- for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) {
- const bool leftRowCanBeSent = leftRowIt->second.PendingReads.empty()
- && !leftRowIt->second.RightRowExist;
-
- if (leftRowCanBeSent) {
- TReadResultStats rowStats;
- i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
- auto resultRow = TryBuildResultRow(leftRowIt->second, {}, rowStats, availableSpace);
-
- if (!resultRow.HasValue()) {
+ if (result.FirstUnprocessedRow == result.Rows.size()) {
+ if (ShoulKeepRowsOrder()) {
+ // we can increment seqNo only if current result is completed
+ if (result.Completed) {
+ ResultRowsBySeqNo.erase(resultIt);
+ ++CurrentResultSeqNo;
+ } else {
break;
}
-
- batch.push_back(std::move(resultRow));
- resultStats.Add(rowStats);
- PendingLeftRowsByKey.erase(leftRowIt++);
} else {
- ++leftRowIt;
+ ResultRowsBySeqNo.erase(resultIt);
}
}
}
@@ -813,17 +835,43 @@ public:
~TKqpJoinRows() {
UnprocessedRows.clear();
PendingLeftRowsByKey.clear();
+ ResultRowsBySeqNo.clear();
}
private:
struct TLeftRowInfo {
- TLeftRowInfo(NUdf::TUnboxedValue row) : Row(std::move(row)) {
+ TLeftRowInfo(NUdf::TUnboxedValue row, ui64 seqNo) : Row(std::move(row)), SeqNo(seqNo) {
}
NUdf::TUnboxedValue Row;
std::unordered_set<ui64> PendingReads;
bool RightRowExist = false;
+ const ui64 SeqNo;
+ };
+
+ struct TResultBatch {
+ struct TResultRow {
+ NUdf::TUnboxedValue Data;
+ TReadResultStats Stats;
+ };
+
+ std::vector<TResultRow> Rows;
+ ui32 FirstUnprocessedRow = 0;
+ bool Completed = false;
};
+ bool ShoulKeepRowsOrder() const {
+ return Settings.HasKeepRowsOrder() && Settings.GetKeepRowsOrder();
+ }
+
+ bool IsRowSeqNoValid(const ui64& seqNo) const {
+ if (!ShoulKeepRowsOrder()) {
+ return true;
+ }
+
+ // we should check row seqNo only if we need to keep the order
+ return seqNo >= CurrentResultSeqNo;
+ }
+
void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
auto& record = request->Record;
@@ -887,7 +935,7 @@ private:
}
NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow,
- TReadResultStats& rowStats, i64 freeSpace, TMaybe<ui64> shardId = {}) {
+ TReadResultStats& rowStats, TMaybe<ui64> shardId = {}) {
NUdf::TUnboxedValue* resultRowItems = nullptr;
auto resultRow = HolderFactory.CreateDirectArrayHolder(2, resultRowItems);
@@ -938,12 +986,6 @@ private:
rowStats.ResultRowsCount += 1;
rowStats.ResultBytesCount += leftRowSize + rightRowSize;
- if (rowStats.ResultBytesCount > (ui64)freeSpace) {
- resultRow.DeleteUnreferenced();
- rowStats.Clear();
- return NUdf::TUnboxedValuePod();
- }
-
return resultRow;
}
@@ -953,7 +995,9 @@ private:
std::deque<TOwnedTableRange> UnprocessedKeys;
std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId;
absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, TKeyHash, TKeyEq> PendingLeftRowsByKey;
- std::deque<TShardReadResult> ReadResults;
+ std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
+ ui64 InputRowSeqNo = 0;
+ ui64 CurrentResultSeqNo = 0;
};
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
index e7129f2e04..e4aad8652e 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -63,6 +63,7 @@ public:
virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0;
protected:
+ const NKikimrKqp::TKqpStreamLookupSettings Settings;
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
const NYql::NDqProto::TTaskInput& InputDesc;
@@ -71,7 +72,6 @@ protected:
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
std::vector<TSysTables::TTableColumnInfo> Columns;
- const NKqpProto::EStreamLookupStrategy Strategy;
};
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
diff --git a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp
index fb60e4508d..4c7f59dd50 100644
--- a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp
@@ -736,4 +736,4 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}
-} \ No newline at end of file
+}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index fe1293c094..2ce495241b 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -748,6 +748,7 @@ message TKqpStreamLookupSettings {
optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8;
optional bool AllowInconsistentReads = 9 [default = false];
optional uint32 LockNodeId = 10;
+ optional bool KeepRowsOrder = 11;
}
message TKqpSequencerSettings {
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 8c237161a2..5a52ed188a 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -287,6 +287,7 @@ message TKqpPhyCnStreamLookup {
bytes LookupKeysType = 4;
bytes ResultType = 5;
EStreamLookupStrategy LookupStrategy = 6;
+ bool KeepRowsOrder = 7;
}
message TKqpPhyCnSequencer {