diff options
| author | Vitalii Gridnev <[email protected]> | 2026-04-16 21:33:16 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-04-16 18:33:16 +0000 |
| commit | 6bc35e1e98cebdfdbd4c5fdba205da2aada2ea1b (patch) | |
| tree | eb80d8dd1833bc786dcd72349b7d553c7d6ea9b4 | |
| parent | 560ab811120265e459f4bd685c057deb3ff85fa6 (diff) | |
continue partitioning incapsulation (#38237)
| -rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.h | 4 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_full_text_source.cpp | 76 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 101 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 3 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.h | 2 | ||||
| -rw-r--r-- | ydb/core/scheme/scheme_tabledefs.cpp | 253 | ||||
| -rw-r--r-- | ydb/core/scheme/scheme_tabledefs.h | 14 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/range_ops.cpp | 195 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/range_ops.h | 2 |
15 files changed, 308 insertions, 360 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index 2c64324bb09..f057730712c 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -191,11 +191,11 @@ public: State = ETransactionState::ERROR; } - void SetPartitioning(const TTableId tableId, const std::shared_ptr<const TPartitioning>& partitioning) override { + void SetPartitioning(const TTableId tableId, const TPartitioning::TCPtr& partitioning) override { TablePartitioning[tableId] = partitioning; } - std::shared_ptr<const TPartitioning> GetPartitioning(const TTableId tableId) const override { + TPartitioning::TCPtr GetPartitioning(const TTableId tableId) const override { auto iterator = TablePartitioning.find(tableId); if (iterator != std::end(TablePartitioning)) { return iterator->second; @@ -679,7 +679,7 @@ private: THashSet<ui32> ParticipantNodes; - THashMap<TTableId, std::shared_ptr<const TPartitioning>> TablePartitioning; + THashMap<TTableId, TPartitioning::TCPtr> TablePartitioning; bool AllowVolatile = false; bool ReadOnly = true; diff --git a/ydb/core/kqp/common/kqp_tx_manager.h b/ydb/core/kqp/common/kqp_tx_manager.h index 6398e915a14..8f5c1724c1d 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.h +++ b/ydb/core/kqp/common/kqp_tx_manager.h @@ -65,8 +65,8 @@ public: virtual void SetError(ui64 shardId) = 0; virtual void SetError() = 0; - virtual void SetPartitioning(const TTableId tableId, const std::shared_ptr<const TPartitioning>& partitioning) = 0; - virtual std::shared_ptr<const TPartitioning> GetPartitioning(const TTableId tableId) const = 0; + virtual void SetPartitioning(const TTableId tableId, const TPartitioning::TCPtr& partitioning) = 0; + virtual TPartitioning::TCPtr GetPartitioning(const TTableId tableId) const = 0; virtual void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) = 0; virtual const NTopic::TTopicOperations& GetTopicOperations() const = 0; diff --git a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp index 4ed4c181013..0ec0824cf84 100644 --- a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp @@ -929,7 +929,7 @@ private: TVector<NScheme::TTypeInfo> KeyColumnTypes; THashMap<ui32, size_t> KeyColumnIdToPos; - std::shared_ptr<const TPartitioning> TablePartitioning; + TPartitioning::TCPtr TablePartitioning; THashMap<TPartitionIndex, TBatchPartitionInfo::TPtr> StartedPartitions; TPartitionIndex NextPartitionIndex = 0; diff --git a/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp index 2087ebd4a2b..388fc72051f 100644 --- a/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp @@ -699,7 +699,7 @@ public: private: TKqpBufferTableLookupSettings Settings; - std::shared_ptr<const TPartitioning> Partitioning; + TPartitioning::TCPtr Partitioning; const TString LogPrefix; TVector<NScheme::TTypeInfo> KeyColumnTypes; diff --git a/ydb/core/kqp/runtime/kqp_full_text_source.cpp b/ydb/core/kqp/runtime/kqp_full_text_source.cpp index 68f6055df96..accad952f2a 100644 --- a/ydb/core/kqp/runtime/kqp_full_text_source.cpp +++ b/ydb/core/kqp/runtime/kqp_full_text_source.cpp @@ -149,7 +149,7 @@ class TTableReader : public TAtomicRefCount<T> { TVector<NScheme::TTypeInfo> KeyColumnTypes; TVector<NScheme::TTypeInfo> ResultColumnTypes; TVector<i32> ResultColumnIds; - std::shared_ptr<const TPartitioning> PartitionInfo; + TPartitioning::TCPtr PartitionInfo; public: @@ -290,64 +290,8 @@ public: // Uses binary search on PartitionInfo boundaries to find the first partition // that may overlap, then walks forward collecting intersections until the // range is fully covered. - std::vector<std::pair<ui64, TTableRange>> GetRangePartitioning(const TTableRange& range) { - - YQL_ENSURE(PartitionInfo); - - const auto& partitions = PartitionInfo->GetTablePartitioning(); - - // Binary search of the index to start with. - size_t idxStart = 0; - size_t idxFinish = partitions.size(); - while ((idxFinish - idxStart) > 1) { - size_t idxCur = (idxFinish + idxStart) / 2; - const auto& partCur = partitions[idxCur].Range->EndKeyPrefix.GetCells(); - YQL_ENSURE(partCur.size() <= KeyColumnTypes.size()); - int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), KeyColumnTypes.data(), - std::min(partCur.size(), range.From.size())); - if (cmp < 0) { - idxStart = idxCur; - } else { - idxFinish = idxCur; - } - } - - std::vector<TCell> minusInf(KeyColumnTypes.size()); - - std::vector<std::pair<ui64, TTableRange>> rangePartition; - for (size_t idx = idxStart; idx < partitions.size(); ++idx) { - TTableRange partitionRange{ - idx == 0 ? minusInf : partitions[idx - 1].Range->EndKeyPrefix.GetCells(), - idx == 0 ? true : !partitions[idx - 1].Range->IsInclusive, - partitions[idx].Range->EndKeyPrefix.GetCells(), - partitions[idx].Range->IsInclusive - }; - - if (range.Point) { - int intersection = ComparePointAndRange( - range.From, - partitionRange, - KeyColumnTypes, - KeyColumnTypes); - - if (intersection == 0) { - rangePartition.emplace_back(partitions[idx].ShardId, range); - } else if (intersection < 0) { - break; - } - } else { - int intersection = CompareRanges(range, partitionRange, KeyColumnTypes); - - if (intersection == 0) { - auto rangeIntersection = Intersect(KeyColumnTypes, range, partitionRange); - rangePartition.emplace_back(partitions[idx].ShardId, rangeIntersection); - } else if (intersection < 0) { - break; - } - } - } - - return rangePartition; + std::vector<TPartitioning::TIntersection> GetRangePartitioning(const TTableRange& range) { + return PartitionInfo->GetIntersectionWithRange(KeyColumnTypes, range); } }; @@ -925,7 +869,7 @@ public: auto rangePartition = Reader->GetRangePartitioning(range); for(const auto& [shardId, range] : rangePartition) { - RangesToRead.emplace_back(shardId, range); + RangesToRead.emplace_back(shardId, std::move(range)); } } }; @@ -1972,22 +1916,22 @@ public: for (auto& info : infos) { auto ranges = reader->GetRangePartitioning(info->GetPoint()); YQL_ENSURE(ranges.size() == 1); - if (ranges[0].first != lastShard && lastShard != 0 && !allowMultipleShards) { + if (ranges[0].ShardId != lastShard && lastShard != 0 && !allowMultipleShards) { break; } prefixSize++; - lastShard = ranges[0].first; - auto& shardItems = inflightItems[ranges[0].first]; + lastShard = ranges[0].ShardId; + auto& shardItems = inflightItems[ranges[0].ShardId]; if (shardItems.ShardId == 0) { - shardItems.ShardId = ranges[0].first; + shardItems.ShardId = ranges[0].ShardId; } - YQL_ENSURE(shardItems.ShardId == ranges[0].first); + YQL_ENSURE(shardItems.ShardId == ranges[0].ShardId); if (shardItems.ReadId == 0) { shardItems.ReadId = ReadsState.GetNextReadId(); } - shardItems.Points.emplace_back(ranges[0].second); + shardItems.Points.emplace_back(TOwnedTableRange(ranges[0].TableRange)); shardItems.Items.emplace_back(info); } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 80bd953928b..c88c425fdc3 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -903,7 +903,7 @@ private: TReads Reads; bool SentResultsAvailable = false; NUdf::EFetchStatus LastFetchStatus = NUdf::EFetchStatus::Yield; - std::shared_ptr<const TPartitioning> Partitioning; + TPartitioning::TCPtr Partitioning; const TDuration SchemeCacheRequestTimeout; NActors::TActorId SchemeCacheRequestTimeoutTimer; TVector<NKikimrDataEvents::TLock> Locks; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index f7eb2b9002a..f84d761576c 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -41,66 +41,7 @@ TStreamLookupShardReadResult::~TStreamLookupShardReadResult() { } namespace { -std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo, - const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) { - YQL_ENSURE(partitionInfo); - - const auto& partitions = partitionInfo->GetTablePartitioning(); - - // Binary search of the index to start with. - size_t idxStart = 0; - size_t idxFinish = partitions.size(); - while ((idxFinish - idxStart) > 1) { - size_t idxCur = (idxFinish + idxStart) / 2; - const auto& partCur = partitions[idxCur].Range->EndKeyPrefix.GetCells(); - YQL_ENSURE(partCur.size() <= keyColumnTypes.size()); - int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(), - std::min(partCur.size(), range.From.size())); - if (cmp < 0) { - idxStart = idxCur; - } else { - idxFinish = idxCur; - } - } - - std::vector<TCell> minusInf(keyColumnTypes.size()); - - std::vector<std::pair<ui64, TOwnedTableRange>> rangePartition; - for (size_t idx = idxStart; idx < partitions.size(); ++idx) { - TTableRange partitionRange{ - idx == 0 ? minusInf : partitions[idx - 1].Range->EndKeyPrefix.GetCells(), - idx == 0 ? true : !partitions[idx - 1].Range->IsInclusive, - partitions[idx].Range->EndKeyPrefix.GetCells(), - partitions[idx].Range->IsInclusive - }; - - if (range.Point) { - int intersection = ComparePointAndRange( - range.From, - partitionRange, - keyColumnTypes, - keyColumnTypes); - - if (intersection == 0) { - rangePartition.emplace_back(partitions[idx].ShardId, range); - } else if (intersection < 0) { - break; - } - } else { - int intersection = CompareRanges(range, partitionRange, keyColumnTypes); - - if (intersection == 0) { - auto rangeIntersection = Intersect(keyColumnTypes, range, partitionRange); - rangePartition.emplace_back(partitions[idx].ShardId, rangeIntersection); - } else if (intersection < 0) { - break; - } - } - } - - return rangePartition; -} struct TReadState { std::vector<TOwnedTableRange> PendingKeys; @@ -305,7 +246,7 @@ public: auto range = std::move(UnprocessedKeys.front()); UnprocessedKeys.pop_front(); - auto partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), range); + auto partitions = partitioning->GetIntersectionWithRange(GetKeyColumnTypes(), range); for (auto [shardId, range] : partitions) { if (range.Point) { pointsPerShard[shardId].push_back(std::move(range)); @@ -567,10 +508,21 @@ public: struct TUnprocessedLeftRow { TConstArrayRef<TCell> JoinKey; + TOwnedTableRange LookupKey; - explicit TUnprocessedLeftRow(TConstArrayRef<TCell> joinKey) - : JoinKey(std::move(joinKey)) - {} + explicit TUnprocessedLeftRow(TConstArrayRef<TCell> joinKey, size_t keySize) + : JoinKey(joinKey) + { + if (joinKey.size() < keySize) { + std::vector <TCell> fromCells(keySize - joinKey.size()); + fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end()); + bool fromInclusive = true; + bool toInclusive = false; + LookupKey = TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive); + } else { + LookupKey = TOwnedTableRange(joinKey); + } + } }; void AddInputRow(NUdf::TUnboxedValue inputRow) final { @@ -621,7 +573,7 @@ public: } else { auto [it, success] = PendingLeftRowsByKey.emplace(cellVec, TJoinKeyInfo(joinKeyId)); if (success) { - UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec)); + UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec, Settings.KeyColumns.size())); } it->second.ResultSeqNos.push_back(resultBatch); resultBatch->AddJoinKey(it->second.JoinKeyId); @@ -760,7 +712,7 @@ public: auto range = std::move(UnprocessedKeys.front()); UnprocessedKeys.pop_front(); - auto partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), range); + auto partitions = partitioning->GetIntersectionWithRange(GetKeyColumnTypes(), range); for (auto [shardId, range] : partitions) { YQL_ENSURE(PendingLeftRowsByKey.contains(ExtractKeyPrefix(range))); @@ -776,22 +728,7 @@ public: TUnprocessedLeftRow unprocessedRow = std::move(UnprocessedRows.front()); UnprocessedRows.pop_front(); bool hasRanges = false; - std::vector <std::pair<ui64, TOwnedTableRange>> partitions; - if (unprocessedRow.JoinKey.size() < Settings.KeyColumns.size()) { - // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) - std::vector <TCell> fromCells(Settings.KeyColumns.size() - unprocessedRow.JoinKey.size()); - fromCells.insert(fromCells.begin(), unprocessedRow.JoinKey.begin(), unprocessedRow.JoinKey.end()); - bool fromInclusive = true; - bool toInclusive = false; - - partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), - TOwnedTableRange(fromCells, fromInclusive, unprocessedRow.JoinKey, toInclusive) - ); - } else { - // full pk, build point - partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(unprocessedRow.JoinKey)); - } - + auto partitions = partitioning->GetIntersectionWithRange(GetKeyColumnTypes(), TTableRange(unprocessedRow.LookupKey)); for (auto[shardId, range] : partitions) { hasRanges = true; if (range.Point) { @@ -1205,7 +1142,7 @@ private: } } - TConstArrayRef<TCell> ExtractKeyPrefix(const TOwnedTableRange& range) { + TConstArrayRef<TCell> ExtractKeyPrefix(const TTableRange& range) { if (range.From.size() == Settings.LookupKeyColumns.size()) { return range.From; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index 91fa169164b..a4990986ea6 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -57,8 +57,7 @@ public: class TKqpStreamLookupWorker { public: using TReadList = std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>>; - using TPartitionInfo = std::shared_ptr<const TPartitioning>; - + using TPartitionInfo = TPartitioning::TCPtr; struct TReadResultStats { ui64 ReadRowsCount = 0; diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index ba6879815dd..8b8312555f3 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1567,7 +1567,7 @@ private: IKqpTableWriterCallbacks* Callbacks; std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry; - std::shared_ptr<const TPartitioning> Partitioning; + TPartitioning::TCPtr Partitioning; ui64 ResolveAttempts = 0; IKqpTransactionManagerPtr TxManager; diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index ac149b855d7..b426e3ea16d 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1524,7 +1524,7 @@ public: } void OnPartitioningChanged( - const std::shared_ptr<const TPartitioning>& partitioning) override { + const TPartitioning::TCPtr& partitioning) override { IsOlap = false; Partitioning = partitioning; BeforePartitioningChanged(); @@ -1926,7 +1926,7 @@ private: std::vector<IShardedWriteController::TPendingShardInfo> ShardUpdates; std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry; - std::shared_ptr<const TPartitioning> Partitioning; + TPartitioning::TCPtr Partitioning; std::optional<bool> IsOlap; }; diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h index 2347f864353..951fcb69645 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.h +++ b/ydb/core/kqp/runtime/kqp_write_table.h @@ -143,7 +143,7 @@ public: virtual void OnPartitioningChanged( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry) = 0; virtual void OnPartitioningChanged( - const std::shared_ptr<const TPartitioning>& partitioning) = 0; + const TPartitioning::TCPtr& partitioning) = 0; using TWriteToken = ui64; diff --git a/ydb/core/scheme/scheme_tabledefs.cpp b/ydb/core/scheme/scheme_tabledefs.cpp index 74dc9389a48..1bf7c487b5a 100644 --- a/ydb/core/scheme/scheme_tabledefs.cpp +++ b/ydb/core/scheme/scheme_tabledefs.cpp @@ -74,6 +74,259 @@ const char* TTableRange::IsAmbiguousReason(size_t keyColumnsCount) const noexcep return nullptr; } +NKikimr::TTableRange Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second) +{ + // all variants + //================= + //-----------[aaa]--- + //----[bbb]---------- + //================= + //-----------[aaa]--- + //----[bbbbbb]------- + //================= + //-----------[aaa]--- + //----[bbbbbbbb]----- + //================= + //-----------[aaa]--- + //----[bbbbbbbbbb]--- + //================= + //---------[aaa]---- + //--[bbbbbbbbbbbbb]- + + //================= + //----[aaaaaa]---------- + //----[]---------- + //================= + //----[aaaaaa]---------- + //----[bbb]---------- + //================= + //----[aaaaaa]---------- + //----[bbbbbb]---------- + //================= + //----[aaaaaa]---------- + //----[bbbbbbbb]---------- + + + //================= + //-----[aaaaaaa]---------- + //-------[bbb]---------- + //================= + //-----[aaaaaaa]---------- + //-------[bbbbb]---------- + //================= + //-----[aaaaaaa]---------- + //-------[bbbbbbb]---------- + + //================= + //-----[aaa]---------- + //---------[]---------- + //================= + //-----[aaa]---------- + //---------[bbb]---------- + + + //================= + //-----[aaa]---------- + //------------[bbb]--- + + if (first.IsEmptyRange(types)) { + return first; + } + + if (second.IsEmptyRange(types)) { + return second; + } + + int cmpFF = CompareBorders<false, false>(first.From, + second.From, + first.InclusiveFrom, + second.InclusiveFrom, + types); + int cmpTT = CompareBorders<true, true>(first.To, + second.To, + first.InclusiveTo, + second.InclusiveTo, + types); + int cmpFT = CompareBorders<false, true>(first.From, + second.To, + first.InclusiveFrom, + second.InclusiveTo, + types); + int cmpTF = CompareBorders<true, false>(first.To, + second.From, + first.InclusiveTo, + second.InclusiveFrom, + types); + if (cmpFF < 0) + { + if (cmpTF < 0) { + //================= + //-----------[aaa]---------- + //----[bbb]---------- + return TTableRange(second.From, second.InclusiveFrom, + first.To, first.InclusiveTo); + } else if (cmpTF == 0) { + //================= + //-----------[aaa]---------- + //----[bbbbbb]---------- + return TTableRange(second.From, second.InclusiveFrom, + first.To, first.InclusiveTo); + } else { // if (cmpTF > 0) { + if (cmpTT < 0) { + //================= + //-----------[aaa]---------- + //----[bbbbbbbb]---------- + return TTableRange(second.From, second.InclusiveFrom, + first.To, first.InclusiveTo); + } else if (cmpTT == 0) { + //================= + //-----------[aaa]---------- + //----[bbbbbbbbbb]---------- + return TTableRange(second.From, second.InclusiveFrom, + second.To, second.InclusiveTo); + } else { // if (cmpTT > 0) { + //================= + //---------[aaa]---- + //--[bbbbbbbbbbbbb]- + return TTableRange(second.From, second.InclusiveFrom, + second.To, second.InclusiveTo); + } + } + } else if (cmpFF == 0) { + if (cmpTT < 0) { + if (cmpTF == 0) { + //================= + //----[aaaaaa]---------- + //----[]---------- + return TTableRange(second.From, second.InclusiveFrom, + first.To, first.InclusiveTo); + } else if (cmpTF > 0) { + //================= + //----[aaaaaa]---------- + //----[bbb]---------- + return TTableRange(second.From, second.InclusiveFrom, + first.To, first.InclusiveTo); + } else { // if (cmpTF < 0) + Y_ENSURE(false, "unreachable"); + } + } else if (cmpTT == 0) { + //================= + //----[aaaaaa]---------- + //----[bbbbbb]---------- + return TTableRange(second.From, second.InclusiveFrom, + first.To, first.InclusiveTo); + } else { // if (cmpTT > 0) + //================= + //----[aaaaaa]---------- + //----[bbbbbbbb]---------- + return TTableRange(second.From, second.InclusiveFrom, + second.To, second.InclusiveTo); + } + } else { //if (cmpFF > 0) { + if (cmpFT < 0) { + if (cmpTT < 0) { + //================= + //-----[aaaaaaa]---------- + //-------[bbb]---------- + return TTableRange(first.From, first.InclusiveFrom, + first.To, first.InclusiveTo); + } else if (cmpTT == 0) { + //================= + //-----[aaaaaaa]---------- + //-------[bbbbb]---------- + return TTableRange(first.From, first.InclusiveFrom, + first.To, first.InclusiveTo); + } else { //if (cmpTT > 0) { + //================= + //-----[aaaaaaa]---------- + //-------[bbbbbbb]---------- + return TTableRange(first.From, first.InclusiveFrom, + second.To, second.InclusiveTo); + } + } else if (cmpFT == 0) { + if (cmpTT == 0) { + //================= + //-----[aaa]---------- + //---------[]---------- + return TTableRange(first.From, first.InclusiveFrom, + first.To, first.InclusiveTo); + } else if (cmpTT > 0) { + //================= + //-----[aaa]---------- + //---------[bbb]---------- + return TTableRange(first.From, first.InclusiveFrom, + second.To, second.InclusiveTo); + } else { + // cmpTT < 0 + Y_ENSURE(false, "unreachable"); + } + } else { // if (cmpFT > 0) + //================= + //-----[aaa]---------- + //------------[bbb]--- + return TTableRange(first.From, first.InclusiveFrom, + second.To, second.InclusiveTo); + } + } +} + +std::vector<TPartitioning::TIntersection> TPartitioning::GetIntersectionWithRange(const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TTableRange& range) const +{ + const auto& partitions = Partitions; + // Binary search of the index to start with. + size_t idxStart = 0; + size_t idxFinish = partitions.size(); + while ((idxFinish - idxStart) > 1) { + size_t idxCur = (idxFinish + idxStart) / 2; + const auto& partCur = partitions[idxCur].Range->EndKeyPrefix.GetCells(); + Y_ENSURE(partCur.size() <= keyColumnTypes.size()); + int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(), + std::min(partCur.size(), range.From.size())); + if (cmp < 0) { + idxStart = idxCur; + } else { + idxFinish = idxCur; + } + } + + std::vector<TCell> minusInf(keyColumnTypes.size()); + + std::vector<TPartitioning::TIntersection> rangePartition; + for (size_t idx = idxStart; idx < partitions.size(); ++idx) { + TTableRange partitionRange{ + idx == 0 ? minusInf : partitions[idx - 1].Range->EndKeyPrefix.GetCells(), + idx == 0 ? true : !partitions[idx - 1].Range->IsInclusive, + partitions[idx].Range->EndKeyPrefix.GetCells(), + partitions[idx].Range->IsInclusive + }; + + if (range.Point) { + int intersection = ComparePointAndRange( + range.From, + partitionRange, + keyColumnTypes, + keyColumnTypes); + + if (intersection == 0) { + rangePartition.emplace_back(partitions[idx].ShardId, TOwnedTableRange(range)); + } else if (intersection < 0) { + break; + } + } else { + int intersection = CompareRanges(range, partitionRange, keyColumnTypes); + + if (intersection == 0) { + auto rangeIntersection = Intersect(keyColumnTypes, range, partitionRange); + rangePartition.emplace_back(partitions[idx].ShardId, TOwnedTableRange(rangeIntersection)); + } else if (intersection < 0) { + break; + } + } + } + + return rangePartition; +} + bool TSerializedTableRange::IsEmpty(TConstArrayRef<NScheme::TTypeInfo> types) const { auto cmp = CompareBorders<true, false>(To.GetCells(), From.GetCells(), ToInclusive, FromInclusive, types); diff --git a/ydb/core/scheme/scheme_tabledefs.h b/ydb/core/scheme/scheme_tabledefs.h index 97a0c1c94d5..2f0ca09875c 100644 --- a/ydb/core/scheme/scheme_tabledefs.h +++ b/ydb/core/scheme/scheme_tabledefs.h @@ -726,13 +726,15 @@ public: static THolder<TKeyDesc> CreateMiniKeyDesc(const TVector<NScheme::TTypeInfo> &keyColumnTypes); private: - TKeyDesc(const TVector<NScheme::TTypeInfo> &keyColumnTypes); + TKeyDesc(const TVector<NScheme::TTypeInfo>& keyColumnTypes); }; class TPartitioning { public: TPartitioning() = default; + using TCPtr = std::shared_ptr<const TPartitioning>; + explicit TPartitioning(TVector<TKeyDesc::TPartitionInfo>&& partitions) : Partitions(std::move(partitions)) {} @@ -746,16 +748,26 @@ public: const_iterator begin() const { return Partitions.begin(); } const_iterator end() const { return Partitions.end(); } + struct TIntersection { + ui64 ShardId; + TOwnedTableRange TableRange; + }; + // Escape hatch: returns raw sorted vector. Grep for this method name to find // callers that need migration when the data structure changes. const TVector<TKeyDesc::TPartitionInfo>& GetTablePartitioning() const { return Partitions; } + std::vector<TIntersection> GetIntersectionWithRange(const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TTableRange& range) const; + private: TVector<TKeyDesc::TPartitionInfo> Partitions; }; + +TTableRange Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second); + // Deferred inline definitions for TKeyDesc (require complete TPartitioning type) inline const TVector<TKeyDesc::TPartitionInfo>& TKeyDesc::GetPartitions() const { diff --git a/ydb/core/tx/datashard/range_ops.cpp b/ydb/core/tx/datashard/range_ops.cpp index e9e24fbc8fd..c09e8bacf63 100644 --- a/ydb/core/tx/datashard/range_ops.cpp +++ b/ydb/core/tx/datashard/range_ops.cpp @@ -1,200 +1,5 @@ #include "range_ops.h" -NKikimr::TTableRange NKikimr::Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second) -{ - // all variants - //================= - //-----------[aaa]--- - //----[bbb]---------- - //================= - //-----------[aaa]--- - //----[bbbbbb]------- - //================= - //-----------[aaa]--- - //----[bbbbbbbb]----- - //================= - //-----------[aaa]--- - //----[bbbbbbbbbb]--- - //================= - //---------[aaa]---- - //--[bbbbbbbbbbbbb]- - - //================= - //----[aaaaaa]---------- - //----[]---------- - //================= - //----[aaaaaa]---------- - //----[bbb]---------- - //================= - //----[aaaaaa]---------- - //----[bbbbbb]---------- - //================= - //----[aaaaaa]---------- - //----[bbbbbbbb]---------- - - - //================= - //-----[aaaaaaa]---------- - //-------[bbb]---------- - //================= - //-----[aaaaaaa]---------- - //-------[bbbbb]---------- - //================= - //-----[aaaaaaa]---------- - //-------[bbbbbbb]---------- - - //================= - //-----[aaa]---------- - //---------[]---------- - //================= - //-----[aaa]---------- - //---------[bbb]---------- - - - //================= - //-----[aaa]---------- - //------------[bbb]--- - - if (first.IsEmptyRange(types)) { - return first; - } - - if (second.IsEmptyRange(types)) { - return second; - } - - int cmpFF = CompareBorders<false, false>(first.From, - second.From, - first.InclusiveFrom, - second.InclusiveFrom, - types); - int cmpTT = CompareBorders<true, true>(first.To, - second.To, - first.InclusiveTo, - second.InclusiveTo, - types); - int cmpFT = CompareBorders<false, true>(first.From, - second.To, - first.InclusiveFrom, - second.InclusiveTo, - types); - int cmpTF = CompareBorders<true, false>(first.To, - second.From, - first.InclusiveTo, - second.InclusiveFrom, - types); - if (cmpFF < 0) - { - if (cmpTF < 0) { - //================= - //-----------[aaa]---------- - //----[bbb]---------- - return TTableRange(second.From, second.InclusiveFrom, - first.To, first.InclusiveTo); - } else if (cmpTF == 0) { - //================= - //-----------[aaa]---------- - //----[bbbbbb]---------- - return TTableRange(second.From, second.InclusiveFrom, - first.To, first.InclusiveTo); - } else { // if (cmpTF > 0) { - if (cmpTT < 0) { - //================= - //-----------[aaa]---------- - //----[bbbbbbbb]---------- - return TTableRange(second.From, second.InclusiveFrom, - first.To, first.InclusiveTo); - } else if (cmpTT == 0) { - //================= - //-----------[aaa]---------- - //----[bbbbbbbbbb]---------- - return TTableRange(second.From, second.InclusiveFrom, - second.To, second.InclusiveTo); - } else { // if (cmpTT > 0) { - //================= - //---------[aaa]---- - //--[bbbbbbbbbbbbb]- - return TTableRange(second.From, second.InclusiveFrom, - second.To, second.InclusiveTo); - } - } - } else if (cmpFF == 0) { - if (cmpTT < 0) { - if (cmpTF == 0) { - //================= - //----[aaaaaa]---------- - //----[]---------- - return TTableRange(second.From, second.InclusiveFrom, - first.To, first.InclusiveTo); - } else if (cmpTF > 0) { - //================= - //----[aaaaaa]---------- - //----[bbb]---------- - return TTableRange(second.From, second.InclusiveFrom, - first.To, first.InclusiveTo); - } else { // if (cmpTF < 0) - Y_ENSURE(false, "unreachable"); - } - } else if (cmpTT == 0) { - //================= - //----[aaaaaa]---------- - //----[bbbbbb]---------- - return TTableRange(second.From, second.InclusiveFrom, - first.To, first.InclusiveTo); - } else { // if (cmpTT > 0) - //================= - //----[aaaaaa]---------- - //----[bbbbbbbb]---------- - return TTableRange(second.From, second.InclusiveFrom, - second.To, second.InclusiveTo); - } - } else { //if (cmpFF > 0) { - if (cmpFT < 0) { - if (cmpTT < 0) { - //================= - //-----[aaaaaaa]---------- - //-------[bbb]---------- - return TTableRange(first.From, first.InclusiveFrom, - first.To, first.InclusiveTo); - } else if (cmpTT == 0) { - //================= - //-----[aaaaaaa]---------- - //-------[bbbbb]---------- - return TTableRange(first.From, first.InclusiveFrom, - first.To, first.InclusiveTo); - } else { //if (cmpTT > 0) { - //================= - //-----[aaaaaaa]---------- - //-------[bbbbbbb]---------- - return TTableRange(first.From, first.InclusiveFrom, - second.To, second.InclusiveTo); - } - } else if (cmpFT == 0) { - if (cmpTT == 0) { - //================= - //-----[aaa]---------- - //---------[]---------- - return TTableRange(first.From, first.InclusiveFrom, - first.To, first.InclusiveTo); - } else if (cmpTT > 0) { - //================= - //-----[aaa]---------- - //---------[bbb]---------- - return TTableRange(first.From, first.InclusiveFrom, - second.To, second.InclusiveTo); - } else { - // cmpTT < 0 - Y_ENSURE(false, "unreachable"); - } - } else { // if (cmpFT > 0) - //================= - //-----[aaa]---------- - //------------[bbb]--- - return TTableRange(first.From, first.InclusiveFrom, - second.To, second.InclusiveTo); - } - } -} TString NKikimr::DebugPrintRange(TConstArrayRef<NScheme::TTypeInfo> types, const NKikimr::TTableRange &range, const NScheme::TTypeRegistry& typeRegistry) diff --git a/ydb/core/tx/datashard/range_ops.h b/ydb/core/tx/datashard/range_ops.h index b93ee8a48a8..9b7c6030a9f 100644 --- a/ydb/core/tx/datashard/range_ops.h +++ b/ydb/core/tx/datashard/range_ops.h @@ -6,8 +6,6 @@ namespace NKikimr { -TTableRange Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second); - TString DebugPrintRange(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& range, const NScheme::TTypeRegistry& typeRegistry); TString DebugPrintRanges(TConstArrayRef<NScheme::TTypeInfo> types, const TSmallVec<TSerializedTableRange>& ranges, const NScheme::TTypeRegistry& typeRegistry); TString DebugPrintPoint(TConstArrayRef<NScheme::TTypeInfo> types, const TConstArrayRef<TCell>& point, const NScheme::TTypeRegistry& typeRegistry); |
