aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsvc <svc@yandex-team.ru>2022-04-04 11:49:07 +0300
committersvc <svc@yandex-team.ru>2022-04-04 11:49:07 +0300
commit5168b8eda9113d0673ebddd789f7315f12deab64 (patch)
tree59b171fcb6a6d26685a608ed5e059be0d08047c4
parent3de3baa5532f2d6a78b7ded20a65895cb6dff8a4 (diff)
downloadydb-5168b8eda9113d0673ebddd789f7315f12deab64.tar.gz
KIKIMR-14636 slit after move
ref:339a725351ef2ba28e38916a12ff0055f9df2fe5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp.autosave1024
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h3
-rw-r--r--ydb/services/ydb/ydb_table_split_ut.cpp173
6 files changed, 187 insertions, 1032 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
index b657f792c91..1c68baad915 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
@@ -228,6 +228,7 @@ public:
Y_VERIFY(context.SS->Tables.contains(srcPath.Base()->PathId));
TTableInfo::TPtr tableInfo = new TTableInfo(*context.SS->Tables.at(srcPath.Base()->PathId));
+ tableInfo->ResetDescriptionCache();
tableInfo->AlterVersion += 1;
// copy table info
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp.autosave b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp.autosave
deleted file mode 100644
index 07d9913782b..00000000000
--- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp.autosave
+++ /dev/null
@@ -1,1024 +0,0 @@
-#include "schemeshard__operation_part.h"
-#include "schemeshard__operation_common.h"
-#include "schemeshard_impl.h"
-
-#include <ydb/core/base/subdomain.h>
-
-namespace {
-
-using namespace NKikimr;
-using namespace NSchemeShard;
-
-class TConfigureDestination: public TSubOperationState {
-private:
- TOperationId OperationId;
-
- TString DebugHint() const override {
- return TStringBuilder()
- << "TSplitMerge TConfigureDestination"
- << " operationId#" << OperationId;
- }
-
-public:
- TConfigureDestination(TOperationId id)
- : OperationId(id)
- {
- IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType});
- }
-
- bool HandleReply(TEvDataShard::TEvInitSplitMergeDestinationAck::TPtr& ev, TOperationContext& context) override {
- TTabletId ssId = context.SS->SelfTabletId();
-
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " HandleReply TEvInitSplitMergeDestinationAck"
- << ", operationId: " << OperationId
- << ", at schemeshard: " << ssId
- << " message# " << ev->Get()->Record.ShortDebugString());
-
- NIceDb::TNiceDb db(context.Txc.DB);
-
- TTxState* txState = context.SS->FindTx(OperationId);
- Y_VERIFY(txState);
- Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition);
- Y_VERIFY(txState->State == TTxState::ConfigureParts);
-
- TTabletId tabletId = TTabletId(ev->Get()->Record.GetTabletId());
- TShardIdx idx = context.SS->MustGetShardIdx(tabletId);
- if (!idx) {
- LOG_ERROR(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Tablet %" PRIu64 " is not known in TxId %" PRIu64,
- tabletId, OperationId.GetTxId());
- return false;
- }
-
- if (!context.SS->ShardInfos.contains(idx)) {
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " Got InitSplitMergeDestinationAck"
- << " for unknown shard idx " << idx
- << " tabletId " << tabletId);
- return false;
- }
-https://st.yandex-team.ru/KIKIMR-13104
- txState->ShardsInProgress.erase(idx);
-
- context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, idx);
-
- // If all dst datashards have been initialized
- if (txState->ShardsInProgress.empty()) {
- context.SS->ChangeTxState(db, OperationId, TTxState::TransferData);
- context.OnComplete.ActivateTx(OperationId);
- return true;
- }
-
- return false;
- }
-
- bool ProgressState(TOperationContext& context) override {
- TTabletId ssId = context.SS->SelfTabletId();
-
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TSplitMerge TConfigureDestination ProgressState"
- << ", operationId: " << OperationId
- << ", at schemeshard: " << ssId);
-
- TTxState* txState = context.SS->TxInFlight.FindPtr(OperationId);
- Y_VERIFY(txState);
- Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition);
- Y_VERIFY(txState->State == TTxState::ConfigureParts);
-
- txState->ClearShardsInProgress();
-
- // A helper hash map translating tablet id to destination range index
- THashMap<TTabletId, ui32> dstTabletToRangeIdx;
-
- auto getDstRangeIdx = [&dstTabletToRangeIdx] (TTabletId tabletId) -> ui32 {
- auto it = dstTabletToRangeIdx.find(tabletId);
- Y_VERIFY_S(it != dstTabletToRangeIdx.end(),
- "Cannot find range info for destination tablet " << tabletId);
- return it->second;
- };
-
- // Fill tablet ids in split description
- NKikimrTxDataShard::TSplitMergeDescription& splitDescr = *txState->SplitDescription;
- for (ui32 i = 0; i < splitDescr.DestinationRangesSize(); ++i) {
- auto* rangeDescr = splitDescr.MutableDestinationRanges()->Mutable(i);
- auto shardIdx = context.SS->MakeLocalId(TLocalShardIdx(rangeDescr->GetShardIdx()));
- auto datashardId = context.SS->ShardInfos[shardIdx].TabletID;
- rangeDescr->SetTabletID(ui64(datashardId));
- dstTabletToRangeIdx[datashardId] = i;
- }
-
- // Save updated split description
- TString extraData;
- bool serializeRes = txState->SplitDescription->SerializeToString(&extraData);
- Y_VERIFY(serializeRes);
- NIceDb::TNiceDb db(context.Txc.DB);
- db.Table<Schema::TxInFlightV2>().Key(OperationId.GetTxId(), OperationId.GetSubTxId()).Update(
- NIceDb::TUpdate<Schema::TxInFlightV2::ExtraBytes>(extraData));
-
- const auto tableInfo = context.SS->Tables.FindPtr(txState->TargetPathId);
- Y_VERIFY(tableInfo);
-
- const ui64 alterVersion = (*tableInfo)->AlterVersion;
-
- const ui64 subDomainPathId = context.SS->ResolveDomainId(txState->TargetPathId).LocalPathId;
-
- for (const auto& shard: txState->Shards) {
- // Skip src shard
- if (shard.Operation != TTxState::CreateParts) {
- continue;
- }
-
- TTabletId datashardId = context.SS->ShardInfos[shard.Idx].TabletID;
-
- LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Initializing scheme "
- << "on dst datashard: " << datashardId
- << " splitOp: " << OperationId
- << " alterVersion: " << alterVersion
- << " at tablet: " << context.SS->TabletID());
-
- const ui32 rangeIdx = getDstRangeIdx(datashardId);
- const auto& rangeDescr = splitDescr.GetDestinationRanges(rangeIdx);
-
- // For each destination shard we construct an individual description
- // that contains all src shards and only this one dst shard with its range
- NKikimrTxDataShard::TSplitMergeDescription splitDescForShard;
- splitDescForShard.MutableSourceRanges()->CopyFrom(txState->SplitDescription->GetSourceRanges());
- splitDescForShard.AddDestinationRanges()->CopyFrom(rangeDescr);
-
- Y_VERIFY(txState->SplitDescription);
- THolder<TEvDataShard::TEvInitSplitMergeDestination> event =
- THolder(new TEvDataShard::TEvInitSplitMergeDestination(ui64(OperationId.GetTxId()), context.SS->TabletID(),
- subDomainPathId,
- splitDescForShard,
- context.SS->SelectProcessingPrarams(txState->TargetPathId)));
-
- // Add a new-style CreateTable with correct per-shard settings
- // WARNING: legacy datashard will ignore this and use the schema
- // received from some source datashard instead, so schemas must not
- // diverge during a migration period. That's ok though, since
- // schemas may only become incompatible after column family storage
- // configuration is altered, and it's protected with a feature flag.
- auto tableDesc = event->Record.MutableCreateTable();
- context.SS->FillTableDescriptionForShardIdx(
- txState->TargetPathId,
- shard.Idx,
- tableDesc,
- rangeDescr.GetKeyRangeBegin(),
- rangeDescr.GetKeyRangeEnd(),
- true, false);
- context.SS->FillTableSchemaVersion(alterVersion, tableDesc);
-
- context.OnComplete.BindMsgToPipe(OperationId, datashardId, shard.Idx, event.Release());
- }
-
- txState->UpdateShardsInProgress(TTxState::CreateParts);
- return false;
- }
-};
-
-
-class TTranserData: public TSubOperationState {
-private:
- TOperationId OperationId;
-
- TString DebugHint() const override {
- return TStringBuilder()
- << "TSplitMerge TTranserData"
- << " operationId#" << OperationId;
- }
-
-public:
- TTranserData(TOperationId id)
- : OperationId(id)
- {
- IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvInitSplitMergeDestinationAck::EventType});
- }
-
- bool HandleReply(TEvDataShard::TEvSplitAck::TPtr& ev, TOperationContext& context) override {
- TTabletId ssId = context.SS->SelfTabletId();
-
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " HandleReply TEvSplitAck"
- << ", at schemeshard: " << ssId
- << ", message: " << ev->Get()->Record.ShortDebugString());
-
- NIceDb::TNiceDb db(context.Txc.DB);
-
- TTxState* txState = context.SS->FindTx(OperationId);
- Y_VERIFY(txState);
- Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition);
- Y_VERIFY(txState->State == TTxState::TransferData);
-
- auto tabletId = TTabletId(ev->Get()->Record.GetTabletId());
- auto srcShardIdx = context.SS->GetShardIdx(tabletId);
- if (!srcShardIdx) {
- LOG_ERROR(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Tablet %" PRIu64 " is not known in TxId %" PRIu64,
- tabletId, OperationId.GetTxId());
- return false;
- }
-
- if (!context.SS->ShardInfos.contains(srcShardIdx)) {
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " Got SplitAck for unknown shard"
- << " idx " << srcShardIdx
- << " tabletId " << tabletId);
- return false;
- }
-
- txState->ShardsInProgress.erase(srcShardIdx);
- context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, srcShardIdx);
-
- if (!txState->ShardsInProgress.empty()) {
- // TODO: verify that we only wait for Src shards
- return false;
- }
-
- // Switch table partitioning: exclude src shard and include all dst shards
- TPathId tableId = txState->TargetPathId;
- TTableInfo::TPtr tableInfo = *context.SS->Tables.FindPtr(tableId);
- Y_VERIFY(tableInfo);
-
- // Replace all Src datashard(s) with Dst datashard(s)
- TVector<TTableShardInfo> newPartitioning;
- THashSet<TShardIdx> allSrcShardIdxs;
- for (const auto& txShard : txState->Shards) {
- if (txShard.Operation == TTxState::TransferData)
- allSrcShardIdxs.insert(txShard.Idx);
- }
- bool dstAdded = false;
- for (const auto& shard : tableInfo->GetPartitions()) {
- if (allSrcShardIdxs.contains(shard.ShardIdx)) {
- if (dstAdded)
- continue;
- for (const auto& txShard : txState->Shards) {
- if (txShard.Operation != TTxState::CreateParts)
- continue;
-
- // TODO: make sure dst are sorted by range end
- Y_VERIFY(context.SS->ShardInfos.contains(txShard.Idx));
- TTableShardInfo dst(txShard.Idx, txShard.RangeEnd);
- newPartitioning.push_back(dst);
- }
- dstAdded = true;
- } else {
- newPartitioning.push_back(shard);
- }
- }
-
- auto oldAggrStats = tableInfo->GetStats().Aggregated;
-
- // Delete the whole old partitioning and persist the whole new partitionig as the indexes have changed
- context.SS->DeleteTablePartitioning(db, tableId, tableInfo);
- context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning));
- context.SS->PersistTablePartitioning(db, tableId, tableInfo);
- context.SS->PersistTablePartitionStats(db, tableId, tableInfo);
- context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_ACTIVE_COUNT].Sub(allSrcShardIdxs.size());
- context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_INACTIVE_COUNT].Add(allSrcShardIdxs.size());
-
- if (!tableInfo->IsBackup && !tableInfo->IsShardsStatsDetached()) {
- auto newAggrStats = tableInfo->GetStats().Aggregated;
- auto subDomainId = context.SS->ResolveDomainId(tableId);
- auto subDomainInfo = context.SS->ResolveDomainInfo(tableId);
- subDomainInfo->AggrDiskSpaceUsage(context.SS, newAggrStats, oldAggrStats);
- if (subDomainInfo->CheckDiskSpaceQuotas(context.SS)) {
- context.SS->PersistSubDomainState(db, subDomainId, *subDomainInfo);
- context.OnComplete.PublishToSchemeBoard(OperationId, subDomainId);
- }
- }
-
- Y_VERIFY(txState->ShardsInProgress.empty(), "All shards should have already completed their steps");
-
- context.SS->ChangeTxState(db, OperationId, TTxState::NotifyPartitioningChanged);
- context.OnComplete.ActivateTx(OperationId);
-
- context.OnComplete.PublishToSchemeBoard(OperationId, tableId);
-
- return true;
- }
-
- bool ProgressState(TOperationContext& context) override {
- TTabletId ssId = context.SS->SelfTabletId();
-
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " ProgressState"
- << ", at schemeshard: " << ssId);
-
- TTxState* txState = context.SS->TxInFlight.FindPtr(OperationId);
- Y_VERIFY(txState);
- Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition);
- Y_VERIFY(txState->State == TTxState::TransferData);
-
- txState->ClearShardsInProgress();
-
- for (const auto& shard : txState->Shards) {
- // Skip Dst shards
- if (shard.Operation != TTxState::TransferData)
- continue;
-
- auto datashardId = context.SS->ShardInfos[shard.Idx].TabletID;
-
- LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " Starting split on src datashard " << datashardId
- << " splitOpId# " << OperationId
- << " at tablet " << context.SS->TabletID());
-
- THolder<TEvDataShard::TEvSplit> event =
- THolder(new TEvDataShard::TEvSplit(ui64(OperationId.GetTxId())));
-
- Y_VERIFY(txState->SplitDescription);
- event->Record.MutableSplitDescription()->CopyFrom(*txState->SplitDescription);
-
- context.OnComplete.BindMsgToPipe(OperationId, datashardId, shard.Idx, event.Release());
- }
-
- txState->UpdateShardsInProgress(TTxState::TransferData);
- return false;
- }
-};
-
-class TNotifySrc: public TSubOperationState {
-private:
- TOperationId OperationId;
-
- TString DebugHint() const override {
- return TStringBuilder()
- << "TSplitMerge TNotifySrc"
- << ", operationId: " << OperationId;
- }
-public:
- TNotifySrc(TOperationId id)
- : OperationId(id)
- {
- IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvInitSplitMergeDestinationAck::EventType, TEvDataShard::TEvSplitAck::EventType});
- }
-
-
- bool HandleReply(TEvDataShard::TEvSplitPartitioningChangedAck::TPtr& ev, TOperationContext& context) override {
- auto ssId = context.SS->SelfTabletId();
- auto tabletId = TTabletId(ev->Get()->Record.GetTabletId());
-
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " HandleReply TEvSplitPartitioningChangedAck"
- << ", from datashard: " << tabletId
- << ", at schemeshard: " << ssId);
-
- TTxState* txState = context.SS->FindTx(OperationId);
- Y_VERIFY(txState);
- Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition);
- Y_VERIFY(txState->State == TTxState::NotifyPartitioningChanged);
-
-
- auto idx = context.SS->GetShardIdx(tabletId);
- if (!idx) {
- LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Datashard is not listed in tablet to shard map"
- << ", datashard: " << tabletId
- << ", opId: " << OperationId);
- return false;
- }
-
- if (!txState->ShardsInProgress.contains(idx)) {
- // TODO: verify that this is a repeated event from known Src shard, not a random one
- LOG_INFO(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Got SplitPartitioningChangedAck from shard %" PRIu64 " that is not a part ot Tx %" PRIu64, tabletId, OperationId.GetTxId());
- return false;
- }
-
- txState->ShardsInProgress.erase(idx);
- context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, idx);
-
- if (!txState->ShardsInProgress.empty()) {
- // TODO: verify that we only wait for Src shards
- return false;
- }
-
- context.SS->DeleteSplitOp(OperationId, *txState);
-
- context.OnComplete.DoneOperation(OperationId);
- return true;
- }
-
- bool ProgressState(TOperationContext& context) override {
- //By this moment the partitioning scheme of the table has been switched and is visible to the users
- //We just want notify Src shard that it should reject all new transactions and return SchemeChanged error
- TTabletId ssId = context.SS->SelfTabletId();
-
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " ProgressState"
- << ", at schemeshard: " << ssId);
-
- TTxState* txState = context.SS->TxInFlight.FindPtr(OperationId);
- Y_VERIFY(txState);
- Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition);
- Y_VERIFY(txState->State == TTxState::NotifyPartitioningChanged);
-
-// Y_VERIFY(txState->Notify.Empty(), "All notifications for split op shouldn't have been sent before switching to NotifyPartitioningChanged state");
-
- txState->ClearShardsInProgress();
-
- bool needToNotifySrc = false;
-
- for (const auto& shard : txState->Shards) {
- // Skip Dst shards
- if (shard.Operation != TTxState::TransferData) {
- continue;
- }
-
- if (!context.SS->ShardInfos.contains(shard.Idx) || context.SS->ShardDeleter.Has(shard.Idx)) {
- LOG_DEBUG(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Src datashard idx %" PRIu64 " for splitOp# %" PRIu64 " is already deleted or is intended to at tablet %" PRIu64,
- shard.Idx, OperationId.GetTxId(), context.SS->TabletID());
- continue;
- }
-
- auto datashardId = context.SS->ShardInfos[shard.Idx].TabletID;
-
- needToNotifySrc = true;
- LOG_DEBUG(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Notify src datashard %" PRIu64 " on partitioning changed splitOp# %" PRIu64 " at tablet %" PRIu64,
- datashardId, OperationId.GetTxId(), context.SS->TabletID());
-
- THolder<TEvDataShard::TEvSplitPartitioningChanged> event = MakeHolder<TEvDataShard::TEvSplitPartitioningChanged>(ui64(OperationId.GetTxId()));
-
- context.OnComplete.BindMsgToPipe(OperationId, datashardId, shard.Idx, event.Release());
-
- txState->ShardsInProgress.insert(shard.Idx);
- }
-
- if (!needToNotifySrc) {
- // The Src datashard could have already completed all its work, reported Offline state and got deleted
- // In this case the transaction is finished because this was the last step
- context.SS->DeleteSplitOp(OperationId, *txState);
-
- context.OnComplete.DoneOperation(OperationId);
- return true;
- }
-
- return false;
- }
-};
-
-class TSplitMerge: public TSubOperation {
-private:
- const TOperationId OperationId;
- const TTxTransaction Transaction;
- TTxState::ETxState State = TTxState::Invalid;
-
- TTxState::ETxState NextState() {
- return TTxState::CreateParts;
- }
-
- TTxState::ETxState NextState(TTxState::ETxState state) {
- switch(state) {
- case TTxState::CreateParts:
- return TTxState::ConfigureParts;
- case TTxState::ConfigureParts:
- return TTxState::TransferData;
- case TTxState::TransferData:
- return TTxState::NotifyPartitioningChanged;
- case TTxState::NotifyPartitioningChanged:
- return TTxState::Done;
- default:
- return TTxState::Invalid;
- }
- }
-
- TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) {
- switch(state) {
- case TTxState::CreateParts:
- return THolder(new TCreateParts(OperationId));
- case TTxState::ConfigureParts:
- return THolder(new TConfigureDestination(OperationId));
- case TTxState::TransferData:
- return THolder(new TTranserData(OperationId));
- case TTxState::NotifyPartitioningChanged:
- return THolder(new TNotifySrc(OperationId));
- default:
- return nullptr;
- }
- }
-
- void StateDone(TOperationContext& context) override {
- State = NextState(State);
-
- if (State != TTxState::Invalid) {
- SetState(SelectStateFunc(State));
- context.OnComplete.ActivateTx(OperationId);
- }
- }
-
-public:
- TSplitMerge(TOperationId id, const TTxTransaction& tx)
- : OperationId(id)
- , Transaction(tx)
- {
- }
-
- TSplitMerge(TOperationId id, TTxState::ETxState state)
- : OperationId(id)
- , State(state)
- {
- SetState(SelectStateFunc(state));
- }
-
- bool AllocateDstForMerge(
- const NKikimrSchemeOp::TSplitMergeTablePartitions& info,
- TTxId txId,
- const TPathId& pathId,
- const TVector<ui64>& srcPartitionIdxs,
- const TTableInfo::TCPtr tableInfo,
- TTxState& op,
- const TChannelsBindings& channels,
- TString& errStr,
- TOperationContext& context)
- {
- // N source shards are merged into 1
- Y_VERIFY(srcPartitionIdxs.size() > 1);
- Y_VERIFY(info.SplitBoundarySize() == 0);
-
- if (tableInfo->GetExpectedPartitionCount() + 1 - srcPartitionIdxs.size() < tableInfo->GetMinPartitionsCount()) {
- errStr = "Reached MinPartitionsCount limit: " + ToString(tableInfo->GetMinPartitionsCount());
- return false;
- }
-
- // Check that partitions for merge are consecutive in tableInfo->Partitions (e.g. don't allow to merge #1 with #3 tree and leave #2)
- for (ui32 i = 1; i < srcPartitionIdxs.size(); ++i) {
- ui64 pi = srcPartitionIdxs[i];
- ui64 piPrev = srcPartitionIdxs[i-1];
-
- if (pi != piPrev + 1) {
- auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx;
- auto shardIdxPrev = tableInfo->GetPartitions()[piPrev].ShardIdx;
-
- errStr = TStringBuilder()
- << "Partitions are not consecutive at index " << i << " : #" << piPrev << "(" << context.SS->ShardInfos[shardIdxPrev].TabletID << ")"
- << " then #" << pi << "(" << context.SS->ShardInfos[shardIdx].TabletID << ")";
- return false;
- }
- }
-
- TString firstRangeBegin;
- if (srcPartitionIdxs[0] != 0) {
- // Take the end of previous shard
- firstRangeBegin = tableInfo->GetPartitions()[srcPartitionIdxs[0]-1].EndOfRange;
- } else {
- TVector<TCell> firstKey;
- ui32 keyColCount = 0;
- for (const auto& col : tableInfo->Columns) {
- if (col.second.IsKey()) {
- ++keyColCount;
- }
- }
- // Or start from (NULL, NULL, .., NULL)
- firstKey.resize(keyColCount);
- firstRangeBegin = TSerializedCellVec::Serialize(firstKey);
- }
-
- op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>();
- // Fill src shards
- TString prevRangeEnd = firstRangeBegin;
- for (ui64 pi : srcPartitionIdxs) {
- auto* srcRange = op.SplitDescription->AddSourceRanges();
- auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx;
- srcRange->SetShardIdx(ui64(shardIdx.GetLocalId()));
- srcRange->SetTabletID(ui64(context.SS->ShardInfos[shardIdx].TabletID));
- srcRange->SetKeyRangeBegin(prevRangeEnd);
- TString rangeEnd = tableInfo->GetPartitions()[pi].EndOfRange;
- srcRange->SetKeyRangeEnd(rangeEnd);
- prevRangeEnd = rangeEnd;
- }
-
- // Fill dst shard
- TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId);
- datashardInfo.BindedChannels = channels;
-
- auto idx = context.SS->RegisterShardInfo(datashardInfo);
-
- ui64 lastSrcPartition = srcPartitionIdxs.back();
- TString lastRangeEnd = tableInfo->GetPartitions()[lastSrcPartition].EndOfRange;
-
- TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts);
- dstShardOp.RangeEnd = lastRangeEnd;
- op.Shards.push_back(dstShardOp);
-
- auto* dstRange = op.SplitDescription->AddDestinationRanges();
- dstRange->SetShardIdx(ui64(idx.GetLocalId()));
- dstRange->SetKeyRangeBegin(firstRangeBegin);
- dstRange->SetKeyRangeEnd(lastRangeEnd);
-
- return true;
- }
-
- bool AllocateDstForSplit(
- const NKikimrSchemeOp::TSplitMergeTablePartitions& info,
- TTxId txId,
- const TPathId& pathId,
- ui64 srcPartitionIdx,
- const TTableInfo::TCPtr tableInfo,
- TTxState& op,
- const TChannelsBindings& channels,
- TString& errStr,
- TOperationContext& context)
- {
- // n split points produce n+1 parts
- ui64 count = info.SplitBoundarySize() + 1;
- if (count == 1) {
- errStr = "No split boundaries specified";
- return false;
- }
-
- if (tableInfo->GetExpectedPartitionCount() + count - 1 > tableInfo->GetMaxPartitionsCount()) {
- errStr = "Reached MaxPartitionsCount limit: " + ToString(tableInfo->GetMaxPartitionsCount());
- return false;
- }
-
- TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId);
- datashardInfo.BindedChannels = channels;
-
- // Build vector of key column types
- TVector<NScheme::TTypeId> keyColTypeIds;
- for (const auto& col : tableInfo->Columns) {
- if (!col.second.IsKey())
- continue;
- size_t keyIdx = col.second.KeyOrder;
- keyColTypeIds.resize(Max(keyColTypeIds.size(), keyIdx+1));
- keyColTypeIds[keyIdx] = col.second.PType;
- }
-
- TVector<TString> rangeEnds;
- if (!TSchemeShard::FillSplitPartitioning(rangeEnds, keyColTypeIds, info.GetSplitBoundary(), errStr)) {
- return false;
- }
-
- // Last dst shard ends where src shard used to end
- rangeEnds.push_back(tableInfo->GetPartitions()[srcPartitionIdx].EndOfRange);
-
- op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>();
- auto* srcRange = op.SplitDescription->AddSourceRanges();
- auto srcShardIdx = tableInfo->GetPartitions()[srcPartitionIdx].ShardIdx;
- srcRange->SetShardIdx(ui64(srcShardIdx.GetLocalId()));
- srcRange->SetTabletID(ui64(context.SS->ShardInfos[srcShardIdx].TabletID));
- srcRange->SetKeyRangeEnd(tableInfo->GetPartitions()[srcPartitionIdx].EndOfRange);
-
- // Check that ranges are sorted in ascending order
- TVector<TCell> prevKey;
- if (srcPartitionIdx != 0) {
- // Take the end of previous shard
- TSerializedCellVec key(tableInfo->GetPartitions()[srcPartitionIdx-1].EndOfRange);
- prevKey.assign(key.GetCells().begin(), key.GetCells().end());
- } else {
- // Or start from (NULL, NULL, .., NULL)
- prevKey.resize(keyColTypeIds.size());
- }
- TString firstRangeBegin = TSerializedCellVec::Serialize(prevKey);
- srcRange->SetKeyRangeBegin(firstRangeBegin);
-
- for (ui32 i = 0; i < rangeEnds.size(); ++i) {
- TSerializedCellVec key(rangeEnds[i]);
- if (CompareBorders<true, true>(prevKey, key.GetCells(), true, true, keyColTypeIds) >= 0) {
- errStr = Sprintf("Partition ranges are not sorted at index %u", i);
- return false;
- }
- prevKey.assign(key.GetCells().begin(), key.GetCells().end());
- }
-
- // Allocate new datashard ids
- TString rangeBegin = firstRangeBegin;
- for (ui64 i = 0; i < count; ++i) {
- const auto idx = context.SS->RegisterShardInfo(datashardInfo);
-
- TString rangeEnd = rangeEnds[i];
- TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts);
- dstShardOp.RangeEnd = rangeEnd;
- op.Shards.push_back(dstShardOp);
-
- auto* dstRange = op.SplitDescription->AddDestinationRanges();
- dstRange->SetShardIdx(ui64(idx.GetLocalId()));
- dstRange->SetKeyRangeBegin(rangeBegin);
- dstRange->SetKeyRangeEnd(rangeEnd);
-
- rangeBegin = rangeEnd;
- }
-
- return true;
- }
-
- THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
- const TTabletId ssId = context.SS->SelfTabletId();
-
- const auto& info = Transaction.GetSplitMergeTablePartitions();
- const ui64 dstCount = info.SplitBoundarySize() + 1;
- const ui64 srcCount = info.SourceTabletIdSize();
-
- TPathId pathId = InvalidPathId;
- if (info.HasTableOwnerId()) {
- pathId = TPathId(TOwnerId(info.GetTableOwnerId()),
- TLocalPathId(info.GetTableLocalId()));
- } else if (info.HasTableLocalId()) {
- pathId = context.SS->MakeLocalId(TLocalPathId(info.GetTableLocalId()));
- }
-
- LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TSplitMerge Propose"
- << ", tableStr: " << info.GetTablePath()
- << ", tableId: " << pathId
- << ", opId: " << OperationId
- << ", at schemeshard: " << ssId);
-
- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId));
- TString errStr;
-
- if (!info.HasTablePath() && !info.HasTableLocalId()) {
- errStr = "Neither table name nor pathId in SplitMergeInfo";
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
- return result;
- }
-
- TPath path = pathId
- ? TPath::Init(pathId, context.SS)
- : TPath::Resolve(info.GetTablePath(), context.SS);
- {
- TPath::TChecker checks = path.Check();
- checks
- .NotEmpty()
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
- .IsResolved()
- .NotDeleted()
- .IsTable()
- .NotUnderOperation();
-
- if (checks) {
- if (dstCount >= srcCount) { //allow over commit for merge
- checks
- .ShardsLimit(dstCount)
- .PathShardsLimit(dstCount);
- }
- }
-
- if (!checks) {
- TString explain = TStringBuilder() << "path fail checks"
- << ", path: " << path.PathString();
- auto status = checks.GetStatus(&explain);
- result->SetError(status, explain);
- return result;
- }
- }
-
- if (!context.SS->CheckApplyIf(Transaction, errStr)) {
- result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
- return result;
- }
-
- if (!context.SS->CheckLocks(path.Base()->PathId, Transaction, errStr)) {
- result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
- return result;
- }
-
- Y_VERIFY(context.SS->Tables.contains(path.Base()->PathId));
- TTableInfo::TCPtr tableInfo = context.SS->Tables.at(path.Base()->PathId);
- Y_VERIFY(tableInfo);
-
- if (tableInfo->IsBackup) {
- TString errMsg = TStringBuilder()
- << "cannot split/merge backup table " << info.GetTablePath();
- result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
- return result;
- }
-
- const THashMap<TShardIdx, ui64>& shardIdx2partition = tableInfo->GetShard2PartitionIdx();
-
- TVector<ui64> srcPartitionIdxs;
- i64 totalSrcPartCount = 0;
- for (ui32 si = 0; si < info.SourceTabletIdSize(); ++si) {
- auto srcTabletId = TTabletId(info.GetSourceTabletId(si));
- auto srcShardIdx = context.SS->GetShardIdx(srcTabletId);
- if (!srcShardIdx) {
- TString errMsg = TStringBuilder() << "Unknown SourceTabletId: " << srcTabletId;
- result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
- return result;
- }
-
- if (!context.SS->ShardInfos.contains(srcShardIdx)) {
- TString errMsg = TStringBuilder()
- << "shard doesn't present at schemesahrd at all"
- << ", tablet: " << srcTabletId
- << ", srcShardIdx: " << srcShardIdx
- << ", pathId: " << path.Base()->PathId;
- result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
- return result;
- }
-
- if (!shardIdx2partition.contains(srcShardIdx)) {
- TString errMsg = TStringBuilder()
- << "shard doesn't present at schemesahrd at table"
- << ", tablet: " << srcTabletId
- << ", srcShardIdx: " << srcShardIdx
- << ", pathId: " << path.Base()->PathId;
- result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
- return result;
- }
-
- if (context.SS->ShardInfos.FindPtr(srcShardIdx)->PathId != path.Base()->PathId || !shardIdx2partition.contains(srcShardIdx)) {
- TString errMsg = TStringBuilder() << "TabletId " << srcTabletId << " is not a partition of table " << info.GetTablePath();
- result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
- return result;
- }
-
-
- if (context.SS->ShardIsUnderSplitMergeOp(srcShardIdx)) {
- TString errMsg = TStringBuilder() << "TabletId " << srcTabletId << " is already in process of split";
- result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg);
- return result;
- }
-
- if (context.SS->SplitSettings.SplitMergePartCountLimit != -1) {
- const auto* stats = tableInfo->GetStats().PartitionStats.FindPtr(srcShardIdx);
- if (!stats || stats->ShardState != NKikimrTxDataShard::Ready) {
- TString errMsg = TStringBuilder() << "Src TabletId " << srcTabletId << " is not in Ready state";
- result->SetError(NKikimrScheme::StatusNotAvailable, errMsg);
- return result;
- }
-
- totalSrcPartCount += stats->PartCount;
- }
-
- auto pi = shardIdx2partition.at(srcShardIdx);
- Y_VERIFY_S(pi < tableInfo->GetPartitions().size(), "pi: " << pi << " partitions.size: " << tableInfo->GetPartitions().size());
- srcPartitionIdxs.push_back(pi);
- }
-
- if (context.SS->SplitSettings.SplitMergePartCountLimit != -1 &&
- totalSrcPartCount >= context.SS->SplitSettings.SplitMergePartCountLimit)
- {
- result->SetError(NKikimrScheme::StatusNotAvailable,
- Sprintf("Split/Merge operation involves too many parts: %" PRIu64, totalSrcPartCount));
-
- LOG_CRIT_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Cannot start split/merge operation"
- << " for table \"" << info.GetTablePath() << "\" (id " << path.Base()->PathId << ")"
- << " at tablet " << context.SS->TabletID()
- << " because the operation involves too many parts: " << totalSrcPartCount);
- return result;
- }
-
- if (srcPartitionIdxs.empty()) {
- result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() << "No source partitions specified for split/merge TxId " << OperationId.GetTxId());
- return result;
- }
-
- TChannelsBindings channelsBinding;
-
- bool storePerShardConfig = false;
- NKikimrSchemeOp::TPartitionConfig perShardConfig;
-
- if (context.SS->IsStorageConfigLogic(tableInfo)) {
- TVector<TStorageRoom> storageRooms;
- THashMap<ui32, ui32> familyRooms;
- storageRooms.emplace_back(0);
-
- if (!context.SS->GetBindingsRooms(path.DomainId(), tableInfo->PartitionConfig(), storageRooms, familyRooms, channelsBinding, errStr)) {
- errStr = TString("database doesn't have required storage pools to create tablet with storage config, details: ") + errStr;
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
- return result;
- }
-
- storePerShardConfig = true;
- for (const auto& room : storageRooms) {
- perShardConfig.AddStorageRooms()->CopyFrom(room);
- }
- for (const auto& familyRoom : familyRooms) {
- auto* protoFamily = perShardConfig.AddColumnFamilies();
- protoFamily->SetId(familyRoom.first);
- protoFamily->SetRoom(familyRoom.second);
- }
- } else if (context.SS->IsCompatibleChannelProfileLogic(path.DomainId(), tableInfo)) {
- if (!context.SS->GetChannelsBindings(path.DomainId(), tableInfo, channelsBinding, errStr)) {
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
- return result;
- }
- }
-
- TTxState op;
-
- op.TxType = TTxState::TxSplitTablePartition;
- op.TargetPathId = path.Base()->PathId;
- op.State = TTxState::CreateParts;
-
- // Fill Src shards for tx
- for (ui64 pi : srcPartitionIdxs) {
- auto srcShardIdx = tableInfo->GetPartitions()[pi].ShardIdx;
- op.Shards.emplace_back(srcShardIdx, ETabletType::DataShard, TTxState::TransferData);
- }
-
- if (srcPartitionIdxs.size() == 1 && dstCount > 1) {
- // This is Split operation, allocate new shards for split Dsts
- if (!AllocateDstForSplit(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs[0], tableInfo, op, channelsBinding, errStr, context)) {
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
- return result;
- }
- } else if (dstCount == 1 && srcPartitionIdxs.size() > 1) {
- // This is merge, allocate 1 Dst shard
- if (!AllocateDstForMerge(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs, tableInfo, op, channelsBinding, errStr, context)) {
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
- return result;
- }
- } else {
- result->SetError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported");
- return result;
- }
-
- ///////////
- /// Accept operation
- ///
-
- context.MemChanges.GrabNewTxState(context.SS, OperationId);
- context.MemChanges.GrabDomain(context.SS, path.DomainId());
- context.MemChanges.GrabPath(context.SS, path->PathId);
- context.MemChanges.GrabTable(context.SS, path->PathId);
-
- context.DbChanges.PersistTxState(OperationId);
- for (const auto& shard : op.Shards) {
- if (shard.Operation == TTxState::CreateParts) {
- context.MemChanges.GrabNewShard(context.SS, shard.Idx);
- } else {
- context.MemChanges.GrabShard(context.SS, shard.Idx);
- }
- context.DbChanges.PersistShard(shard.Idx);
- }
-
- TTableInfo::TPtr mutableTableInfo = context.SS->Tables.at(path->PathId);
-
- mutableTableInfo->RegisterSplitMegreOp(OperationId, op);
- context.SS->CreateTx(OperationId, TTxState::TxSplitTablePartition, path->PathId) = op;
- context.OnComplete.ActivateTx(OperationId);
-
- for (const auto& shard : op.Shards) {
- Y_VERIFY(shard.Operation == TTxState::TransferData || shard.Operation == TTxState::CreateParts);
- // Add new (DST) shards to the list of all shards and update LastTxId for the old (SRC) shards
- Y_VERIFY(context.SS->ShardInfos.contains(shard.Idx));
- TShardInfo& shardInfo = context.SS->ShardInfos[shard.Idx];
- shardInfo.CurrentTxId = OperationId.GetTxId();
-
- if (shard.Operation == TTxState::CreateParts) {
- if (storePerShardConfig) {
- mutableTableInfo->PerShardPartitionConfig[shard.Idx].CopyFrom(perShardConfig);
- }
- }
- }
-
- path.DomainInfo()->AddInternalShards(op); //allow over commit for merge
- path->IncShardsInside(dstCount);
-
- State = NextState();
- SetState(SelectStateFunc(State));
- return result;
- }
-
- void AbortPropose(TOperationContext&) override {
- Y_FAIL("no AbortPropose for TSplitMerge");
- }
-
- void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override {
- LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TSplitMerge AbortUnsafe"
- << ", opId: " << OperationId
- << ", forceDropId: " << forceDropTxId
- << ", at schemeshard: " << context.SS->TabletID());
-
- TTxState* txState = context.SS->FindTx(OperationId);
- Y_VERIFY(txState);
-
- TPathId pathId = txState->TargetPathId;
- Y_VERIFY(context.SS->PathsById.contains(pathId));
- TPathElement::TPtr path = context.SS->PathsById.at(pathId);
- Y_VERIFY(path);
-
- Y_VERIFY(context.SS->Tables.contains(pathId));
- TTableInfo::TPtr tableInfo = context.SS->Tables.at(pathId);
- Y_VERIFY(tableInfo);
- tableInfo->AbortSplitMergeOp(OperationId);
-
- context.OnComplete.DoneOperation(OperationId);
- }
-};
-
-}
-
-namespace NKikimr {
-namespace NSchemeShard {
-
-ISubOperationBase::TPtr CreateSplitMerge(TOperationId id, const TTxTransaction& tx) {
- return new TSplitMerge(id, tx);
-}
-
-ISubOperationBase::TPtr CreateSplitMerge(TOperationId id, TTxState::ETxState state) {
- Y_VERIFY(state != TTxState::Invalid);
- return new TSplitMerge(id, state);
-}
-
-}
-}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 8059d196d7f..ccaf7805d0f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -5650,7 +5650,7 @@ void TSchemeShard::FillTableDescriptionForShardIdx(
const TTableInfo::TPtr tinfo = Tables.at(tableId);
TPathElement::TPtr pinfo = *PathsById.FindPtr(tableId);
- TVector<ui32> keyColumnIds = tinfo->FillDescription(pinfo);
+ TVector<ui32> keyColumnIds = tinfo->FillDescriptionCache(pinfo);
if (!tinfo->TableDescription.HasPath()) {
tinfo->TableDescription.SetPath(PathToString(pinfo));
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 67c209da1e1..b5a9000841d 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -276,7 +276,15 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData(
return alterData;
}
-TVector<ui32> TTableInfo::FillDescription(TPathElement::TPtr pathInfo) {
+void TTableInfo::ResetDescriptionCache() {
+ TableDescription.ClearId_Deprecated();
+ TableDescription.ClearPathId();
+ TableDescription.ClearName();
+ TableDescription.ClearColumns();
+ TableDescription.ClearKeyColumnIds();
+}
+
+TVector<ui32> TTableInfo::FillDescriptionCache(TPathElement::TPtr pathInfo) {
Y_VERIFY(pathInfo && pathInfo->IsTable());
TVector<ui32> keyColumnIds;
@@ -1197,11 +1205,7 @@ void TTableInfo::FinishAlter() {
}
// Force FillDescription to regenerate TableDescription
- TableDescription.ClearId_Deprecated();
- TableDescription.ClearPathId();
- TableDescription.ClearName();
- TableDescription.ClearColumns();
- TableDescription.ClearKeyColumnIds();
+ ResetDescriptionCache();
AlterData.Reset();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 9175c1b86e3..cae07292f81 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -479,7 +479,8 @@ public:
}
}
- TVector<ui32> FillDescription(TPathElement::TPtr pathInfo);
+ void ResetDescriptionCache();
+ TVector<ui32> FillDescriptionCache(TPathElement::TPtr pathInfo);
void SetRoom(const TStorageRoom& room) {
// WARNING: this is legacy support code
diff --git a/ydb/services/ydb/ydb_table_split_ut.cpp b/ydb/services/ydb/ydb_table_split_ut.cpp
index f492820f7a1..401cc36dd2d 100644
--- a/ydb/services/ydb/ydb_table_split_ut.cpp
+++ b/ydb/services/ydb/ydb_table_split_ut.cpp
@@ -1,6 +1,7 @@
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/client/flat_ut_client.h>
@@ -419,4 +420,176 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) {
}
UNIT_ASSERT_C(shardsAfter < shardsBefore, "Merge didn't happen!!11 O_O");
}
+
+ Y_UNIT_TEST(RenameTablesAndSplit) {
+ // KIKIMR-14636
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(2);
+ NDataShard::gDbStatsDataSizeResolution = 10;
+ NDataShard::gDbStatsRowCountResolution = 10;
+
+ TIntrusivePtr<ITimeProvider> originalTimeProvider = NKikimr::TAppData::TimeProvider;
+ TIntrusivePtr<TTestTimeProvider> testTimeProvider = new TTestTimeProvider(originalTimeProvider);
+ NKikimr::TAppData::TimeProvider = testTimeProvider;
+
+ TKikimrWithGrpcAndRootSchemaNoSystemViews server;
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_NOTICE);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_NOTICE);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_NOTICE);
+
+ auto connection = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+
+ NYdb::NTable::TTableClient client(connection);
+
+ auto sessionResult = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session = sessionResult.GetSession();
+
+ {
+ auto query = TStringBuilder() << R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/Foo` (
+ NameHash Uint32,
+ Name Utf8,
+ Version Uint32,
+ `Timestamp` Int64,
+ Data String,
+ PRIMARY KEY (NameHash, Name)
+ ) WITH ( UNIFORM_PARTITIONS = 2 );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+
+ Cerr << result.GetIssues().ToString();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ { // prepare for split
+ auto query = TStringBuilder() << R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/Foo`
+ SET (
+ AUTO_PARTITIONING_BY_SIZE = ENABLED,
+ AUTO_PARTITIONING_PARTITION_SIZE_MB = 1,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 10
+ );)";
+
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+
+ Cerr << result.GetIssues().ToString();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ ui64 partitions = 2;
+ do { // wait until merge
+ Cerr << "Fast forward 1m" << Endl;
+ testTimeProvider->AddShift(TDuration::Minutes(2));
+ Sleep(TDuration::Seconds(3));
+
+ auto result = session.DescribeTable("/Root/Foo", NYdb::NTable::TDescribeTableSettings().WithTableStatistics(true)).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ partitions = result.GetTableDescription().GetPartitionsCount();
+ Cerr << "partitions " << partitions << Endl;
+
+ } while (partitions == 2);
+
+ { //rename
+ auto result = session.RenameTables({{"/Root/Foo", "/Root/Bar"}}).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ { // add data for triger split
+ int key = 0;
+ for (int i = 0 ; i < 100; ++i) {
+ TValueBuilder rows;
+ rows.BeginList();
+ for (int j = 0; j < 500; ++j) {
+ key += 1;
+ TString name = "key " + ToString(key);
+
+ rows.AddListItem()
+ .BeginStruct()
+ .AddMember("NameHash").Uint32(MurmurHash<ui32>(name.data(), name.size()))
+ .AddMember("Name").Utf8(name)
+ .AddMember("Version").Uint32(key%5)
+ .AddMember("Timestamp").Int64(key%10)
+ .EndStruct();
+ }
+ rows.EndList();
+
+ auto result = client.BulkUpsert("/Root/Bar", rows.Build()).ExtractValueSync();
+
+ if (!result.IsSuccess() && result.GetStatus() != NYdb::EStatus::OVERLOADED) {
+ TString err = result.GetIssues().ToString();
+ Cerr << result.GetStatus() << ": " << err << Endl;
+ }
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ }
+ }
+
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
+
+ partitions = 1;
+ do { // wait until split
+ Cerr << "Fast forward 1m" << Endl;
+ testTimeProvider->AddShift(TDuration::Minutes(1));
+ Sleep(TDuration::Seconds(3));
+
+ auto result = session.DescribeTable("/Root/Bar", NYdb::NTable::TDescribeTableSettings().WithTableStatistics(true)).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ partitions = result.GetTableDescription().GetPartitionsCount();
+ Cerr << "partitions " << partitions << Endl;
+
+ } while (partitions == 1);
+
+
+ { // fail if shema has been broken
+ TString readQuery =
+ "SELECT * FROM `/Root/Bar`;";
+
+ TExecDataQuerySettings querySettings;
+ querySettings.KeepInQueryCache(true);
+
+ auto result = session.ExecuteDataQuery(
+ readQuery,
+ TTxControl::BeginTx().CommitTx(),
+ querySettings)
+ .ExtractValueSync();
+
+ if (!result.IsSuccess() && result.GetStatus() != NYdb::EStatus::OVERLOADED) {
+ TString err = result.GetIssues().ToString();
+ Cerr << result.GetStatus() << ": " << err << Endl;
+ }
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ {
+ auto asyncDescDir = NYdb::NScheme::TSchemeClient(connection).ListDirectory("/Root");
+ asyncDescDir.Wait();
+ const auto& val = asyncDescDir.GetValue();
+ auto entry = val.GetEntry();
+ UNIT_ASSERT_EQUAL(entry.Name, "Root");
+ UNIT_ASSERT_EQUAL(entry.Type, NYdb::NScheme::ESchemeEntryType::Directory);
+
+ auto children = val.GetChildren();
+ UNIT_ASSERT_EQUAL_C(children.size(), 1, children.size());
+ for (const auto& child: children) {
+ UNIT_ASSERT_EQUAL(child.Type, NYdb::NScheme::ESchemeEntryType::Table);
+
+ auto result = session.DropTable(TStringBuilder() << "Root" << "/" << child.Name).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetStatus());
+ }
+ }
+ }
}