summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2026-04-16 21:33:16 +0300
committerGitHub <[email protected]>2026-04-16 18:33:16 +0000
commit6bc35e1e98cebdfdbd4c5fdba205da2aada2ea1b (patch)
treeeb80d8dd1833bc786dcd72349b7d553c7d6ea9b4
parent560ab811120265e459f4bd685c057deb3ff85fa6 (diff)
continue partitioning incapsulation (#38237)
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.cpp6
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_full_text_source.cpp76
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp101
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_write_table.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_write_table.h2
-rw-r--r--ydb/core/scheme/scheme_tabledefs.cpp253
-rw-r--r--ydb/core/scheme/scheme_tabledefs.h14
-rw-r--r--ydb/core/tx/datashard/range_ops.cpp195
-rw-r--r--ydb/core/tx/datashard/range_ops.h2
15 files changed, 308 insertions, 360 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp
index 2c64324bb09..f057730712c 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.cpp
+++ b/ydb/core/kqp/common/kqp_tx_manager.cpp
@@ -191,11 +191,11 @@ public:
State = ETransactionState::ERROR;
}
- void SetPartitioning(const TTableId tableId, const std::shared_ptr<const TPartitioning>& partitioning) override {
+ void SetPartitioning(const TTableId tableId, const TPartitioning::TCPtr& partitioning) override {
TablePartitioning[tableId] = partitioning;
}
- std::shared_ptr<const TPartitioning> GetPartitioning(const TTableId tableId) const override {
+ TPartitioning::TCPtr GetPartitioning(const TTableId tableId) const override {
auto iterator = TablePartitioning.find(tableId);
if (iterator != std::end(TablePartitioning)) {
return iterator->second;
@@ -679,7 +679,7 @@ private:
THashSet<ui32> ParticipantNodes;
- THashMap<TTableId, std::shared_ptr<const TPartitioning>> TablePartitioning;
+ THashMap<TTableId, TPartitioning::TCPtr> TablePartitioning;
bool AllowVolatile = false;
bool ReadOnly = true;
diff --git a/ydb/core/kqp/common/kqp_tx_manager.h b/ydb/core/kqp/common/kqp_tx_manager.h
index 6398e915a14..8f5c1724c1d 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.h
+++ b/ydb/core/kqp/common/kqp_tx_manager.h
@@ -65,8 +65,8 @@ public:
virtual void SetError(ui64 shardId) = 0;
virtual void SetError() = 0;
- virtual void SetPartitioning(const TTableId tableId, const std::shared_ptr<const TPartitioning>& partitioning) = 0;
- virtual std::shared_ptr<const TPartitioning> GetPartitioning(const TTableId tableId) const = 0;
+ virtual void SetPartitioning(const TTableId tableId, const TPartitioning::TCPtr& partitioning) = 0;
+ virtual TPartitioning::TCPtr GetPartitioning(const TTableId tableId) const = 0;
virtual void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) = 0;
virtual const NTopic::TTopicOperations& GetTopicOperations() const = 0;
diff --git a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp
index 4ed4c181013..0ec0824cf84 100644
--- a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp
@@ -929,7 +929,7 @@ private:
TVector<NScheme::TTypeInfo> KeyColumnTypes;
THashMap<ui32, size_t> KeyColumnIdToPos;
- std::shared_ptr<const TPartitioning> TablePartitioning;
+ TPartitioning::TCPtr TablePartitioning;
THashMap<TPartitionIndex, TBatchPartitionInfo::TPtr> StartedPartitions;
TPartitionIndex NextPartitionIndex = 0;
diff --git a/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp
index 2087ebd4a2b..388fc72051f 100644
--- a/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp
@@ -699,7 +699,7 @@ public:
private:
TKqpBufferTableLookupSettings Settings;
- std::shared_ptr<const TPartitioning> Partitioning;
+ TPartitioning::TCPtr Partitioning;
const TString LogPrefix;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
diff --git a/ydb/core/kqp/runtime/kqp_full_text_source.cpp b/ydb/core/kqp/runtime/kqp_full_text_source.cpp
index 68f6055df96..accad952f2a 100644
--- a/ydb/core/kqp/runtime/kqp_full_text_source.cpp
+++ b/ydb/core/kqp/runtime/kqp_full_text_source.cpp
@@ -149,7 +149,7 @@ class TTableReader : public TAtomicRefCount<T> {
TVector<NScheme::TTypeInfo> KeyColumnTypes;
TVector<NScheme::TTypeInfo> ResultColumnTypes;
TVector<i32> ResultColumnIds;
- std::shared_ptr<const TPartitioning> PartitionInfo;
+ TPartitioning::TCPtr PartitionInfo;
public:
@@ -290,64 +290,8 @@ public:
// Uses binary search on PartitionInfo boundaries to find the first partition
// that may overlap, then walks forward collecting intersections until the
// range is fully covered.
- std::vector<std::pair<ui64, TTableRange>> GetRangePartitioning(const TTableRange& range) {
-
- YQL_ENSURE(PartitionInfo);
-
- const auto& partitions = PartitionInfo->GetTablePartitioning();
-
- // Binary search of the index to start with.
- size_t idxStart = 0;
- size_t idxFinish = partitions.size();
- while ((idxFinish - idxStart) > 1) {
- size_t idxCur = (idxFinish + idxStart) / 2;
- const auto& partCur = partitions[idxCur].Range->EndKeyPrefix.GetCells();
- YQL_ENSURE(partCur.size() <= KeyColumnTypes.size());
- int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), KeyColumnTypes.data(),
- std::min(partCur.size(), range.From.size()));
- if (cmp < 0) {
- idxStart = idxCur;
- } else {
- idxFinish = idxCur;
- }
- }
-
- std::vector<TCell> minusInf(KeyColumnTypes.size());
-
- std::vector<std::pair<ui64, TTableRange>> rangePartition;
- for (size_t idx = idxStart; idx < partitions.size(); ++idx) {
- TTableRange partitionRange{
- idx == 0 ? minusInf : partitions[idx - 1].Range->EndKeyPrefix.GetCells(),
- idx == 0 ? true : !partitions[idx - 1].Range->IsInclusive,
- partitions[idx].Range->EndKeyPrefix.GetCells(),
- partitions[idx].Range->IsInclusive
- };
-
- if (range.Point) {
- int intersection = ComparePointAndRange(
- range.From,
- partitionRange,
- KeyColumnTypes,
- KeyColumnTypes);
-
- if (intersection == 0) {
- rangePartition.emplace_back(partitions[idx].ShardId, range);
- } else if (intersection < 0) {
- break;
- }
- } else {
- int intersection = CompareRanges(range, partitionRange, KeyColumnTypes);
-
- if (intersection == 0) {
- auto rangeIntersection = Intersect(KeyColumnTypes, range, partitionRange);
- rangePartition.emplace_back(partitions[idx].ShardId, rangeIntersection);
- } else if (intersection < 0) {
- break;
- }
- }
- }
-
- return rangePartition;
+ std::vector<TPartitioning::TIntersection> GetRangePartitioning(const TTableRange& range) {
+ return PartitionInfo->GetIntersectionWithRange(KeyColumnTypes, range);
}
};
@@ -925,7 +869,7 @@ public:
auto rangePartition = Reader->GetRangePartitioning(range);
for(const auto& [shardId, range] : rangePartition) {
- RangesToRead.emplace_back(shardId, range);
+ RangesToRead.emplace_back(shardId, std::move(range));
}
}
};
@@ -1972,22 +1916,22 @@ public:
for (auto& info : infos) {
auto ranges = reader->GetRangePartitioning(info->GetPoint());
YQL_ENSURE(ranges.size() == 1);
- if (ranges[0].first != lastShard && lastShard != 0 && !allowMultipleShards) {
+ if (ranges[0].ShardId != lastShard && lastShard != 0 && !allowMultipleShards) {
break;
}
prefixSize++;
- lastShard = ranges[0].first;
- auto& shardItems = inflightItems[ranges[0].first];
+ lastShard = ranges[0].ShardId;
+ auto& shardItems = inflightItems[ranges[0].ShardId];
if (shardItems.ShardId == 0) {
- shardItems.ShardId = ranges[0].first;
+ shardItems.ShardId = ranges[0].ShardId;
}
- YQL_ENSURE(shardItems.ShardId == ranges[0].first);
+ YQL_ENSURE(shardItems.ShardId == ranges[0].ShardId);
if (shardItems.ReadId == 0) {
shardItems.ReadId = ReadsState.GetNextReadId();
}
- shardItems.Points.emplace_back(ranges[0].second);
+ shardItems.Points.emplace_back(TOwnedTableRange(ranges[0].TableRange));
shardItems.Items.emplace_back(info);
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 80bd953928b..c88c425fdc3 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -903,7 +903,7 @@ private:
TReads Reads;
bool SentResultsAvailable = false;
NUdf::EFetchStatus LastFetchStatus = NUdf::EFetchStatus::Yield;
- std::shared_ptr<const TPartitioning> Partitioning;
+ TPartitioning::TCPtr Partitioning;
const TDuration SchemeCacheRequestTimeout;
NActors::TActorId SchemeCacheRequestTimeoutTimer;
TVector<NKikimrDataEvents::TLock> Locks;
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index f7eb2b9002a..f84d761576c 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -41,66 +41,7 @@ TStreamLookupShardReadResult::~TStreamLookupShardReadResult() {
}
namespace {
-std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo,
- const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) {
- YQL_ENSURE(partitionInfo);
-
- const auto& partitions = partitionInfo->GetTablePartitioning();
-
- // Binary search of the index to start with.
- size_t idxStart = 0;
- size_t idxFinish = partitions.size();
- while ((idxFinish - idxStart) > 1) {
- size_t idxCur = (idxFinish + idxStart) / 2;
- const auto& partCur = partitions[idxCur].Range->EndKeyPrefix.GetCells();
- YQL_ENSURE(partCur.size() <= keyColumnTypes.size());
- int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(),
- std::min(partCur.size(), range.From.size()));
- if (cmp < 0) {
- idxStart = idxCur;
- } else {
- idxFinish = idxCur;
- }
- }
-
- std::vector<TCell> minusInf(keyColumnTypes.size());
-
- std::vector<std::pair<ui64, TOwnedTableRange>> rangePartition;
- for (size_t idx = idxStart; idx < partitions.size(); ++idx) {
- TTableRange partitionRange{
- idx == 0 ? minusInf : partitions[idx - 1].Range->EndKeyPrefix.GetCells(),
- idx == 0 ? true : !partitions[idx - 1].Range->IsInclusive,
- partitions[idx].Range->EndKeyPrefix.GetCells(),
- partitions[idx].Range->IsInclusive
- };
-
- if (range.Point) {
- int intersection = ComparePointAndRange(
- range.From,
- partitionRange,
- keyColumnTypes,
- keyColumnTypes);
-
- if (intersection == 0) {
- rangePartition.emplace_back(partitions[idx].ShardId, range);
- } else if (intersection < 0) {
- break;
- }
- } else {
- int intersection = CompareRanges(range, partitionRange, keyColumnTypes);
-
- if (intersection == 0) {
- auto rangeIntersection = Intersect(keyColumnTypes, range, partitionRange);
- rangePartition.emplace_back(partitions[idx].ShardId, rangeIntersection);
- } else if (intersection < 0) {
- break;
- }
- }
- }
-
- return rangePartition;
-}
struct TReadState {
std::vector<TOwnedTableRange> PendingKeys;
@@ -305,7 +246,7 @@ public:
auto range = std::move(UnprocessedKeys.front());
UnprocessedKeys.pop_front();
- auto partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), range);
+ auto partitions = partitioning->GetIntersectionWithRange(GetKeyColumnTypes(), range);
for (auto [shardId, range] : partitions) {
if (range.Point) {
pointsPerShard[shardId].push_back(std::move(range));
@@ -567,10 +508,21 @@ public:
struct TUnprocessedLeftRow {
TConstArrayRef<TCell> JoinKey;
+ TOwnedTableRange LookupKey;
- explicit TUnprocessedLeftRow(TConstArrayRef<TCell> joinKey)
- : JoinKey(std::move(joinKey))
- {}
+ explicit TUnprocessedLeftRow(TConstArrayRef<TCell> joinKey, size_t keySize)
+ : JoinKey(joinKey)
+ {
+ if (joinKey.size() < keySize) {
+ std::vector <TCell> fromCells(keySize - joinKey.size());
+ fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end());
+ bool fromInclusive = true;
+ bool toInclusive = false;
+ LookupKey = TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive);
+ } else {
+ LookupKey = TOwnedTableRange(joinKey);
+ }
+ }
};
void AddInputRow(NUdf::TUnboxedValue inputRow) final {
@@ -621,7 +573,7 @@ public:
} else {
auto [it, success] = PendingLeftRowsByKey.emplace(cellVec, TJoinKeyInfo(joinKeyId));
if (success) {
- UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec));
+ UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec, Settings.KeyColumns.size()));
}
it->second.ResultSeqNos.push_back(resultBatch);
resultBatch->AddJoinKey(it->second.JoinKeyId);
@@ -760,7 +712,7 @@ public:
auto range = std::move(UnprocessedKeys.front());
UnprocessedKeys.pop_front();
- auto partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), range);
+ auto partitions = partitioning->GetIntersectionWithRange(GetKeyColumnTypes(), range);
for (auto [shardId, range] : partitions) {
YQL_ENSURE(PendingLeftRowsByKey.contains(ExtractKeyPrefix(range)));
@@ -776,22 +728,7 @@ public:
TUnprocessedLeftRow unprocessedRow = std::move(UnprocessedRows.front());
UnprocessedRows.pop_front();
bool hasRanges = false;
- std::vector <std::pair<ui64, TOwnedTableRange>> partitions;
- if (unprocessedRow.JoinKey.size() < Settings.KeyColumns.size()) {
- // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
- std::vector <TCell> fromCells(Settings.KeyColumns.size() - unprocessedRow.JoinKey.size());
- fromCells.insert(fromCells.begin(), unprocessedRow.JoinKey.begin(), unprocessedRow.JoinKey.end());
- bool fromInclusive = true;
- bool toInclusive = false;
-
- partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(),
- TOwnedTableRange(fromCells, fromInclusive, unprocessedRow.JoinKey, toInclusive)
- );
- } else {
- // full pk, build point
- partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(unprocessedRow.JoinKey));
- }
-
+ auto partitions = partitioning->GetIntersectionWithRange(GetKeyColumnTypes(), TTableRange(unprocessedRow.LookupKey));
for (auto[shardId, range] : partitions) {
hasRanges = true;
if (range.Point) {
@@ -1205,7 +1142,7 @@ private:
}
}
- TConstArrayRef<TCell> ExtractKeyPrefix(const TOwnedTableRange& range) {
+ TConstArrayRef<TCell> ExtractKeyPrefix(const TTableRange& range) {
if (range.From.size() == Settings.LookupKeyColumns.size()) {
return range.From;
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
index 91fa169164b..a4990986ea6 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -57,8 +57,7 @@ public:
class TKqpStreamLookupWorker {
public:
using TReadList = std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>>;
- using TPartitionInfo = std::shared_ptr<const TPartitioning>;
-
+ using TPartitionInfo = TPartitioning::TCPtr;
struct TReadResultStats {
ui64 ReadRowsCount = 0;
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index ba6879815dd..8b8312555f3 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -1567,7 +1567,7 @@ private:
IKqpTableWriterCallbacks* Callbacks;
std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry;
- std::shared_ptr<const TPartitioning> Partitioning;
+ TPartitioning::TCPtr Partitioning;
ui64 ResolveAttempts = 0;
IKqpTransactionManagerPtr TxManager;
diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp
index ac149b855d7..b426e3ea16d 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_table.cpp
@@ -1524,7 +1524,7 @@ public:
}
void OnPartitioningChanged(
- const std::shared_ptr<const TPartitioning>& partitioning) override {
+ const TPartitioning::TCPtr& partitioning) override {
IsOlap = false;
Partitioning = partitioning;
BeforePartitioningChanged();
@@ -1926,7 +1926,7 @@ private:
std::vector<IShardedWriteController::TPendingShardInfo> ShardUpdates;
std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry;
- std::shared_ptr<const TPartitioning> Partitioning;
+ TPartitioning::TCPtr Partitioning;
std::optional<bool> IsOlap;
};
diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h
index 2347f864353..951fcb69645 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.h
+++ b/ydb/core/kqp/runtime/kqp_write_table.h
@@ -143,7 +143,7 @@ public:
virtual void OnPartitioningChanged(
const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry) = 0;
virtual void OnPartitioningChanged(
- const std::shared_ptr<const TPartitioning>& partitioning) = 0;
+ const TPartitioning::TCPtr& partitioning) = 0;
using TWriteToken = ui64;
diff --git a/ydb/core/scheme/scheme_tabledefs.cpp b/ydb/core/scheme/scheme_tabledefs.cpp
index 74dc9389a48..1bf7c487b5a 100644
--- a/ydb/core/scheme/scheme_tabledefs.cpp
+++ b/ydb/core/scheme/scheme_tabledefs.cpp
@@ -74,6 +74,259 @@ const char* TTableRange::IsAmbiguousReason(size_t keyColumnsCount) const noexcep
return nullptr;
}
+NKikimr::TTableRange Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second)
+{
+ // all variants
+ //=================
+ //-----------[aaa]---
+ //----[bbb]----------
+ //=================
+ //-----------[aaa]---
+ //----[bbbbbb]-------
+ //=================
+ //-----------[aaa]---
+ //----[bbbbbbbb]-----
+ //=================
+ //-----------[aaa]---
+ //----[bbbbbbbbbb]---
+ //=================
+ //---------[aaa]----
+ //--[bbbbbbbbbbbbb]-
+
+ //=================
+ //----[aaaaaa]----------
+ //----[]----------
+ //=================
+ //----[aaaaaa]----------
+ //----[bbb]----------
+ //=================
+ //----[aaaaaa]----------
+ //----[bbbbbb]----------
+ //=================
+ //----[aaaaaa]----------
+ //----[bbbbbbbb]----------
+
+
+ //=================
+ //-----[aaaaaaa]----------
+ //-------[bbb]----------
+ //=================
+ //-----[aaaaaaa]----------
+ //-------[bbbbb]----------
+ //=================
+ //-----[aaaaaaa]----------
+ //-------[bbbbbbb]----------
+
+ //=================
+ //-----[aaa]----------
+ //---------[]----------
+ //=================
+ //-----[aaa]----------
+ //---------[bbb]----------
+
+
+ //=================
+ //-----[aaa]----------
+ //------------[bbb]---
+
+ if (first.IsEmptyRange(types)) {
+ return first;
+ }
+
+ if (second.IsEmptyRange(types)) {
+ return second;
+ }
+
+ int cmpFF = CompareBorders<false, false>(first.From,
+ second.From,
+ first.InclusiveFrom,
+ second.InclusiveFrom,
+ types);
+ int cmpTT = CompareBorders<true, true>(first.To,
+ second.To,
+ first.InclusiveTo,
+ second.InclusiveTo,
+ types);
+ int cmpFT = CompareBorders<false, true>(first.From,
+ second.To,
+ first.InclusiveFrom,
+ second.InclusiveTo,
+ types);
+ int cmpTF = CompareBorders<true, false>(first.To,
+ second.From,
+ first.InclusiveTo,
+ second.InclusiveFrom,
+ types);
+ if (cmpFF < 0)
+ {
+ if (cmpTF < 0) {
+ //=================
+ //-----------[aaa]----------
+ //----[bbb]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else if (cmpTF == 0) {
+ //=================
+ //-----------[aaa]----------
+ //----[bbbbbb]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else { // if (cmpTF > 0) {
+ if (cmpTT < 0) {
+ //=================
+ //-----------[aaa]----------
+ //----[bbbbbbbb]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else if (cmpTT == 0) {
+ //=================
+ //-----------[aaa]----------
+ //----[bbbbbbbbbb]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ second.To, second.InclusiveTo);
+ } else { // if (cmpTT > 0) {
+ //=================
+ //---------[aaa]----
+ //--[bbbbbbbbbbbbb]-
+ return TTableRange(second.From, second.InclusiveFrom,
+ second.To, second.InclusiveTo);
+ }
+ }
+ } else if (cmpFF == 0) {
+ if (cmpTT < 0) {
+ if (cmpTF == 0) {
+ //=================
+ //----[aaaaaa]----------
+ //----[]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else if (cmpTF > 0) {
+ //=================
+ //----[aaaaaa]----------
+ //----[bbb]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else { // if (cmpTF < 0)
+ Y_ENSURE(false, "unreachable");
+ }
+ } else if (cmpTT == 0) {
+ //=================
+ //----[aaaaaa]----------
+ //----[bbbbbb]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else { // if (cmpTT > 0)
+ //=================
+ //----[aaaaaa]----------
+ //----[bbbbbbbb]----------
+ return TTableRange(second.From, second.InclusiveFrom,
+ second.To, second.InclusiveTo);
+ }
+ } else { //if (cmpFF > 0) {
+ if (cmpFT < 0) {
+ if (cmpTT < 0) {
+ //=================
+ //-----[aaaaaaa]----------
+ //-------[bbb]----------
+ return TTableRange(first.From, first.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else if (cmpTT == 0) {
+ //=================
+ //-----[aaaaaaa]----------
+ //-------[bbbbb]----------
+ return TTableRange(first.From, first.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else { //if (cmpTT > 0) {
+ //=================
+ //-----[aaaaaaa]----------
+ //-------[bbbbbbb]----------
+ return TTableRange(first.From, first.InclusiveFrom,
+ second.To, second.InclusiveTo);
+ }
+ } else if (cmpFT == 0) {
+ if (cmpTT == 0) {
+ //=================
+ //-----[aaa]----------
+ //---------[]----------
+ return TTableRange(first.From, first.InclusiveFrom,
+ first.To, first.InclusiveTo);
+ } else if (cmpTT > 0) {
+ //=================
+ //-----[aaa]----------
+ //---------[bbb]----------
+ return TTableRange(first.From, first.InclusiveFrom,
+ second.To, second.InclusiveTo);
+ } else {
+ // cmpTT < 0
+ Y_ENSURE(false, "unreachable");
+ }
+ } else { // if (cmpFT > 0)
+ //=================
+ //-----[aaa]----------
+ //------------[bbb]---
+ return TTableRange(first.From, first.InclusiveFrom,
+ second.To, second.InclusiveTo);
+ }
+ }
+}
+
+std::vector<TPartitioning::TIntersection> TPartitioning::GetIntersectionWithRange(const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TTableRange& range) const
+{
+ const auto& partitions = Partitions;
+ // Binary search of the index to start with.
+ size_t idxStart = 0;
+ size_t idxFinish = partitions.size();
+ while ((idxFinish - idxStart) > 1) {
+ size_t idxCur = (idxFinish + idxStart) / 2;
+ const auto& partCur = partitions[idxCur].Range->EndKeyPrefix.GetCells();
+ Y_ENSURE(partCur.size() <= keyColumnTypes.size());
+ int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(),
+ std::min(partCur.size(), range.From.size()));
+ if (cmp < 0) {
+ idxStart = idxCur;
+ } else {
+ idxFinish = idxCur;
+ }
+ }
+
+ std::vector<TCell> minusInf(keyColumnTypes.size());
+
+ std::vector<TPartitioning::TIntersection> rangePartition;
+ for (size_t idx = idxStart; idx < partitions.size(); ++idx) {
+ TTableRange partitionRange{
+ idx == 0 ? minusInf : partitions[idx - 1].Range->EndKeyPrefix.GetCells(),
+ idx == 0 ? true : !partitions[idx - 1].Range->IsInclusive,
+ partitions[idx].Range->EndKeyPrefix.GetCells(),
+ partitions[idx].Range->IsInclusive
+ };
+
+ if (range.Point) {
+ int intersection = ComparePointAndRange(
+ range.From,
+ partitionRange,
+ keyColumnTypes,
+ keyColumnTypes);
+
+ if (intersection == 0) {
+ rangePartition.emplace_back(partitions[idx].ShardId, TOwnedTableRange(range));
+ } else if (intersection < 0) {
+ break;
+ }
+ } else {
+ int intersection = CompareRanges(range, partitionRange, keyColumnTypes);
+
+ if (intersection == 0) {
+ auto rangeIntersection = Intersect(keyColumnTypes, range, partitionRange);
+ rangePartition.emplace_back(partitions[idx].ShardId, TOwnedTableRange(rangeIntersection));
+ } else if (intersection < 0) {
+ break;
+ }
+ }
+ }
+
+ return rangePartition;
+}
+
bool TSerializedTableRange::IsEmpty(TConstArrayRef<NScheme::TTypeInfo> types) const
{
auto cmp = CompareBorders<true, false>(To.GetCells(), From.GetCells(), ToInclusive, FromInclusive, types);
diff --git a/ydb/core/scheme/scheme_tabledefs.h b/ydb/core/scheme/scheme_tabledefs.h
index 97a0c1c94d5..2f0ca09875c 100644
--- a/ydb/core/scheme/scheme_tabledefs.h
+++ b/ydb/core/scheme/scheme_tabledefs.h
@@ -726,13 +726,15 @@ public:
static THolder<TKeyDesc> CreateMiniKeyDesc(const TVector<NScheme::TTypeInfo> &keyColumnTypes);
private:
- TKeyDesc(const TVector<NScheme::TTypeInfo> &keyColumnTypes);
+ TKeyDesc(const TVector<NScheme::TTypeInfo>& keyColumnTypes);
};
class TPartitioning {
public:
TPartitioning() = default;
+ using TCPtr = std::shared_ptr<const TPartitioning>;
+
explicit TPartitioning(TVector<TKeyDesc::TPartitionInfo>&& partitions)
: Partitions(std::move(partitions)) {}
@@ -746,16 +748,26 @@ public:
const_iterator begin() const { return Partitions.begin(); }
const_iterator end() const { return Partitions.end(); }
+ struct TIntersection {
+ ui64 ShardId;
+ TOwnedTableRange TableRange;
+ };
+
// Escape hatch: returns raw sorted vector. Grep for this method name to find
// callers that need migration when the data structure changes.
const TVector<TKeyDesc::TPartitionInfo>& GetTablePartitioning() const {
return Partitions;
}
+ std::vector<TIntersection> GetIntersectionWithRange(const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TTableRange& range) const;
+
private:
TVector<TKeyDesc::TPartitionInfo> Partitions;
};
+
+TTableRange Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second);
+
// Deferred inline definitions for TKeyDesc (require complete TPartitioning type)
inline const TVector<TKeyDesc::TPartitionInfo>& TKeyDesc::GetPartitions() const {
diff --git a/ydb/core/tx/datashard/range_ops.cpp b/ydb/core/tx/datashard/range_ops.cpp
index e9e24fbc8fd..c09e8bacf63 100644
--- a/ydb/core/tx/datashard/range_ops.cpp
+++ b/ydb/core/tx/datashard/range_ops.cpp
@@ -1,200 +1,5 @@
#include "range_ops.h"
-NKikimr::TTableRange NKikimr::Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second)
-{
- // all variants
- //=================
- //-----------[aaa]---
- //----[bbb]----------
- //=================
- //-----------[aaa]---
- //----[bbbbbb]-------
- //=================
- //-----------[aaa]---
- //----[bbbbbbbb]-----
- //=================
- //-----------[aaa]---
- //----[bbbbbbbbbb]---
- //=================
- //---------[aaa]----
- //--[bbbbbbbbbbbbb]-
-
- //=================
- //----[aaaaaa]----------
- //----[]----------
- //=================
- //----[aaaaaa]----------
- //----[bbb]----------
- //=================
- //----[aaaaaa]----------
- //----[bbbbbb]----------
- //=================
- //----[aaaaaa]----------
- //----[bbbbbbbb]----------
-
-
- //=================
- //-----[aaaaaaa]----------
- //-------[bbb]----------
- //=================
- //-----[aaaaaaa]----------
- //-------[bbbbb]----------
- //=================
- //-----[aaaaaaa]----------
- //-------[bbbbbbb]----------
-
- //=================
- //-----[aaa]----------
- //---------[]----------
- //=================
- //-----[aaa]----------
- //---------[bbb]----------
-
-
- //=================
- //-----[aaa]----------
- //------------[bbb]---
-
- if (first.IsEmptyRange(types)) {
- return first;
- }
-
- if (second.IsEmptyRange(types)) {
- return second;
- }
-
- int cmpFF = CompareBorders<false, false>(first.From,
- second.From,
- first.InclusiveFrom,
- second.InclusiveFrom,
- types);
- int cmpTT = CompareBorders<true, true>(first.To,
- second.To,
- first.InclusiveTo,
- second.InclusiveTo,
- types);
- int cmpFT = CompareBorders<false, true>(first.From,
- second.To,
- first.InclusiveFrom,
- second.InclusiveTo,
- types);
- int cmpTF = CompareBorders<true, false>(first.To,
- second.From,
- first.InclusiveTo,
- second.InclusiveFrom,
- types);
- if (cmpFF < 0)
- {
- if (cmpTF < 0) {
- //=================
- //-----------[aaa]----------
- //----[bbb]----------
- return TTableRange(second.From, second.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else if (cmpTF == 0) {
- //=================
- //-----------[aaa]----------
- //----[bbbbbb]----------
- return TTableRange(second.From, second.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else { // if (cmpTF > 0) {
- if (cmpTT < 0) {
- //=================
- //-----------[aaa]----------
- //----[bbbbbbbb]----------
- return TTableRange(second.From, second.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else if (cmpTT == 0) {
- //=================
- //-----------[aaa]----------
- //----[bbbbbbbbbb]----------
- return TTableRange(second.From, second.InclusiveFrom,
- second.To, second.InclusiveTo);
- } else { // if (cmpTT > 0) {
- //=================
- //---------[aaa]----
- //--[bbbbbbbbbbbbb]-
- return TTableRange(second.From, second.InclusiveFrom,
- second.To, second.InclusiveTo);
- }
- }
- } else if (cmpFF == 0) {
- if (cmpTT < 0) {
- if (cmpTF == 0) {
- //=================
- //----[aaaaaa]----------
- //----[]----------
- return TTableRange(second.From, second.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else if (cmpTF > 0) {
- //=================
- //----[aaaaaa]----------
- //----[bbb]----------
- return TTableRange(second.From, second.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else { // if (cmpTF < 0)
- Y_ENSURE(false, "unreachable");
- }
- } else if (cmpTT == 0) {
- //=================
- //----[aaaaaa]----------
- //----[bbbbbb]----------
- return TTableRange(second.From, second.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else { // if (cmpTT > 0)
- //=================
- //----[aaaaaa]----------
- //----[bbbbbbbb]----------
- return TTableRange(second.From, second.InclusiveFrom,
- second.To, second.InclusiveTo);
- }
- } else { //if (cmpFF > 0) {
- if (cmpFT < 0) {
- if (cmpTT < 0) {
- //=================
- //-----[aaaaaaa]----------
- //-------[bbb]----------
- return TTableRange(first.From, first.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else if (cmpTT == 0) {
- //=================
- //-----[aaaaaaa]----------
- //-------[bbbbb]----------
- return TTableRange(first.From, first.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else { //if (cmpTT > 0) {
- //=================
- //-----[aaaaaaa]----------
- //-------[bbbbbbb]----------
- return TTableRange(first.From, first.InclusiveFrom,
- second.To, second.InclusiveTo);
- }
- } else if (cmpFT == 0) {
- if (cmpTT == 0) {
- //=================
- //-----[aaa]----------
- //---------[]----------
- return TTableRange(first.From, first.InclusiveFrom,
- first.To, first.InclusiveTo);
- } else if (cmpTT > 0) {
- //=================
- //-----[aaa]----------
- //---------[bbb]----------
- return TTableRange(first.From, first.InclusiveFrom,
- second.To, second.InclusiveTo);
- } else {
- // cmpTT < 0
- Y_ENSURE(false, "unreachable");
- }
- } else { // if (cmpFT > 0)
- //=================
- //-----[aaa]----------
- //------------[bbb]---
- return TTableRange(first.From, first.InclusiveFrom,
- second.To, second.InclusiveTo);
- }
- }
-}
TString NKikimr::DebugPrintRange(TConstArrayRef<NScheme::TTypeInfo> types, const NKikimr::TTableRange &range,
const NScheme::TTypeRegistry& typeRegistry)
diff --git a/ydb/core/tx/datashard/range_ops.h b/ydb/core/tx/datashard/range_ops.h
index b93ee8a48a8..9b7c6030a9f 100644
--- a/ydb/core/tx/datashard/range_ops.h
+++ b/ydb/core/tx/datashard/range_ops.h
@@ -6,8 +6,6 @@
namespace NKikimr {
-TTableRange Intersect(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& first, const TTableRange& second);
-
TString DebugPrintRange(TConstArrayRef<NScheme::TTypeInfo> types, const TTableRange& range, const NScheme::TTypeRegistry& typeRegistry);
TString DebugPrintRanges(TConstArrayRef<NScheme::TTypeInfo> types, const TSmallVec<TSerializedTableRange>& ranges, const NScheme::TTypeRegistry& typeRegistry);
TString DebugPrintPoint(TConstArrayRef<NScheme::TTypeInfo> types, const TConstArrayRef<TCell>& point, const NScheme::TTypeRegistry& typeRegistry);