about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Maksim Zinal <zinal@ydb.tech> 2025-05-23 12:41:24 +0300
committer GitHub <noreply@github.com> 2025-05-23 12:41:24 +0300
commit a536360eb0b96a2bb2730e42f523ae060a8a058c (patch)
tree dbbd3220048cfccdc65bf95bbcbeaa3a7be098cb
parent f6235158936735d9cefe90f1ca7354e80d0a9014 (diff)
download ydb-a536360eb0b96a2bb2730e42f523ae060a8a058c.tar.gz
stream lookup: GetRangePartitioning() optimization (#13180)
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp 34
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_worker.h 6
2 files changed, 28 insertions, 12 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index 69c02cbf953..32eea7ec74e 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -19,10 +19,26 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt
YQL_ENSURE(partitionInfo);
+ // Binary search of the index to start with.
+ size_t idxStart = 0;
+ size_t idxFinish = partitionInfo->size();
+ while ((idxFinish - idxStart) > 1) {
+ size_t idxCur = (idxFinish + idxStart) / 2;
+ const auto& partCur = (*partitionInfo)[idxCur].Range->EndKeyPrefix.GetCells();
+ YQL_ENSURE(partCur.size() <= keyColumnTypes.size());
+ int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(),
+ std::min(partCur.size(), range.From.size()));
+ if (cmp < 0) {
+ idxStart = idxCur;
+ } else {
+ idxFinish = idxCur;
+ }
+ }
+
std::vector<TCell> minusInf(keyColumnTypes.size());
std::vector<std::pair<ui64, TOwnedTableRange>> rangePartition;
- for (size_t idx = 0; idx < partitionInfo->size(); ++idx) {
+ for (size_t idx = idxStart; idx < partitionInfo->size(); ++idx) {
TTableRange partitionRange{
idx == 0 ? minusInf : (*partitionInfo)[idx - 1].Range->EndKeyPrefix.GetCells(),
idx == 0 ? true : !(*partitionInfo)[idx - 1].Range->IsInclusive,
@@ -110,6 +126,12 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
column.GetTypeInfo().GetPgTypeMod()
});
}
+
+ KeyColumnTypes.resize(KeyColumns.size());
+ for (const auto& [_, columnInfo] : KeyColumns) {
+ YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(KeyColumnTypes.size()));
+ KeyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
+ }
}
TKqpStreamLookupWorker::~TKqpStreamLookupWorker() {
@@ -123,16 +145,6 @@ TTableId TKqpStreamLookupWorker::GetTableId() const {
return TableId;
}
-std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() const {
- std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
- for (const auto& [_, columnInfo] : KeyColumns) {
- YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size()));
- keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
- }
-
- return keyColumnTypes;
-}
-
class TKqpLookupRows : public TKqpStreamLookupWorker {
public:
TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
index e4aad8652e3..1a80c8bbee1 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -51,7 +51,10 @@ public:
virtual std::string GetTablePath() const;
virtual TTableId GetTableId() const;
- virtual std::vector<NScheme::TTypeInfo> GetKeyColumnTypes() const;
+
+ const std::vector<NScheme::TTypeInfo>& GetKeyColumnTypes() const {
+ return KeyColumnTypes;
+ }
virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
@@ -72,6 +75,7 @@ protected:
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
std::vector<TSysTables::TTableColumnInfo> Columns;
+ std::vector<NScheme::TTypeInfo> KeyColumnTypes;
};
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,