diff options
author | Maksim Zinal <zinal@ydb.tech> | 2025-05-23 12:41:24 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-23 12:41:24 +0300 |
commit | a536360eb0b96a2bb2730e42f523ae060a8a058c (patch) | |
tree | dbbd3220048cfccdc65bf95bbcbeaa3a7be098cb | |
parent | f6235158936735d9cefe90f1ca7354e80d0a9014 (diff) | |
download | ydb-a536360eb0b96a2bb2730e42f523ae060a8a058c.tar.gz |
stream lookup: GetRangePartitioning() optimization (#13180)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 34 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 6 |
2 files changed, 28 insertions, 12 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 69c02cbf953..32eea7ec74e 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -19,10 +19,26 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt YQL_ENSURE(partitionInfo); + // Binary search of the index to start with. + size_t idxStart = 0; + size_t idxFinish = partitionInfo->size(); + while ((idxFinish - idxStart) > 1) { + size_t idxCur = (idxFinish + idxStart) / 2; + const auto& partCur = (*partitionInfo)[idxCur].Range->EndKeyPrefix.GetCells(); + YQL_ENSURE(partCur.size() <= keyColumnTypes.size()); + int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(), + std::min(partCur.size(), range.From.size())); + if (cmp < 0) { + idxStart = idxCur; + } else { + idxFinish = idxCur; + } + } + std::vector<TCell> minusInf(keyColumnTypes.size()); std::vector<std::pair<ui64, TOwnedTableRange>> rangePartition; - for (size_t idx = 0; idx < partitionInfo->size(); ++idx) { + for (size_t idx = idxStart; idx < partitionInfo->size(); ++idx) { TTableRange partitionRange{ idx == 0 ? minusInf : (*partitionInfo)[idx - 1].Range->EndKeyPrefix.GetCells(), idx == 0 ? true : !(*partitionInfo)[idx - 1].Range->IsInclusive, @@ -110,6 +126,12 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti column.GetTypeInfo().GetPgTypeMod() }); } + + KeyColumnTypes.resize(KeyColumns.size()); + for (const auto& [_, columnInfo] : KeyColumns) { + YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(KeyColumnTypes.size())); + KeyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; + } } TKqpStreamLookupWorker::~TKqpStreamLookupWorker() { @@ -123,16 +145,6 @@ TTableId TKqpStreamLookupWorker::GetTableId() const { return TableId; } -std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() const { - std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size()); - for (const auto& [_, columnInfo] : KeyColumns) { - YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size())); - keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; - } - - return keyColumnTypes; -} - class TKqpLookupRows : public TKqpStreamLookupWorker { public: TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index e4aad8652e3..1a80c8bbee1 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -51,7 +51,10 @@ public: virtual std::string GetTablePath() const; virtual TTableId GetTableId() const; - virtual std::vector<NScheme::TTypeInfo> GetKeyColumnTypes() const; + + const std::vector<NScheme::TTypeInfo>& GetKeyColumnTypes() const { + return KeyColumnTypes; + } virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0; virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery, @@ -72,6 +75,7 @@ protected: std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; std::vector<TSysTables::TTableColumnInfo> Columns; + std::vector<NScheme::TTypeInfo> KeyColumnTypes; }; std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, |