diff options
author | svc <svc@yandex-team.ru> | 2022-04-04 11:49:07 +0300 |
---|---|---|
committer | svc <svc@yandex-team.ru> | 2022-04-04 11:49:07 +0300 |
commit | 5168b8eda9113d0673ebddd789f7315f12deab64 (patch) | |
tree | 59b171fcb6a6d26685a608ed5e059be0d08047c4 | |
parent | 3de3baa5532f2d6a78b7ded20a65895cb6dff8a4 (diff) | |
download | ydb-5168b8eda9113d0673ebddd789f7315f12deab64.tar.gz |
KIKIMR-14636 slit after move
ref:339a725351ef2ba28e38916a12ff0055f9df2fe5
6 files changed, 187 insertions, 1032 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp index b657f792c91..1c68baad915 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp @@ -228,6 +228,7 @@ public: Y_VERIFY(context.SS->Tables.contains(srcPath.Base()->PathId)); TTableInfo::TPtr tableInfo = new TTableInfo(*context.SS->Tables.at(srcPath.Base()->PathId)); + tableInfo->ResetDescriptionCache(); tableInfo->AlterVersion += 1; // copy table info diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp.autosave b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp.autosave deleted file mode 100644 index 07d9913782b..00000000000 --- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp.autosave +++ /dev/null @@ -1,1024 +0,0 @@ -#include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" -#include "schemeshard_impl.h" - -#include <ydb/core/base/subdomain.h> - -namespace { - -using namespace NKikimr; -using namespace NSchemeShard; - -class TConfigureDestination: public TSubOperationState { -private: - TOperationId OperationId; - - TString DebugHint() const override { - return TStringBuilder() - << "TSplitMerge TConfigureDestination" - << " operationId#" << OperationId; - } - -public: - TConfigureDestination(TOperationId id) - : OperationId(id) - { - IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType}); - } - - bool HandleReply(TEvDataShard::TEvInitSplitMergeDestinationAck::TPtr& ev, TOperationContext& context) override { - TTabletId ssId = context.SS->SelfTabletId(); - - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " HandleReply TEvInitSplitMergeDestinationAck" - << ", operationId: " << OperationId - << ", at schemeshard: " << ssId - << " message# " << ev->Get()->Record.ShortDebugString()); - - NIceDb::TNiceDb db(context.Txc.DB); - - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition); - Y_VERIFY(txState->State == TTxState::ConfigureParts); - - TTabletId tabletId = TTabletId(ev->Get()->Record.GetTabletId()); - TShardIdx idx = context.SS->MustGetShardIdx(tabletId); - if (!idx) { - LOG_ERROR(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Tablet %" PRIu64 " is not known in TxId %" PRIu64, - tabletId, OperationId.GetTxId()); - return false; - } - - if (!context.SS->ShardInfos.contains(idx)) { - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " Got InitSplitMergeDestinationAck" - << " for unknown shard idx " << idx - << " tabletId " << tabletId); - return false; - } -https://st.yandex-team.ru/KIKIMR-13104 - txState->ShardsInProgress.erase(idx); - - context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, idx); - - // If all dst datashards have been initialized - if (txState->ShardsInProgress.empty()) { - context.SS->ChangeTxState(db, OperationId, TTxState::TransferData); - context.OnComplete.ActivateTx(OperationId); - return true; - } - - return false; - } - - bool ProgressState(TOperationContext& context) override { - TTabletId ssId = context.SS->SelfTabletId(); - - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TSplitMerge TConfigureDestination ProgressState" - << ", operationId: " << OperationId - << ", at schemeshard: " << ssId); - - TTxState* txState = context.SS->TxInFlight.FindPtr(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition); - Y_VERIFY(txState->State == TTxState::ConfigureParts); - - txState->ClearShardsInProgress(); - - // A helper hash map translating tablet id to destination range index - THashMap<TTabletId, ui32> dstTabletToRangeIdx; - - auto getDstRangeIdx = [&dstTabletToRangeIdx] (TTabletId tabletId) -> ui32 { - auto it = dstTabletToRangeIdx.find(tabletId); - Y_VERIFY_S(it != dstTabletToRangeIdx.end(), - "Cannot find range info for destination tablet " << tabletId); - return it->second; - }; - - // Fill tablet ids in split description - NKikimrTxDataShard::TSplitMergeDescription& splitDescr = *txState->SplitDescription; - for (ui32 i = 0; i < splitDescr.DestinationRangesSize(); ++i) { - auto* rangeDescr = splitDescr.MutableDestinationRanges()->Mutable(i); - auto shardIdx = context.SS->MakeLocalId(TLocalShardIdx(rangeDescr->GetShardIdx())); - auto datashardId = context.SS->ShardInfos[shardIdx].TabletID; - rangeDescr->SetTabletID(ui64(datashardId)); - dstTabletToRangeIdx[datashardId] = i; - } - - // Save updated split description - TString extraData; - bool serializeRes = txState->SplitDescription->SerializeToString(&extraData); - Y_VERIFY(serializeRes); - NIceDb::TNiceDb db(context.Txc.DB); - db.Table<Schema::TxInFlightV2>().Key(OperationId.GetTxId(), OperationId.GetSubTxId()).Update( - NIceDb::TUpdate<Schema::TxInFlightV2::ExtraBytes>(extraData)); - - const auto tableInfo = context.SS->Tables.FindPtr(txState->TargetPathId); - Y_VERIFY(tableInfo); - - const ui64 alterVersion = (*tableInfo)->AlterVersion; - - const ui64 subDomainPathId = context.SS->ResolveDomainId(txState->TargetPathId).LocalPathId; - - for (const auto& shard: txState->Shards) { - // Skip src shard - if (shard.Operation != TTxState::CreateParts) { - continue; - } - - TTabletId datashardId = context.SS->ShardInfos[shard.Idx].TabletID; - - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Initializing scheme " - << "on dst datashard: " << datashardId - << " splitOp: " << OperationId - << " alterVersion: " << alterVersion - << " at tablet: " << context.SS->TabletID()); - - const ui32 rangeIdx = getDstRangeIdx(datashardId); - const auto& rangeDescr = splitDescr.GetDestinationRanges(rangeIdx); - - // For each destination shard we construct an individual description - // that contains all src shards and only this one dst shard with its range - NKikimrTxDataShard::TSplitMergeDescription splitDescForShard; - splitDescForShard.MutableSourceRanges()->CopyFrom(txState->SplitDescription->GetSourceRanges()); - splitDescForShard.AddDestinationRanges()->CopyFrom(rangeDescr); - - Y_VERIFY(txState->SplitDescription); - THolder<TEvDataShard::TEvInitSplitMergeDestination> event = - THolder(new TEvDataShard::TEvInitSplitMergeDestination(ui64(OperationId.GetTxId()), context.SS->TabletID(), - subDomainPathId, - splitDescForShard, - context.SS->SelectProcessingPrarams(txState->TargetPathId))); - - // Add a new-style CreateTable with correct per-shard settings - // WARNING: legacy datashard will ignore this and use the schema - // received from some source datashard instead, so schemas must not - // diverge during a migration period. That's ok though, since - // schemas may only become incompatible after column family storage - // configuration is altered, and it's protected with a feature flag. - auto tableDesc = event->Record.MutableCreateTable(); - context.SS->FillTableDescriptionForShardIdx( - txState->TargetPathId, - shard.Idx, - tableDesc, - rangeDescr.GetKeyRangeBegin(), - rangeDescr.GetKeyRangeEnd(), - true, false); - context.SS->FillTableSchemaVersion(alterVersion, tableDesc); - - context.OnComplete.BindMsgToPipe(OperationId, datashardId, shard.Idx, event.Release()); - } - - txState->UpdateShardsInProgress(TTxState::CreateParts); - return false; - } -}; - - -class TTranserData: public TSubOperationState { -private: - TOperationId OperationId; - - TString DebugHint() const override { - return TStringBuilder() - << "TSplitMerge TTranserData" - << " operationId#" << OperationId; - } - -public: - TTranserData(TOperationId id) - : OperationId(id) - { - IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvInitSplitMergeDestinationAck::EventType}); - } - - bool HandleReply(TEvDataShard::TEvSplitAck::TPtr& ev, TOperationContext& context) override { - TTabletId ssId = context.SS->SelfTabletId(); - - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " HandleReply TEvSplitAck" - << ", at schemeshard: " << ssId - << ", message: " << ev->Get()->Record.ShortDebugString()); - - NIceDb::TNiceDb db(context.Txc.DB); - - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition); - Y_VERIFY(txState->State == TTxState::TransferData); - - auto tabletId = TTabletId(ev->Get()->Record.GetTabletId()); - auto srcShardIdx = context.SS->GetShardIdx(tabletId); - if (!srcShardIdx) { - LOG_ERROR(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Tablet %" PRIu64 " is not known in TxId %" PRIu64, - tabletId, OperationId.GetTxId()); - return false; - } - - if (!context.SS->ShardInfos.contains(srcShardIdx)) { - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " Got SplitAck for unknown shard" - << " idx " << srcShardIdx - << " tabletId " << tabletId); - return false; - } - - txState->ShardsInProgress.erase(srcShardIdx); - context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, srcShardIdx); - - if (!txState->ShardsInProgress.empty()) { - // TODO: verify that we only wait for Src shards - return false; - } - - // Switch table partitioning: exclude src shard and include all dst shards - TPathId tableId = txState->TargetPathId; - TTableInfo::TPtr tableInfo = *context.SS->Tables.FindPtr(tableId); - Y_VERIFY(tableInfo); - - // Replace all Src datashard(s) with Dst datashard(s) - TVector<TTableShardInfo> newPartitioning; - THashSet<TShardIdx> allSrcShardIdxs; - for (const auto& txShard : txState->Shards) { - if (txShard.Operation == TTxState::TransferData) - allSrcShardIdxs.insert(txShard.Idx); - } - bool dstAdded = false; - for (const auto& shard : tableInfo->GetPartitions()) { - if (allSrcShardIdxs.contains(shard.ShardIdx)) { - if (dstAdded) - continue; - for (const auto& txShard : txState->Shards) { - if (txShard.Operation != TTxState::CreateParts) - continue; - - // TODO: make sure dst are sorted by range end - Y_VERIFY(context.SS->ShardInfos.contains(txShard.Idx)); - TTableShardInfo dst(txShard.Idx, txShard.RangeEnd); - newPartitioning.push_back(dst); - } - dstAdded = true; - } else { - newPartitioning.push_back(shard); - } - } - - auto oldAggrStats = tableInfo->GetStats().Aggregated; - - // Delete the whole old partitioning and persist the whole new partitionig as the indexes have changed - context.SS->DeleteTablePartitioning(db, tableId, tableInfo); - context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning)); - context.SS->PersistTablePartitioning(db, tableId, tableInfo); - context.SS->PersistTablePartitionStats(db, tableId, tableInfo); - context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_ACTIVE_COUNT].Sub(allSrcShardIdxs.size()); - context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_INACTIVE_COUNT].Add(allSrcShardIdxs.size()); - - if (!tableInfo->IsBackup && !tableInfo->IsShardsStatsDetached()) { - auto newAggrStats = tableInfo->GetStats().Aggregated; - auto subDomainId = context.SS->ResolveDomainId(tableId); - auto subDomainInfo = context.SS->ResolveDomainInfo(tableId); - subDomainInfo->AggrDiskSpaceUsage(context.SS, newAggrStats, oldAggrStats); - if (subDomainInfo->CheckDiskSpaceQuotas(context.SS)) { - context.SS->PersistSubDomainState(db, subDomainId, *subDomainInfo); - context.OnComplete.PublishToSchemeBoard(OperationId, subDomainId); - } - } - - Y_VERIFY(txState->ShardsInProgress.empty(), "All shards should have already completed their steps"); - - context.SS->ChangeTxState(db, OperationId, TTxState::NotifyPartitioningChanged); - context.OnComplete.ActivateTx(OperationId); - - context.OnComplete.PublishToSchemeBoard(OperationId, tableId); - - return true; - } - - bool ProgressState(TOperationContext& context) override { - TTabletId ssId = context.SS->SelfTabletId(); - - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " ProgressState" - << ", at schemeshard: " << ssId); - - TTxState* txState = context.SS->TxInFlight.FindPtr(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition); - Y_VERIFY(txState->State == TTxState::TransferData); - - txState->ClearShardsInProgress(); - - for (const auto& shard : txState->Shards) { - // Skip Dst shards - if (shard.Operation != TTxState::TransferData) - continue; - - auto datashardId = context.SS->ShardInfos[shard.Idx].TabletID; - - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " Starting split on src datashard " << datashardId - << " splitOpId# " << OperationId - << " at tablet " << context.SS->TabletID()); - - THolder<TEvDataShard::TEvSplit> event = - THolder(new TEvDataShard::TEvSplit(ui64(OperationId.GetTxId()))); - - Y_VERIFY(txState->SplitDescription); - event->Record.MutableSplitDescription()->CopyFrom(*txState->SplitDescription); - - context.OnComplete.BindMsgToPipe(OperationId, datashardId, shard.Idx, event.Release()); - } - - txState->UpdateShardsInProgress(TTxState::TransferData); - return false; - } -}; - -class TNotifySrc: public TSubOperationState { -private: - TOperationId OperationId; - - TString DebugHint() const override { - return TStringBuilder() - << "TSplitMerge TNotifySrc" - << ", operationId: " << OperationId; - } -public: - TNotifySrc(TOperationId id) - : OperationId(id) - { - IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType, TEvDataShard::TEvInitSplitMergeDestinationAck::EventType, TEvDataShard::TEvSplitAck::EventType}); - } - - - bool HandleReply(TEvDataShard::TEvSplitPartitioningChangedAck::TPtr& ev, TOperationContext& context) override { - auto ssId = context.SS->SelfTabletId(); - auto tabletId = TTabletId(ev->Get()->Record.GetTabletId()); - - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " HandleReply TEvSplitPartitioningChangedAck" - << ", from datashard: " << tabletId - << ", at schemeshard: " << ssId); - - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition); - Y_VERIFY(txState->State == TTxState::NotifyPartitioningChanged); - - - auto idx = context.SS->GetShardIdx(tabletId); - if (!idx) { - LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Datashard is not listed in tablet to shard map" - << ", datashard: " << tabletId - << ", opId: " << OperationId); - return false; - } - - if (!txState->ShardsInProgress.contains(idx)) { - // TODO: verify that this is a repeated event from known Src shard, not a random one - LOG_INFO(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Got SplitPartitioningChangedAck from shard %" PRIu64 " that is not a part ot Tx %" PRIu64, tabletId, OperationId.GetTxId()); - return false; - } - - txState->ShardsInProgress.erase(idx); - context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, idx); - - if (!txState->ShardsInProgress.empty()) { - // TODO: verify that we only wait for Src shards - return false; - } - - context.SS->DeleteSplitOp(OperationId, *txState); - - context.OnComplete.DoneOperation(OperationId); - return true; - } - - bool ProgressState(TOperationContext& context) override { - //By this moment the partitioning scheme of the table has been switched and is visible to the users - //We just want notify Src shard that it should reject all new transactions and return SchemeChanged error - TTabletId ssId = context.SS->SelfTabletId(); - - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " ProgressState" - << ", at schemeshard: " << ssId); - - TTxState* txState = context.SS->TxInFlight.FindPtr(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxSplitTablePartition || txState->TxType == TTxState::TxMergeTablePartition); - Y_VERIFY(txState->State == TTxState::NotifyPartitioningChanged); - -// Y_VERIFY(txState->Notify.Empty(), "All notifications for split op shouldn't have been sent before switching to NotifyPartitioningChanged state"); - - txState->ClearShardsInProgress(); - - bool needToNotifySrc = false; - - for (const auto& shard : txState->Shards) { - // Skip Dst shards - if (shard.Operation != TTxState::TransferData) { - continue; - } - - if (!context.SS->ShardInfos.contains(shard.Idx) || context.SS->ShardDeleter.Has(shard.Idx)) { - LOG_DEBUG(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Src datashard idx %" PRIu64 " for splitOp# %" PRIu64 " is already deleted or is intended to at tablet %" PRIu64, - shard.Idx, OperationId.GetTxId(), context.SS->TabletID()); - continue; - } - - auto datashardId = context.SS->ShardInfos[shard.Idx].TabletID; - - needToNotifySrc = true; - LOG_DEBUG(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Notify src datashard %" PRIu64 " on partitioning changed splitOp# %" PRIu64 " at tablet %" PRIu64, - datashardId, OperationId.GetTxId(), context.SS->TabletID()); - - THolder<TEvDataShard::TEvSplitPartitioningChanged> event = MakeHolder<TEvDataShard::TEvSplitPartitioningChanged>(ui64(OperationId.GetTxId())); - - context.OnComplete.BindMsgToPipe(OperationId, datashardId, shard.Idx, event.Release()); - - txState->ShardsInProgress.insert(shard.Idx); - } - - if (!needToNotifySrc) { - // The Src datashard could have already completed all its work, reported Offline state and got deleted - // In this case the transaction is finished because this was the last step - context.SS->DeleteSplitOp(OperationId, *txState); - - context.OnComplete.DoneOperation(OperationId); - return true; - } - - return false; - } -}; - -class TSplitMerge: public TSubOperation { -private: - const TOperationId OperationId; - const TTxTransaction Transaction; - TTxState::ETxState State = TTxState::Invalid; - - TTxState::ETxState NextState() { - return TTxState::CreateParts; - } - - TTxState::ETxState NextState(TTxState::ETxState state) { - switch(state) { - case TTxState::CreateParts: - return TTxState::ConfigureParts; - case TTxState::ConfigureParts: - return TTxState::TransferData; - case TTxState::TransferData: - return TTxState::NotifyPartitioningChanged; - case TTxState::NotifyPartitioningChanged: - return TTxState::Done; - default: - return TTxState::Invalid; - } - } - - TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) { - switch(state) { - case TTxState::CreateParts: - return THolder(new TCreateParts(OperationId)); - case TTxState::ConfigureParts: - return THolder(new TConfigureDestination(OperationId)); - case TTxState::TransferData: - return THolder(new TTranserData(OperationId)); - case TTxState::NotifyPartitioningChanged: - return THolder(new TNotifySrc(OperationId)); - default: - return nullptr; - } - } - - void StateDone(TOperationContext& context) override { - State = NextState(State); - - if (State != TTxState::Invalid) { - SetState(SelectStateFunc(State)); - context.OnComplete.ActivateTx(OperationId); - } - } - -public: - TSplitMerge(TOperationId id, const TTxTransaction& tx) - : OperationId(id) - , Transaction(tx) - { - } - - TSplitMerge(TOperationId id, TTxState::ETxState state) - : OperationId(id) - , State(state) - { - SetState(SelectStateFunc(state)); - } - - bool AllocateDstForMerge( - const NKikimrSchemeOp::TSplitMergeTablePartitions& info, - TTxId txId, - const TPathId& pathId, - const TVector<ui64>& srcPartitionIdxs, - const TTableInfo::TCPtr tableInfo, - TTxState& op, - const TChannelsBindings& channels, - TString& errStr, - TOperationContext& context) - { - // N source shards are merged into 1 - Y_VERIFY(srcPartitionIdxs.size() > 1); - Y_VERIFY(info.SplitBoundarySize() == 0); - - if (tableInfo->GetExpectedPartitionCount() + 1 - srcPartitionIdxs.size() < tableInfo->GetMinPartitionsCount()) { - errStr = "Reached MinPartitionsCount limit: " + ToString(tableInfo->GetMinPartitionsCount()); - return false; - } - - // Check that partitions for merge are consecutive in tableInfo->Partitions (e.g. don't allow to merge #1 with #3 tree and leave #2) - for (ui32 i = 1; i < srcPartitionIdxs.size(); ++i) { - ui64 pi = srcPartitionIdxs[i]; - ui64 piPrev = srcPartitionIdxs[i-1]; - - if (pi != piPrev + 1) { - auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx; - auto shardIdxPrev = tableInfo->GetPartitions()[piPrev].ShardIdx; - - errStr = TStringBuilder() - << "Partitions are not consecutive at index " << i << " : #" << piPrev << "(" << context.SS->ShardInfos[shardIdxPrev].TabletID << ")" - << " then #" << pi << "(" << context.SS->ShardInfos[shardIdx].TabletID << ")"; - return false; - } - } - - TString firstRangeBegin; - if (srcPartitionIdxs[0] != 0) { - // Take the end of previous shard - firstRangeBegin = tableInfo->GetPartitions()[srcPartitionIdxs[0]-1].EndOfRange; - } else { - TVector<TCell> firstKey; - ui32 keyColCount = 0; - for (const auto& col : tableInfo->Columns) { - if (col.second.IsKey()) { - ++keyColCount; - } - } - // Or start from (NULL, NULL, .., NULL) - firstKey.resize(keyColCount); - firstRangeBegin = TSerializedCellVec::Serialize(firstKey); - } - - op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>(); - // Fill src shards - TString prevRangeEnd = firstRangeBegin; - for (ui64 pi : srcPartitionIdxs) { - auto* srcRange = op.SplitDescription->AddSourceRanges(); - auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx; - srcRange->SetShardIdx(ui64(shardIdx.GetLocalId())); - srcRange->SetTabletID(ui64(context.SS->ShardInfos[shardIdx].TabletID)); - srcRange->SetKeyRangeBegin(prevRangeEnd); - TString rangeEnd = tableInfo->GetPartitions()[pi].EndOfRange; - srcRange->SetKeyRangeEnd(rangeEnd); - prevRangeEnd = rangeEnd; - } - - // Fill dst shard - TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId); - datashardInfo.BindedChannels = channels; - - auto idx = context.SS->RegisterShardInfo(datashardInfo); - - ui64 lastSrcPartition = srcPartitionIdxs.back(); - TString lastRangeEnd = tableInfo->GetPartitions()[lastSrcPartition].EndOfRange; - - TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts); - dstShardOp.RangeEnd = lastRangeEnd; - op.Shards.push_back(dstShardOp); - - auto* dstRange = op.SplitDescription->AddDestinationRanges(); - dstRange->SetShardIdx(ui64(idx.GetLocalId())); - dstRange->SetKeyRangeBegin(firstRangeBegin); - dstRange->SetKeyRangeEnd(lastRangeEnd); - - return true; - } - - bool AllocateDstForSplit( - const NKikimrSchemeOp::TSplitMergeTablePartitions& info, - TTxId txId, - const TPathId& pathId, - ui64 srcPartitionIdx, - const TTableInfo::TCPtr tableInfo, - TTxState& op, - const TChannelsBindings& channels, - TString& errStr, - TOperationContext& context) - { - // n split points produce n+1 parts - ui64 count = info.SplitBoundarySize() + 1; - if (count == 1) { - errStr = "No split boundaries specified"; - return false; - } - - if (tableInfo->GetExpectedPartitionCount() + count - 1 > tableInfo->GetMaxPartitionsCount()) { - errStr = "Reached MaxPartitionsCount limit: " + ToString(tableInfo->GetMaxPartitionsCount()); - return false; - } - - TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId); - datashardInfo.BindedChannels = channels; - - // Build vector of key column types - TVector<NScheme::TTypeId> keyColTypeIds; - for (const auto& col : tableInfo->Columns) { - if (!col.second.IsKey()) - continue; - size_t keyIdx = col.second.KeyOrder; - keyColTypeIds.resize(Max(keyColTypeIds.size(), keyIdx+1)); - keyColTypeIds[keyIdx] = col.second.PType; - } - - TVector<TString> rangeEnds; - if (!TSchemeShard::FillSplitPartitioning(rangeEnds, keyColTypeIds, info.GetSplitBoundary(), errStr)) { - return false; - } - - // Last dst shard ends where src shard used to end - rangeEnds.push_back(tableInfo->GetPartitions()[srcPartitionIdx].EndOfRange); - - op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>(); - auto* srcRange = op.SplitDescription->AddSourceRanges(); - auto srcShardIdx = tableInfo->GetPartitions()[srcPartitionIdx].ShardIdx; - srcRange->SetShardIdx(ui64(srcShardIdx.GetLocalId())); - srcRange->SetTabletID(ui64(context.SS->ShardInfos[srcShardIdx].TabletID)); - srcRange->SetKeyRangeEnd(tableInfo->GetPartitions()[srcPartitionIdx].EndOfRange); - - // Check that ranges are sorted in ascending order - TVector<TCell> prevKey; - if (srcPartitionIdx != 0) { - // Take the end of previous shard - TSerializedCellVec key(tableInfo->GetPartitions()[srcPartitionIdx-1].EndOfRange); - prevKey.assign(key.GetCells().begin(), key.GetCells().end()); - } else { - // Or start from (NULL, NULL, .., NULL) - prevKey.resize(keyColTypeIds.size()); - } - TString firstRangeBegin = TSerializedCellVec::Serialize(prevKey); - srcRange->SetKeyRangeBegin(firstRangeBegin); - - for (ui32 i = 0; i < rangeEnds.size(); ++i) { - TSerializedCellVec key(rangeEnds[i]); - if (CompareBorders<true, true>(prevKey, key.GetCells(), true, true, keyColTypeIds) >= 0) { - errStr = Sprintf("Partition ranges are not sorted at index %u", i); - return false; - } - prevKey.assign(key.GetCells().begin(), key.GetCells().end()); - } - - // Allocate new datashard ids - TString rangeBegin = firstRangeBegin; - for (ui64 i = 0; i < count; ++i) { - const auto idx = context.SS->RegisterShardInfo(datashardInfo); - - TString rangeEnd = rangeEnds[i]; - TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts); - dstShardOp.RangeEnd = rangeEnd; - op.Shards.push_back(dstShardOp); - - auto* dstRange = op.SplitDescription->AddDestinationRanges(); - dstRange->SetShardIdx(ui64(idx.GetLocalId())); - dstRange->SetKeyRangeBegin(rangeBegin); - dstRange->SetKeyRangeEnd(rangeEnd); - - rangeBegin = rangeEnd; - } - - return true; - } - - THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override { - const TTabletId ssId = context.SS->SelfTabletId(); - - const auto& info = Transaction.GetSplitMergeTablePartitions(); - const ui64 dstCount = info.SplitBoundarySize() + 1; - const ui64 srcCount = info.SourceTabletIdSize(); - - TPathId pathId = InvalidPathId; - if (info.HasTableOwnerId()) { - pathId = TPathId(TOwnerId(info.GetTableOwnerId()), - TLocalPathId(info.GetTableLocalId())); - } else if (info.HasTableLocalId()) { - pathId = context.SS->MakeLocalId(TLocalPathId(info.GetTableLocalId())); - } - - LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TSplitMerge Propose" - << ", tableStr: " << info.GetTablePath() - << ", tableId: " << pathId - << ", opId: " << OperationId - << ", at schemeshard: " << ssId); - - auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); - TString errStr; - - if (!info.HasTablePath() && !info.HasTableLocalId()) { - errStr = "Neither table name nor pathId in SplitMergeInfo"; - result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); - return result; - } - - TPath path = pathId - ? TPath::Init(pathId, context.SS) - : TPath::Resolve(info.GetTablePath(), context.SS); - { - TPath::TChecker checks = path.Check(); - checks - .NotEmpty() - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() - .IsResolved() - .NotDeleted() - .IsTable() - .NotUnderOperation(); - - if (checks) { - if (dstCount >= srcCount) { //allow over commit for merge - checks - .ShardsLimit(dstCount) - .PathShardsLimit(dstCount); - } - } - - if (!checks) { - TString explain = TStringBuilder() << "path fail checks" - << ", path: " << path.PathString(); - auto status = checks.GetStatus(&explain); - result->SetError(status, explain); - return result; - } - } - - if (!context.SS->CheckApplyIf(Transaction, errStr)) { - result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); - return result; - } - - if (!context.SS->CheckLocks(path.Base()->PathId, Transaction, errStr)) { - result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); - return result; - } - - Y_VERIFY(context.SS->Tables.contains(path.Base()->PathId)); - TTableInfo::TCPtr tableInfo = context.SS->Tables.at(path.Base()->PathId); - Y_VERIFY(tableInfo); - - if (tableInfo->IsBackup) { - TString errMsg = TStringBuilder() - << "cannot split/merge backup table " << info.GetTablePath(); - result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg); - return result; - } - - const THashMap<TShardIdx, ui64>& shardIdx2partition = tableInfo->GetShard2PartitionIdx(); - - TVector<ui64> srcPartitionIdxs; - i64 totalSrcPartCount = 0; - for (ui32 si = 0; si < info.SourceTabletIdSize(); ++si) { - auto srcTabletId = TTabletId(info.GetSourceTabletId(si)); - auto srcShardIdx = context.SS->GetShardIdx(srcTabletId); - if (!srcShardIdx) { - TString errMsg = TStringBuilder() << "Unknown SourceTabletId: " << srcTabletId; - result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg); - return result; - } - - if (!context.SS->ShardInfos.contains(srcShardIdx)) { - TString errMsg = TStringBuilder() - << "shard doesn't present at schemesahrd at all" - << ", tablet: " << srcTabletId - << ", srcShardIdx: " << srcShardIdx - << ", pathId: " << path.Base()->PathId; - result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg); - return result; - } - - if (!shardIdx2partition.contains(srcShardIdx)) { - TString errMsg = TStringBuilder() - << "shard doesn't present at schemesahrd at table" - << ", tablet: " << srcTabletId - << ", srcShardIdx: " << srcShardIdx - << ", pathId: " << path.Base()->PathId; - result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg); - return result; - } - - if (context.SS->ShardInfos.FindPtr(srcShardIdx)->PathId != path.Base()->PathId || !shardIdx2partition.contains(srcShardIdx)) { - TString errMsg = TStringBuilder() << "TabletId " << srcTabletId << " is not a partition of table " << info.GetTablePath(); - result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg); - return result; - } - - - if (context.SS->ShardIsUnderSplitMergeOp(srcShardIdx)) { - TString errMsg = TStringBuilder() << "TabletId " << srcTabletId << " is already in process of split"; - result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg); - return result; - } - - if (context.SS->SplitSettings.SplitMergePartCountLimit != -1) { - const auto* stats = tableInfo->GetStats().PartitionStats.FindPtr(srcShardIdx); - if (!stats || stats->ShardState != NKikimrTxDataShard::Ready) { - TString errMsg = TStringBuilder() << "Src TabletId " << srcTabletId << " is not in Ready state"; - result->SetError(NKikimrScheme::StatusNotAvailable, errMsg); - return result; - } - - totalSrcPartCount += stats->PartCount; - } - - auto pi = shardIdx2partition.at(srcShardIdx); - Y_VERIFY_S(pi < tableInfo->GetPartitions().size(), "pi: " << pi << " partitions.size: " << tableInfo->GetPartitions().size()); - srcPartitionIdxs.push_back(pi); - } - - if (context.SS->SplitSettings.SplitMergePartCountLimit != -1 && - totalSrcPartCount >= context.SS->SplitSettings.SplitMergePartCountLimit) - { - result->SetError(NKikimrScheme::StatusNotAvailable, - Sprintf("Split/Merge operation involves too many parts: %" PRIu64, totalSrcPartCount)); - - LOG_CRIT_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Cannot start split/merge operation" - << " for table \"" << info.GetTablePath() << "\" (id " << path.Base()->PathId << ")" - << " at tablet " << context.SS->TabletID() - << " because the operation involves too many parts: " << totalSrcPartCount); - return result; - } - - if (srcPartitionIdxs.empty()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() << "No source partitions specified for split/merge TxId " << OperationId.GetTxId()); - return result; - } - - TChannelsBindings channelsBinding; - - bool storePerShardConfig = false; - NKikimrSchemeOp::TPartitionConfig perShardConfig; - - if (context.SS->IsStorageConfigLogic(tableInfo)) { - TVector<TStorageRoom> storageRooms; - THashMap<ui32, ui32> familyRooms; - storageRooms.emplace_back(0); - - if (!context.SS->GetBindingsRooms(path.DomainId(), tableInfo->PartitionConfig(), storageRooms, familyRooms, channelsBinding, errStr)) { - errStr = TString("database doesn't have required storage pools to create tablet with storage config, details: ") + errStr; - result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); - return result; - } - - storePerShardConfig = true; - for (const auto& room : storageRooms) { - perShardConfig.AddStorageRooms()->CopyFrom(room); - } - for (const auto& familyRoom : familyRooms) { - auto* protoFamily = perShardConfig.AddColumnFamilies(); - protoFamily->SetId(familyRoom.first); - protoFamily->SetRoom(familyRoom.second); - } - } else if (context.SS->IsCompatibleChannelProfileLogic(path.DomainId(), tableInfo)) { - if (!context.SS->GetChannelsBindings(path.DomainId(), tableInfo, channelsBinding, errStr)) { - result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); - return result; - } - } - - TTxState op; - - op.TxType = TTxState::TxSplitTablePartition; - op.TargetPathId = path.Base()->PathId; - op.State = TTxState::CreateParts; - - // Fill Src shards for tx - for (ui64 pi : srcPartitionIdxs) { - auto srcShardIdx = tableInfo->GetPartitions()[pi].ShardIdx; - op.Shards.emplace_back(srcShardIdx, ETabletType::DataShard, TTxState::TransferData); - } - - if (srcPartitionIdxs.size() == 1 && dstCount > 1) { - // This is Split operation, allocate new shards for split Dsts - if (!AllocateDstForSplit(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs[0], tableInfo, op, channelsBinding, errStr, context)) { - result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); - return result; - } - } else if (dstCount == 1 && srcPartitionIdxs.size() > 1) { - // This is merge, allocate 1 Dst shard - if (!AllocateDstForMerge(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs, tableInfo, op, channelsBinding, errStr, context)) { - result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); - return result; - } - } else { - result->SetError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported"); - return result; - } - - /////////// - /// Accept operation - /// - - context.MemChanges.GrabNewTxState(context.SS, OperationId); - context.MemChanges.GrabDomain(context.SS, path.DomainId()); - context.MemChanges.GrabPath(context.SS, path->PathId); - context.MemChanges.GrabTable(context.SS, path->PathId); - - context.DbChanges.PersistTxState(OperationId); - for (const auto& shard : op.Shards) { - if (shard.Operation == TTxState::CreateParts) { - context.MemChanges.GrabNewShard(context.SS, shard.Idx); - } else { - context.MemChanges.GrabShard(context.SS, shard.Idx); - } - context.DbChanges.PersistShard(shard.Idx); - } - - TTableInfo::TPtr mutableTableInfo = context.SS->Tables.at(path->PathId); - - mutableTableInfo->RegisterSplitMegreOp(OperationId, op); - context.SS->CreateTx(OperationId, TTxState::TxSplitTablePartition, path->PathId) = op; - context.OnComplete.ActivateTx(OperationId); - - for (const auto& shard : op.Shards) { - Y_VERIFY(shard.Operation == TTxState::TransferData || shard.Operation == TTxState::CreateParts); - // Add new (DST) shards to the list of all shards and update LastTxId for the old (SRC) shards - Y_VERIFY(context.SS->ShardInfos.contains(shard.Idx)); - TShardInfo& shardInfo = context.SS->ShardInfos[shard.Idx]; - shardInfo.CurrentTxId = OperationId.GetTxId(); - - if (shard.Operation == TTxState::CreateParts) { - if (storePerShardConfig) { - mutableTableInfo->PerShardPartitionConfig[shard.Idx].CopyFrom(perShardConfig); - } - } - } - - path.DomainInfo()->AddInternalShards(op); //allow over commit for merge - path->IncShardsInside(dstCount); - - State = NextState(); - SetState(SelectStateFunc(State)); - return result; - } - - void AbortPropose(TOperationContext&) override { - Y_FAIL("no AbortPropose for TSplitMerge"); - } - - void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { - LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TSplitMerge AbortUnsafe" - << ", opId: " << OperationId - << ", forceDropId: " << forceDropTxId - << ", at schemeshard: " << context.SS->TabletID()); - - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - - TPathId pathId = txState->TargetPathId; - Y_VERIFY(context.SS->PathsById.contains(pathId)); - TPathElement::TPtr path = context.SS->PathsById.at(pathId); - Y_VERIFY(path); - - Y_VERIFY(context.SS->Tables.contains(pathId)); - TTableInfo::TPtr tableInfo = context.SS->Tables.at(pathId); - Y_VERIFY(tableInfo); - tableInfo->AbortSplitMergeOp(OperationId); - - context.OnComplete.DoneOperation(OperationId); - } -}; - -} - -namespace NKikimr { -namespace NSchemeShard { - -ISubOperationBase::TPtr CreateSplitMerge(TOperationId id, const TTxTransaction& tx) { - return new TSplitMerge(id, tx); -} - -ISubOperationBase::TPtr CreateSplitMerge(TOperationId id, TTxState::ETxState state) { - Y_VERIFY(state != TTxState::Invalid); - return new TSplitMerge(id, state); -} - -} -} diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 8059d196d7f..ccaf7805d0f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -5650,7 +5650,7 @@ void TSchemeShard::FillTableDescriptionForShardIdx( const TTableInfo::TPtr tinfo = Tables.at(tableId); TPathElement::TPtr pinfo = *PathsById.FindPtr(tableId); - TVector<ui32> keyColumnIds = tinfo->FillDescription(pinfo); + TVector<ui32> keyColumnIds = tinfo->FillDescriptionCache(pinfo); if (!tinfo->TableDescription.HasPath()) { tinfo->TableDescription.SetPath(PathToString(pinfo)); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 67c209da1e1..b5a9000841d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -276,7 +276,15 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( return alterData; } -TVector<ui32> TTableInfo::FillDescription(TPathElement::TPtr pathInfo) { +void TTableInfo::ResetDescriptionCache() { + TableDescription.ClearId_Deprecated(); + TableDescription.ClearPathId(); + TableDescription.ClearName(); + TableDescription.ClearColumns(); + TableDescription.ClearKeyColumnIds(); +} + +TVector<ui32> TTableInfo::FillDescriptionCache(TPathElement::TPtr pathInfo) { Y_VERIFY(pathInfo && pathInfo->IsTable()); TVector<ui32> keyColumnIds; @@ -1197,11 +1205,7 @@ void TTableInfo::FinishAlter() { } // Force FillDescription to regenerate TableDescription - TableDescription.ClearId_Deprecated(); - TableDescription.ClearPathId(); - TableDescription.ClearName(); - TableDescription.ClearColumns(); - TableDescription.ClearKeyColumnIds(); + ResetDescriptionCache(); AlterData.Reset(); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 9175c1b86e3..cae07292f81 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -479,7 +479,8 @@ public: } } - TVector<ui32> FillDescription(TPathElement::TPtr pathInfo); + void ResetDescriptionCache(); + TVector<ui32> FillDescriptionCache(TPathElement::TPtr pathInfo); void SetRoom(const TStorageRoom& room) { // WARNING: this is legacy support code diff --git a/ydb/services/ydb/ydb_table_split_ut.cpp b/ydb/services/ydb/ydb_table_split_ut.cpp index f492820f7a1..401cc36dd2d 100644 --- a/ydb/services/ydb/ydb_table_split_ut.cpp +++ b/ydb/services/ydb/ydb_table_split_ut.cpp @@ -1,6 +1,7 @@ #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/ydb_params/params.h> #include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> +#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/client/flat_ut_client.h> @@ -419,4 +420,176 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) { } UNIT_ASSERT_C(shardsAfter < shardsBefore, "Merge didn't happen!!11 O_O"); } + + Y_UNIT_TEST(RenameTablesAndSplit) { + // KIKIMR-14636 + + NDataShard::gDbStatsReportInterval = TDuration::Seconds(2); + NDataShard::gDbStatsDataSizeResolution = 10; + NDataShard::gDbStatsRowCountResolution = 10; + + TIntrusivePtr<ITimeProvider> originalTimeProvider = NKikimr::TAppData::TimeProvider; + TIntrusivePtr<TTestTimeProvider> testTimeProvider = new TTestTimeProvider(originalTimeProvider); + NKikimr::TAppData::TimeProvider = testTimeProvider; + + TKikimrWithGrpcAndRootSchemaNoSystemViews server; + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_NOTICE); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_NOTICE); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_NOTICE); + + auto connection = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + + NYdb::NTable::TTableClient client(connection); + + auto sessionResult = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session = sessionResult.GetSession(); + + { + auto query = TStringBuilder() << R"( + --!syntax_v1 + CREATE TABLE `/Root/Foo` ( + NameHash Uint32, + Name Utf8, + Version Uint32, + `Timestamp` Int64, + Data String, + PRIMARY KEY (NameHash, Name) + ) WITH ( UNIFORM_PARTITIONS = 2 );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + + Cerr << result.GetIssues().ToString(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { // prepare for split + auto query = TStringBuilder() << R"( + --!syntax_v1 + ALTER TABLE `/Root/Foo` + SET ( + AUTO_PARTITIONING_BY_SIZE = ENABLED, + AUTO_PARTITIONING_PARTITION_SIZE_MB = 1, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 10 + );)"; + + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + + Cerr << result.GetIssues().ToString(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + ui64 partitions = 2; + do { // wait until merge + Cerr << "Fast forward 1m" << Endl; + testTimeProvider->AddShift(TDuration::Minutes(2)); + Sleep(TDuration::Seconds(3)); + + auto result = session.DescribeTable("/Root/Foo", NYdb::NTable::TDescribeTableSettings().WithTableStatistics(true)).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + partitions = result.GetTableDescription().GetPartitionsCount(); + Cerr << "partitions " << partitions << Endl; + + } while (partitions == 2); + + { //rename + auto result = session.RenameTables({{"/Root/Foo", "/Root/Bar"}}).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { // add data for triger split + int key = 0; + for (int i = 0 ; i < 100; ++i) { + TValueBuilder rows; + rows.BeginList(); + for (int j = 0; j < 500; ++j) { + key += 1; + TString name = "key " + ToString(key); + + rows.AddListItem() + .BeginStruct() + .AddMember("NameHash").Uint32(MurmurHash<ui32>(name.data(), name.size())) + .AddMember("Name").Utf8(name) + .AddMember("Version").Uint32(key%5) + .AddMember("Timestamp").Int64(key%10) + .EndStruct(); + } + rows.EndList(); + + auto result = client.BulkUpsert("/Root/Bar", rows.Build()).ExtractValueSync(); + + if (!result.IsSuccess() && result.GetStatus() != NYdb::EStatus::OVERLOADED) { + TString err = result.GetIssues().ToString(); + Cerr << result.GetStatus() << ": " << err << Endl; + } + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + } + } + + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); + + partitions = 1; + do { // wait until split + Cerr << "Fast forward 1m" << Endl; + testTimeProvider->AddShift(TDuration::Minutes(1)); + Sleep(TDuration::Seconds(3)); + + auto result = session.DescribeTable("/Root/Bar", NYdb::NTable::TDescribeTableSettings().WithTableStatistics(true)).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + partitions = result.GetTableDescription().GetPartitionsCount(); + Cerr << "partitions " << partitions << Endl; + + } while (partitions == 1); + + + { // fail if shema has been broken + TString readQuery = + "SELECT * FROM `/Root/Bar`;"; + + TExecDataQuerySettings querySettings; + querySettings.KeepInQueryCache(true); + + auto result = session.ExecuteDataQuery( + readQuery, + TTxControl::BeginTx().CommitTx(), + querySettings) + .ExtractValueSync(); + + if (!result.IsSuccess() && result.GetStatus() != NYdb::EStatus::OVERLOADED) { + TString err = result.GetIssues().ToString(); + Cerr << result.GetStatus() << ": " << err << Endl; + } + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { + auto asyncDescDir = NYdb::NScheme::TSchemeClient(connection).ListDirectory("/Root"); + asyncDescDir.Wait(); + const auto& val = asyncDescDir.GetValue(); + auto entry = val.GetEntry(); + UNIT_ASSERT_EQUAL(entry.Name, "Root"); + UNIT_ASSERT_EQUAL(entry.Type, NYdb::NScheme::ESchemeEntryType::Directory); + + auto children = val.GetChildren(); + UNIT_ASSERT_EQUAL_C(children.size(), 1, children.size()); + for (const auto& child: children) { + UNIT_ASSERT_EQUAL(child.Type, NYdb::NScheme::ESchemeEntryType::Table); + + auto result = session.DropTable(TStringBuilder() << "Root" << "/" << child.Name).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetStatus()); + } + } + } } |