diff options
author | Ivan Nikolaev <ivannik@ydb.tech> | 2025-07-21 22:05:42 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-07-21 22:05:42 +0300 |
commit | b7ae5fa8996f021abbd45793ad8d367e1af8d1cd (patch) | |
tree | 922ab44e23bcc08fc9f8564cd63eebbd158a823b | |
parent | 5a230bc0247cccf9f1fb56bdb7d6168eacbdd8d0 (diff) | |
download | ydb-b7ae5fa8996f021abbd45793ad8d367e1af8d1cd.tar.gz |
SchemeShard: rename DataErasure to Shred (#20865)
31 files changed, 1224 insertions, 1221 deletions
diff --git a/ydb/core/base/appdata.cpp b/ydb/core/base/appdata.cpp index ec383ce0d27..d5a9a0686e8 100644 --- a/ydb/core/base/appdata.cpp +++ b/ydb/core/base/appdata.cpp @@ -71,7 +71,7 @@ struct TAppData::TImpl { NKikimrConfig::TMemoryControllerConfig MemoryControllerConfig; NKikimrReplication::TReplicationDefaults ReplicationConfig; NKikimrProto::TDataIntegrityTrailsConfig DataIntegrityTrailsConfig; - NKikimrConfig::TDataErasureConfig DataErasureConfig; + NKikimrConfig::TDataErasureConfig ShredConfig; NKikimrConfig::THealthCheckConfig HealthCheckConfig; NKikimrConfig::TWorkloadManagerConfig WorkloadManagerConfig; NKikimrConfig::TQueryServiceConfig QueryServiceConfig; @@ -133,7 +133,7 @@ TAppData::TAppData( , MemoryControllerConfig(Impl->MemoryControllerConfig) , ReplicationConfig(Impl->ReplicationConfig) , DataIntegrityTrailsConfig(Impl->DataIntegrityTrailsConfig) - , DataErasureConfig(Impl->DataErasureConfig) + , ShredConfig(Impl->ShredConfig) , HealthCheckConfig(Impl->HealthCheckConfig) , WorkloadManagerConfig(Impl->WorkloadManagerConfig) , QueryServiceConfig(Impl->QueryServiceConfig) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index 0438e52b3ee..59293f9a848 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -256,7 +256,7 @@ struct TAppData { NKikimrConfig::TMemoryControllerConfig& MemoryControllerConfig; NKikimrReplication::TReplicationDefaults& ReplicationConfig; NKikimrProto::TDataIntegrityTrailsConfig& DataIntegrityTrailsConfig; - NKikimrConfig::TDataErasureConfig& DataErasureConfig; + NKikimrConfig::TDataErasureConfig& ShredConfig; NKikimrConfig::THealthCheckConfig& HealthCheckConfig; NKikimrConfig::TWorkloadManagerConfig& WorkloadManagerConfig; NKikimrConfig::TQueryServiceConfig& QueryServiceConfig; diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 458bf7fffc2..5e316519cf4 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -260,7 +260,7 @@ public: appData->CompactionConfig = Config.GetCompactionConfig(); appData->BackgroundCleaningConfig = Config.GetBackgroundCleaningConfig(); - appData->DataErasureConfig = Config.GetDataErasureConfig(); + appData->ShredConfig = Config.GetDataErasureConfig(); } }; diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 05bdbb453e8..e6e5b5209b3 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -242,11 +242,11 @@ enum ESimpleCounters { COUNTER_SHARDS_QUOTA = 190 [(CounterOpts) = {Name: "ShardsQuota"}]; COUNTER_SHARDS = 191 [(CounterOpts) = {Name: "Shards"}]; - COUNTER_DATA_ERASURE_QUEUE_SIZE = 192 [(CounterOpts) = {Name: "DataErasureQueueSize"}]; - COUNTER_DATA_ERASURE_QUEUE_RUNNING = 193 [(CounterOpts) = {Name: "DataErasureQueueRunning"}]; + COUNTER_SHRED_QUEUE_SIZE = 192 [(CounterOpts) = {Name: "ShredQueueSize"}]; + COUNTER_SHRED_QUEUE_RUNNING = 193 [(CounterOpts) = {Name: "ShredQueueRunning"}]; - COUNTER_TENANT_DATA_ERASURE_QUEUE_SIZE = 194 [(CounterOpts) = {Name: "TenantDataErasureQueueSize"}]; - COUNTER_TENANT_DATA_ERASURE_QUEUE_RUNNING = 195 [(CounterOpts) = {Name: "TenantDataErasureQueueRunning"}]; + COUNTER_TENANT_SHRED_QUEUE_SIZE = 194 [(CounterOpts) = {Name: "TenantShredQueueSize"}]; + COUNTER_TENANT_SHRED_QUEUE_RUNNING = 195 [(CounterOpts) = {Name: "TenantShredQueueRunning"}]; COUNTER_SYS_VIEW_COUNT = 196 [(CounterOpts) = {Name: "SysViewCount"}]; COUNTER_IN_FLIGHT_OPS_TxCreateSysView = 197 [(CounterOpts) = {Name: "InFlightOps/CreateSysView"}]; @@ -402,11 +402,11 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxDropTransfer = 113 [(CounterOpts) = {Name: "FinishedOps/DropTransfer"}]; COUNTER_FINISHED_OPS_TxDropTransferCascade = 114 [(CounterOpts) = {Name: "FinishedOps/DropTransferCascade"}]; - COUNTER_DATA_ERASURE_OK = 115 [(CounterOpts) = {Name: "DataErasureOK"}]; - COUNTER_DATA_ERASURE_TIMEOUT = 116 [(CounterOpts) = {Name: "DataErasureTimeout"}]; + COUNTER_SHRED_OK = 115 [(CounterOpts) = {Name: "ShredOK"}]; + COUNTER_SHRED_TIMEOUT = 116 [(CounterOpts) = {Name: "ShredTimeout"}]; - COUNTER_TENANT_DATA_ERASURE_OK = 117 [(CounterOpts) = {Name: "TenantDataErasureOK"}]; - COUNTER_TENANT_DATA_ERASURE_TIMEOUT = 118 [(CounterOpts) = {Name: "TenantDataErasureTimeout"}]; + COUNTER_TENANT_SHRED_OK = 117 [(CounterOpts) = {Name: "TenantShredOK"}]; + COUNTER_TENANT_SHRED_TIMEOUT = 118 [(CounterOpts) = {Name: "TenantShredTimeout"}]; COUNTER_FINISHED_OPS_TxCreateSysView = 119 [(CounterOpts) = {Name: "FinishedOps/CreateSysView"}]; COUNTER_FINISHED_OPS_TxDropSysView = 120 [(CounterOpts) = {Name: "FinishedOps/DropSysView"}]; @@ -647,16 +647,16 @@ enum ETxTypes { TXTYPE_LIST_USERS = 90 [(TxTypeOpts) = {Name: "TxListUsers"}]; TXTYPE_UNMARK_RESTORE_TABLES = 91 [(TxTypeOpts) = {Name: "TxUnmarkRestoreTables"}]; - TXTYPE_DATA_ERASURE_INIT = 92 [(TxTypeOpts) = {Name: "TxDataErasureInit"}]; - TXTYPE_RUN_DATA_ERASURE = 93 [(TxTypeOpts) = {Name: "TxRunDataErasure"}]; - TXTYPE_RUN_DATA_ERASURE_TENANT = 94 [(TxTypeOpts) = {Name: "TxRunTenantDataErasure"}]; + TXTYPE_SHRED_INIT = 92 [(TxTypeOpts) = {Name: "TxShredInit"}]; + TXTYPE_RUN_SHRED = 93 [(TxTypeOpts) = {Name: "TxRunShred"}]; + TXTYPE_RUN_SHRED_TENANT = 94 [(TxTypeOpts) = {Name: "TxRunTenantShred"}]; - TXTYPE_COMPLETE_DATA_ERASURE_SHARD = 95 [(TxTypeOpts) = {Name: "TxCompleteDataErasureShard"}]; - TXTYPE_COMPLETE_DATA_ERASURE_TENANT = 96 [(TxTypeOpts) = {Name: "TxCompleteDataErasureTenant"}]; - TXTYPE_COMPLETE_DATA_ERASURE_BSC = 97 [(TxTypeOpts) = {Name: "TxCompleteDataErasureBSC"}]; + TXTYPE_COMPLETE_SHRED_SHARD = 95 [(TxTypeOpts) = {Name: "TxCompleteShredShard"}]; + TXTYPE_COMPLETE_SHRED_TENANT = 96 [(TxTypeOpts) = {Name: "TxCompleteShredTenant"}]; + TXTYPE_COMPLETE_SHRED_BSC = 97 [(TxTypeOpts) = {Name: "TxCompleteShredBSC"}]; - TXTYPE_ADD_SHARDS_DATA_ERASURE = 98 [(TxTypeOpts) = {Name: "TxAddShardsDataErasure"}]; - TXTYPE_CANCEL_SHARDS_DATA_ERASURE = 99 [(TxTypeOpts) = {Name: "TxCancelShardsDataErasure"}]; + TXTYPE_ADD_SHARDS_SHRED = 98 [(TxTypeOpts) = {Name: "TxAddShardsShred"}]; + TXTYPE_CANCEL_SHARDS_SHRED = 99 [(TxTypeOpts) = {Name: "TxCancelShardsShred"}]; TXTYPE_LOGIN_FINALIZE = 100 [(TxTypeOpts) = {Name: "TxLoginFinalize"}]; diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto index 7bd6574115f..7b1ff94e729 100644 --- a/ydb/core/protos/flat_tx_scheme.proto +++ b/ydb/core/protos/flat_tx_scheme.proto @@ -417,11 +417,11 @@ message TEvFindTabletSubDomainPathIdResult { message TEvOwnerActorAck { } -message TEvTenantDataErasureRequest { +message TEvTenantShredRequest { optional uint64 Generation = 1; } -message TEvTenantDataErasureResponse { +message TEvTenantShredResponse { enum EStatus { UNSPECIFIED = 0; COMPLETED = 1; @@ -433,10 +433,10 @@ message TEvTenantDataErasureResponse { optional EStatus Status = 3; } -message TEvDataErasureInfoRequest { +message TEvShredInfoRequest { } -message TEvDataErasureInfoResponse { +message TEvShredInfoResponse { enum EStatus { UNSPECIFIED = 0; COMPLETED = 1; @@ -448,5 +448,5 @@ message TEvDataErasureInfoResponse { optional uint64 Generation = 2; } -message TEvDataErasureManualStartupRequest { +message TEvShredManualStartupRequest { } diff --git a/ydb/core/tx/schemeshard/schemeshard.h b/ydb/core/tx/schemeshard/schemeshard.h index af4a80f7329..25668b520d2 100644 --- a/ydb/core/tx/schemeshard/schemeshard.h +++ b/ydb/core/tx/schemeshard/schemeshard.h @@ -5,6 +5,7 @@ #include <ydb/core/base/path.h> #include <ydb/core/base/storage_pools.h> #include <ydb/core/base/subdomain.h> +#include <ydb/core/protos/config.pb.h> #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/protos/tx_scheme.pb.h> #include <ydb/core/scheme/scheme_tablecell.h> @@ -98,15 +99,15 @@ namespace TEvSchemeShard { EvListUsers, EvListUsersResult, - EvTenantDataErasureRequest, - EvTenantDataErasureResponse, - EvWakeupToRunDataErasure, - EvMeasureDataErasureBSC, - EvWakeupToRunDataErasureBSC, - EvCompleteDataErasure, - EvDataErasureInfoRequest, - EvDataErasureInfoResponse, - EvDataErasureManualStartupRequest, + EvTenantShredRequest, + EvTenantShredResponse, + EvWakeupToRunShred, + EvMeasureShredBSC, + EvWakeupToRunShredBSC, + EvCompleteShred, + EvShredInfoRequest, + EvShredInfoResponse, + EvShredManualStartupRequest, EvEnd }; @@ -410,10 +411,10 @@ namespace TEvSchemeShard { struct TEvWakeupToMeasureSelfResponseTime : public TEventLocal<TEvWakeupToMeasureSelfResponseTime, EvWakeupToMeasureSelfResponseTime> { }; - struct TEvWakeupToRunDataErasure : public TEventLocal<TEvWakeupToRunDataErasure, EvWakeupToRunDataErasure> { + struct TEvWakeupToRunShred : public TEventLocal<TEvWakeupToRunShred, EvWakeupToRunShred> { }; - struct TEvWakeupToRunDataErasureBSC : public TEventLocal<TEvWakeupToRunDataErasureBSC, EvWakeupToRunDataErasureBSC> { + struct TEvWakeupToRunShredBSC : public TEventLocal<TEvWakeupToRunShredBSC, EvWakeupToRunShredBSC> { }; struct TEvInitTenantSchemeShard: public TEventPB<TEvInitTenantSchemeShard, @@ -689,41 +690,41 @@ namespace TEvSchemeShard { TEvListUsersResult() = default; }; - struct TEvTenantDataErasureRequest : TEventPB<TEvTenantDataErasureRequest, NKikimrScheme::TEvTenantDataErasureRequest, EvTenantDataErasureRequest> { - TEvTenantDataErasureRequest() = default; + struct TEvTenantShredRequest : TEventPB<TEvTenantShredRequest, NKikimrScheme::TEvTenantShredRequest, EvTenantShredRequest> { + TEvTenantShredRequest() = default; - TEvTenantDataErasureRequest(ui64 generation) { + TEvTenantShredRequest(ui64 generation) { Record.SetGeneration(generation); } }; - struct TEvTenantDataErasureResponse : TEventPB<TEvTenantDataErasureResponse, NKikimrScheme::TEvTenantDataErasureResponse, EvTenantDataErasureResponse> { + struct TEvTenantShredResponse : TEventPB<TEvTenantShredResponse, NKikimrScheme::TEvTenantShredResponse, EvTenantShredResponse> { - TEvTenantDataErasureResponse() = default; - TEvTenantDataErasureResponse(const TPathId& pathId, ui64 generation, const NKikimrScheme::TEvTenantDataErasureResponse::EStatus& status) { + TEvTenantShredResponse() = default; + TEvTenantShredResponse(const TPathId& pathId, ui64 generation, const NKikimrScheme::TEvTenantShredResponse::EStatus& status) { Record.MutablePathId()->SetOwnerId(pathId.OwnerId); Record.MutablePathId()->SetLocalId(pathId.LocalPathId); Record.SetGeneration(generation); Record.SetStatus(status); } - TEvTenantDataErasureResponse(ui64 ownerId, ui64 localPathId, ui64 generation, const NKikimrScheme::TEvTenantDataErasureResponse::EStatus& status) - : TEvTenantDataErasureResponse(TPathId(ownerId, localPathId), generation, status) + TEvTenantShredResponse(ui64 ownerId, ui64 localPathId, ui64 generation, const NKikimrScheme::TEvTenantShredResponse::EStatus& status) + : TEvTenantShredResponse(TPathId(ownerId, localPathId), generation, status) {} }; - struct TEvDataErasureInfoRequest : TEventPB<TEvDataErasureInfoRequest, NKikimrScheme::TEvDataErasureInfoRequest, EvDataErasureInfoRequest> {}; + struct TEvShredInfoRequest : TEventPB<TEvShredInfoRequest, NKikimrScheme::TEvShredInfoRequest, EvShredInfoRequest> {}; - struct TEvDataErasureInfoResponse : TEventPB<TEvDataErasureInfoResponse, NKikimrScheme::TEvDataErasureInfoResponse, EvDataErasureInfoResponse> { + struct TEvShredInfoResponse : TEventPB<TEvShredInfoResponse, NKikimrScheme::TEvShredInfoResponse, EvShredInfoResponse> { - TEvDataErasureInfoResponse() = default; - TEvDataErasureInfoResponse(ui64 generation, const NKikimrScheme::TEvDataErasureInfoResponse::EStatus& status) { + TEvShredInfoResponse() = default; + TEvShredInfoResponse(ui64 generation, const NKikimrScheme::TEvShredInfoResponse::EStatus& status) { Record.SetGeneration(generation); Record.SetStatus(status); } }; - struct TEvDataErasureManualStartupRequest : TEventPB<TEvDataErasureManualStartupRequest, NKikimrScheme::TEvDataErasureManualStartupRequest, EvDataErasureManualStartupRequest> {}; + struct TEvShredManualStartupRequest : TEventPB<TEvShredManualStartupRequest, NKikimrScheme::TEvShredManualStartupRequest, EvShredManualStartupRequest> {}; }; } diff --git a/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.cpp deleted file mode 100644 index 0367387f7d4..00000000000 --- a/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#include "schemeshard__data_erasure_manager.h" - -namespace NKikimr::NSchemeShard { - -TDataErasureManager::TDataErasureManager(TSchemeShard* const schemeShard) - : SchemeShard(schemeShard) -{} - -EDataErasureStatus TDataErasureManager::GetStatus() const { - return Status; -} - -void TDataErasureManager::SetStatus(const EDataErasureStatus& status) { - Status = status; -} - -void TDataErasureManager::IncGeneration() { - ++Generation; -} - -void TDataErasureManager::SetGeneration(ui64 generation) { - Generation = generation; -} - -ui64 TDataErasureManager::GetGeneration() const { - return Generation; -} - -void TDataErasureManager::Clear() { - ClearOperationQueue(); - ClearWaitingDataErasureRequests(); -} - -void TDataErasureManager::Start() { - Running = true; -} - -void TDataErasureManager::Stop() { - Running = false; -} - -bool TDataErasureManager::IsRunning() const { - return Running; -} - -} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp index c19f0332fe4..ae0939c37b5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp @@ -2,7 +2,7 @@ #include <ydb/core/tablet/tablet_exception.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> -#include <ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h> +#include <ydb/core/tx/schemeshard/schemeshard__shred_manager.h> namespace NKikimr { namespace NSchemeShard { @@ -180,8 +180,8 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase { "Close pipe to deleted shardIdx " << ShardIdx << " tabletId " << TabletId); Self->PipeClientCache->ForceClose(ctx, ui64(TabletId)); } - if (Self->EnableDataErasure && Self->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) { - Self->Execute(Self->CreateTxCancelDataErasureShards({ShardIdx})); + if (Self->EnableShred && Self->ShredManager->GetStatus() == EShredStatus::IN_PROGRESS) { + Self->Execute(Self->CreateTxCancelShredShards({ShardIdx})); } } } diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index f9dc5eb3ac7..c0956dcf269 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -1,4 +1,4 @@ -#include "schemeshard__data_erasure_manager.h" +#include "schemeshard__shred_manager.h" #include "schemeshard_impl.h" #include "schemeshard_utils.h" // for PQGroupReserve @@ -1884,9 +1884,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } - // Read Running data erasure for tenants + // Read Running shred for tenants { - if (!Self->DataErasureManager->Restore(db)) { + if (!Self->ShredManager->Restore(db)) { return false; } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 899dd3985b1..8c5c3321110 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -1,6 +1,6 @@ #include "schemeshard__operation_common.h" -#include "schemeshard__data_erasure_manager.h" +#include "schemeshard__shred_manager.h" #include <ydb/core/blob_depot/events.h> #include <ydb/core/blockstore/core/blockstore.h> @@ -973,8 +973,8 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState &txState, newShardsIdx.push_back(part.ShardIdx); } context.SS->SetPartitioning(txState.TargetPathId, dstTableInfo, std::move(newPartitioning)); - if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) { - context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx))); + if (context.SS->EnableShred && context.SS->ShredManager->GetStatus() == EShredStatus::IN_PROGRESS) { + context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToShred(std::move(newShardsIdx))); } ui32 newShardCout = dstTableInfo->GetPartitions().size(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 3ce2b634d56..2d5a228d0f1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -1,4 +1,4 @@ -#include "schemeshard__data_erasure_manager.h" +#include "schemeshard__shred_manager.h" #include "schemeshard__operation_common.h" #include "schemeshard__operation_part.h" #include "schemeshard__operation_states.h" @@ -641,8 +641,8 @@ public: newShardsIdx.push_back(part.ShardIdx); } context.SS->SetPartitioning(newTable->PathId, tableInfo, std::move(newPartition)); - if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) { - context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx))); + if (context.SS->EnableShred && context.SS->ShredManager->GetStatus() == EShredStatus::IN_PROGRESS) { + context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToShred(std::move(newShardsIdx))); } for (const auto& shard : tableInfo->GetPartitions()) { Y_ABORT_UNLESS(context.SS->ShardInfos.contains(shard.ShardIdx), "shard info is set before"); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp index 50d392b3093..f471a0a5e58 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp @@ -1,4 +1,4 @@ -#include "schemeshard__data_erasure_manager.h" +#include "schemeshard__shred_manager.h" #include "schemeshard__operation_common.h" #include "schemeshard__operation_part.h" #include "schemeshard_impl.h" @@ -296,8 +296,8 @@ public: // Delete the whole old partitioning and persist the whole new partitioning as the indexes have changed context.SS->PersistTablePartitioningDeletion(db, tableId, tableInfo); context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning)); - if (context.SS->EnableDataErasure && context.SS->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS) { - context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToDataErasure(std::move(newShardsIdx))); + if (context.SS->EnableShred && context.SS->ShredManager->GetStatus() == EShredStatus::IN_PROGRESS) { + context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvAddNewShardToShred(std::move(newShardsIdx))); } context.SS->PersistTablePartitioning(db, tableId, tableInfo); context.SS->PersistTablePartitionStats(db, tableId, tableInfo); diff --git a/ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp deleted file mode 100644 index 258bf928164..00000000000 --- a/ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp +++ /dev/null @@ -1,677 +0,0 @@ -#include "schemeshard__data_erasure_manager.h" - -#include <ydb/core/tx/schemeshard/schemeshard_impl.h> - -namespace NKikimr::NSchemeShard { - -TRootDataErasureManager::TStarter::TStarter(TRootDataErasureManager* const manager) - : Manager(manager) -{} - -NOperationQueue::EStartStatus TRootDataErasureManager::TStarter::StartOperation(const TPathId& pathId) { - return Manager->StartDataErasure(pathId); -} - -void TRootDataErasureManager::TStarter::OnTimeout(const TPathId& pathId) { - Manager->OnTimeout(pathId); -} - -TRootDataErasureManager::TRootDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config) - : TDataErasureManager(schemeShard) - , Starter(this) - , Queue(new TQueue(ConvertConfig(config), Starter)) - , DataErasureInterval(TDuration::Seconds(config.GetDataErasureIntervalSeconds())) - , DataErasureBSCInterval(TDuration::Seconds(config.GetBlobStorageControllerRequestIntervalSeconds())) - , CurrentWakeupInterval(DataErasureInterval) - , BSC(MakeBSControllerID()) - , IsManualStartup((DataErasureInterval.Seconds() == 0 ? true : false)) -{ - const auto ctx = SchemeShard->ActorContext(); - ctx.RegisterWithSameMailbox(Queue); - - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Created: Timeout# " << config.GetTimeoutSeconds() - << ", Rate# " << Queue->GetRate() - << ", InflightLimit# " << config.GetInflightLimit() - << ", DataErasureInterval# " << DataErasureInterval - << ", DataErasureBSCInterval# " << DataErasureBSCInterval - << ", CurrentWakeupInterval# " << CurrentWakeupInterval - << ", IsManualStartup# " << (IsManualStartup ? "true" : "false")); -} - -void TRootDataErasureManager::UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) { - TRootDataErasureManager::TQueue::TConfig queueConfig = ConvertConfig(config); - Queue->UpdateConfig(queueConfig); - DataErasureInterval = TDuration::Seconds(config.GetDataErasureIntervalSeconds()); - DataErasureBSCInterval = TDuration::Seconds(config.GetBlobStorageControllerRequestIntervalSeconds()); - CurrentWakeupInterval = DataErasureInterval; - BSC = TTabletId(MakeBSControllerID()); - IsManualStartup = (DataErasureInterval.Seconds() == 0 ? true : false); - - const auto ctx = SchemeShard->ActorContext(); - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Config updated: Timeout# " << queueConfig.Timeout - << ", Rate# " << Queue->GetRate() - << ", InflightLimit# " << queueConfig.InflightLimit - << ", DataErasureInterval# " << DataErasureInterval - << ", DataErasureBSCInterval# " << DataErasureBSCInterval - << ", CurrentWakeupInterval# " << CurrentWakeupInterval - << ", IsManualStartup# " << (IsManualStartup ? "true" : "false")); -} - -void TRootDataErasureManager::Start() { - TDataErasureManager::Start(); - const auto ctx = SchemeShard->ActorContext(); - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Start: Status# " << static_cast<ui32>(Status)); - - Queue->Start(); - if (Status == EDataErasureStatus::UNSPECIFIED) { - SchemeShard->MarkFirstRunRootDataErasureManager(); - ScheduleDataErasureWakeup(); - } else if (Status == EDataErasureStatus::COMPLETED) { - ScheduleDataErasureWakeup(); - } else { - ClearOperationQueue(); - Continue(); - } -} - -void TRootDataErasureManager::Stop() { - TDataErasureManager::Stop(); - const auto ctx = SchemeShard->ActorContext(); - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Stop"); - - Queue->Stop(); -} - -void TRootDataErasureManager::ClearOperationQueue() { - const auto ctx = SchemeShard->ActorContext(); - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Clear operation queue and active pipes"); - - Queue->Clear(); - ActivePipes.clear(); -} - -void TRootDataErasureManager::ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) { - const auto ctx = SchemeShard->ActorContext(); - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Clear WaitingDataErasureTenants: Size# " << WaitingDataErasureTenants.size()); - - for (const auto& [pathId, status] : WaitingDataErasureTenants) { - db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); - } - ClearWaitingDataErasureRequests(); -} - -void TRootDataErasureManager::ClearWaitingDataErasureRequests() { - WaitingDataErasureTenants.clear(); -} - -void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) { - CounterDataErasureOk = 0; - CounterDataErasureTimeout = 0; - Status = EDataErasureStatus::IN_PROGRESS; - StartTime = AppData(SchemeShard->ActorContext())->TimeProvider->Now(); - for (auto& [pathId, subdomain] : SchemeShard->SubDomains) { - auto path = TPath::Init(pathId, SchemeShard); - if (path->IsRoot()) { - continue; - } - if (subdomain->GetTenantSchemeShardID() == InvalidTabletId) { // no tenant schemeshard - continue; - } - Enqueue(pathId); - WaitingDataErasureTenants[pathId] = EDataErasureStatus::IN_PROGRESS; - db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingDataErasureTenants::Status>(WaitingDataErasureTenants[pathId]); - } - if (WaitingDataErasureTenants.empty()) { - Status = EDataErasureStatus::IN_PROGRESS_BSC; - } - db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status, - Schema::DataErasureGenerations::StartTime>(Status, StartTime.MicroSeconds()); - - const auto ctx = SchemeShard->ActorContext(); - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Run: Queue.Size# " << Queue->Size() - << ", WaitingDataErasureTenants.size# " << WaitingDataErasureTenants.size() - << ", Status# " << static_cast<ui32>(Status)); -} - -void TRootDataErasureManager::Continue() { - if (Status == EDataErasureStatus::IN_PROGRESS) { - for (const auto& [pathId, status] : WaitingDataErasureTenants) { - if (status == EDataErasureStatus::IN_PROGRESS) { - Enqueue(pathId); - } - } - } else if (Status == EDataErasureStatus::IN_PROGRESS_BSC) { - SendRequestToBSC(); - } - - const auto ctx = SchemeShard->ActorContext(); - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Continue: Queue.Size# " << Queue->Size() - << ", Status# " << static_cast<ui32>(Status)); -} - -void TRootDataErasureManager::ScheduleDataErasureWakeup() { - if (IsManualStartup || IsDataErasureWakeupScheduled) { - return; - } - - const auto ctx = SchemeShard->ActorContext(); - ctx.Schedule(CurrentWakeupInterval, new TEvSchemeShard::TEvWakeupToRunDataErasure); - IsDataErasureWakeupScheduled = true; - - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] ScheduleDataErasureWakeup: Interval# " << CurrentWakeupInterval << ", Timestamp# " << AppData(ctx)->TimeProvider->Now()); -} - -void TRootDataErasureManager::WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) { - Y_UNUSED(ev); - IsDataErasureWakeupScheduled = false; - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] WakeupToRunDataErasure: Timestamp# " << AppData(ctx)->TimeProvider->Now()); - SchemeShard->RunDataErasure(true); -} - -NOperationQueue::EStartStatus TRootDataErasureManager::StartDataErasure(const TPathId& pathId) { - UpdateMetrics(); - - auto ctx = SchemeShard->ActorContext(); - auto it = SchemeShard->SubDomains.find(pathId); - if (it == SchemeShard->SubDomains.end()) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Start] Failed to resolve subdomain info " - "for pathId# " << pathId - << " at schemeshard# " << SchemeShard->TabletID()); - - return NOperationQueue::EStartStatus::EOperationRemove; - } - - const auto& tenantSchemeShardId = it->second->GetTenantSchemeShardID(); - - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Start] Data erasure " - "for pathId# " << pathId - << ", tenant schemeshard# " << tenantSchemeShardId - << ", next wakeup# " << Queue->GetWakeupDelta() - << ", rate# " << Queue->GetRate() - << ", in queue# " << Queue->Size() << " tenants" - << ", running# " << Queue->RunningSize() << " tenants" - << " at schemeshard " << SchemeShard->TabletID()); - - std::unique_ptr<TEvSchemeShard::TEvTenantDataErasureRequest> request( - new TEvSchemeShard::TEvTenantDataErasureRequest(Generation)); - - ActivePipes[pathId] = SchemeShard->PipeClientCache->Send( - ctx, - ui64(tenantSchemeShardId), - request.release()); - - return NOperationQueue::EStartStatus::EOperationRunning; -} - -void TRootDataErasureManager::OnTimeout(const TPathId& pathId) { - CounterDataErasureTimeout++; - UpdateMetrics(); - - ActivePipes.erase(pathId); - - auto ctx = SchemeShard->ActorContext(); - if (!SchemeShard->SubDomains.contains(pathId)) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Timeout] Failed to resolve subdomain info " - "for path# " << pathId - << " at schemeshard# " << SchemeShard->TabletID()); - return; - } - - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Timeout] Data erasure timeouted " - "for pathId# " << pathId - << ", next wakeup in# " << Queue->GetWakeupDelta() - << ", rate# " << Queue->GetRate() - << ", in queue# " << Queue->Size() << " tenants" - << ", running# " << Queue->RunningSize() << " tenants" - << " at schemeshard " << SchemeShard->TabletID()); - - // retry - Enqueue(pathId); -} - -void TRootDataErasureManager::Enqueue(const TPathId& pathId) { - auto ctx = SchemeShard->ActorContext(); - - if (Queue->Enqueue(pathId)) { - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] [Enqueue] Enqueued pathId# " << pathId << " at schemeshard " << SchemeShard->TabletID()); - UpdateMetrics(); - } else { - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] [Enqueue] Skipped or already exists pathId# " << pathId << " at schemeshard " << SchemeShard->TabletID()); - } -} - -void TRootDataErasureManager::HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { - if (tabletId == BSC) { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] HandleDisconnect resend request to BSC at schemeshard " << SchemeShard->TabletID()); - - SendRequestToBSC(); - return; - } - - const auto shardIdx = SchemeShard->GetShardIdx(tabletId); - if (!SchemeShard->ShardInfos.contains(shardIdx)) { - return; - } - - const auto& pathId = SchemeShard->ShardInfos.at(shardIdx).PathId; - if (!SchemeShard->TTLEnabledTables.contains(pathId)) { - return; - } - - const auto it = ActivePipes.find(pathId); - if (it == ActivePipes.end()) { - return; - } - - if (it->second != clientId) { - return; - } - - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Disconnect] Data erasure disconnect " - "to tablet: " << tabletId - << ", at schemeshard: " << SchemeShard->TabletID()); - - ActivePipes.erase(pathId); - StartDataErasure(pathId); -} - -void TRootDataErasureManager::OnDone(const TPathId& pathId, NIceDb::TNiceDb& db) { - auto duration = Queue->OnDone(pathId); - - auto ctx = SchemeShard->ActorContext(); - if (!SchemeShard->SubDomains.contains(pathId)) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Finished] Failed to resolve subdomain info " - "for pathId# " << pathId - << " in# " << duration.MilliSeconds() << " ms" - << ", next wakeup in# " << Queue->GetWakeupDelta() - << ", rate# " << Queue->GetRate() - << ", in queue# " << Queue->Size() << " tenants" - << ", running# " << Queue->RunningSize() << " tenants" - << " at schemeshard " << SchemeShard->TabletID()); - } else { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Finished] Data erasure completed " - "for pathId# " << pathId - << " in# " << duration.MilliSeconds() << " ms" - << ", next wakeup# " << Queue->GetWakeupDelta() - << ", rate# " << Queue->GetRate() - << ", in queue# " << Queue->Size() << " tenants" - << ", running# " << Queue->RunningSize() << " tenants" - << " at schemeshard " << SchemeShard->TabletID()); - } - - ActivePipes.erase(pathId); - auto it = WaitingDataErasureTenants.find(pathId); - if (it != WaitingDataErasureTenants.end()) { - db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); - WaitingDataErasureTenants.erase(it); - } - - CounterDataErasureOk++; - UpdateMetrics(); - - if (WaitingDataErasureTenants.empty()) { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Data erasure in tenants is completed. Send request to BS controller"); - Status = EDataErasureStatus::IN_PROGRESS_BSC; - db.Table<Schema::DataErasureGenerations>().Key(Generation).Update<Schema::DataErasureGenerations::Status>(Status); - } -} - -void TRootDataErasureManager::OnDone(const TTabletId&, NIceDb::TNiceDb&) { - auto ctx = SchemeShard->ActorContext(); - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [OnDone] Cannot execute in root schemeshard: " << SchemeShard->TabletID()); -} - -void TRootDataErasureManager::ScheduleRequestToBSC() { - if (IsRequestToBSCScheduled) { - return; - } - - auto ctx = SchemeShard->ActorContext(); - ctx.Schedule(DataErasureBSCInterval, new TEvSchemeShard::TEvWakeupToRunDataErasureBSC); - IsRequestToBSCScheduled = true; - - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] ScheduleRequestToBSC: Interval# " << DataErasureBSCInterval); -} - -void TRootDataErasureManager::SendRequestToBSC() { - auto ctx = SchemeShard->ActorContext(); - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] SendRequestToBSC: Generation# " << Generation); - - IsRequestToBSCScheduled = false; - std::unique_ptr<TEvBlobStorage::TEvControllerShredRequest> request( - new TEvBlobStorage::TEvControllerShredRequest(Generation)); - SchemeShard->PipeClientCache->Send(ctx, MakeBSControllerID(), request.release()); -} - -void TRootDataErasureManager::Complete() { - Status = EDataErasureStatus::COMPLETED; - auto ctx = SchemeShard->ActorContext(); - FinishTime = AppData(ctx)->TimeProvider->Now(); - TDuration dataErasureDuration = FinishTime - StartTime; - if (dataErasureDuration > DataErasureInterval) { - if (!IsManualStartup) { - SchemeShard->RunDataErasure(true); - } - } else { - CurrentWakeupInterval = DataErasureInterval - dataErasureDuration; - ScheduleDataErasureWakeup(); - } - - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Complete: Generation# " << Generation - << ", duration# " << dataErasureDuration.Seconds() << " s"); -} - -bool TRootDataErasureManager::Restore(NIceDb::TNiceDb& db) { - { - auto rowset = db.Table<Schema::DataErasureGenerations>().Range().Select(); - if (!rowset.IsReady()) { - return false; - } - if (rowset.EndOfSet()) { - Status = EDataErasureStatus::UNSPECIFIED; - } else { - Generation = 0; - Status = EDataErasureStatus::UNSPECIFIED; - while (!rowset.EndOfSet()) { - ui64 generation = rowset.GetValue<Schema::DataErasureGenerations::Generation>(); - if (generation >= Generation) { - Generation = generation; - StartTime = TInstant::FromValue(rowset.GetValue<Schema::DataErasureGenerations::StartTime>()); - Status = rowset.GetValue<Schema::DataErasureGenerations::Status>(); - } - - if (!rowset.Next()) { - return false; - } - } - if (Status == EDataErasureStatus::UNSPECIFIED || Status == EDataErasureStatus::COMPLETED) { - auto ctx = SchemeShard->ActorContext(); - TDuration interval = AppData(ctx)->TimeProvider->Now() - StartTime; - if (interval > DataErasureInterval) { - CurrentWakeupInterval = TDuration::Zero(); - } else { - CurrentWakeupInterval = DataErasureInterval - interval; - } - } - } - } - - ui32 numberDataErasureTenantsInRunning = 0; - { - auto rowset = db.Table<Schema::WaitingDataErasureTenants>().Range().Select(); - if (!rowset.IsReady()) { - return false; - } - while (!rowset.EndOfSet()) { - TOwnerId ownerPathId = rowset.GetValue<Schema::WaitingDataErasureTenants::OwnerPathId>(); - TLocalPathId localPathId = rowset.GetValue<Schema::WaitingDataErasureTenants::LocalPathId>(); - TPathId pathId(ownerPathId, localPathId); - Y_VERIFY_S(SchemeShard->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); - TPathElement::TPtr path = SchemeShard->PathsById.at(pathId); - Y_VERIFY_S(path->IsDomainRoot(), "Path is not a subdomain, pathId: " << pathId); - - Y_ABORT_UNLESS(SchemeShard->SubDomains.contains(pathId)); - - EDataErasureStatus status = rowset.GetValue<Schema::WaitingDataErasureTenants::Status>(); - WaitingDataErasureTenants[pathId] = status; - if (status == EDataErasureStatus::IN_PROGRESS) { - numberDataErasureTenantsInRunning++; - } - - if (!rowset.Next()) { - return false; - } - } - if (Status == EDataErasureStatus::IN_PROGRESS && (WaitingDataErasureTenants.empty() || numberDataErasureTenantsInRunning == 0)) { - Status = EDataErasureStatus::IN_PROGRESS_BSC; - } - } - - auto ctx = SchemeShard->ActorContext(); - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[RootDataErasureManager] Restore: Generation# " << Generation - << ", Status# " << static_cast<ui32>(Status) - << ", WakeupInterval# " << CurrentWakeupInterval.Seconds() << " s" - << ", NumberDataErasureTenantsInRunning# " << numberDataErasureTenantsInRunning); - - return true; -} - -bool TRootDataErasureManager::Remove(const TPathId& pathId) { - auto it = WaitingDataErasureTenants.find(pathId); - if (it != WaitingDataErasureTenants.end()) { - Queue->Remove(pathId); - ActivePipes.erase(pathId); - WaitingDataErasureTenants.erase(it); - - if (WaitingDataErasureTenants.empty()) { - Status = EDataErasureStatus::IN_PROGRESS_BSC; - SendRequestToBSC(); - } - return true; - } - return false; -} - -bool TRootDataErasureManager::Remove(const TShardIdx&) { - auto ctx = SchemeShard->ActorContext(); - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Remove] Cannot execute in root schemeshard: " << SchemeShard->TabletID()); - return false; -} - -void TRootDataErasureManager::HandleNewPartitioning(const std::vector<TShardIdx>&, NIceDb::TNiceDb&) { - auto ctx = SchemeShard->ActorContext(); - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [HandleNewPartitioning] Cannot execute in root schemeshard: " << SchemeShard->TabletID()); -} - -void TRootDataErasureManager::SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) { - db.Table<Schema::DataErasureGenerations>().Key(GetGeneration()).Delete(); - SetGeneration(currentBscGeneration + 1); - db.Table<Schema::DataErasureGenerations>().Key(GetGeneration()).Update<Schema::DataErasureGenerations::Status, - Schema::DataErasureGenerations::StartTime>(GetStatus(), StartTime.MicroSeconds()); -} - -void TRootDataErasureManager::UpdateMetrics() { - SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_SIZE].Set(Queue->Size()); - SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_RUNNING].Set(Queue->RunningSize()); - SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_OK].Set(CounterDataErasureOk); - SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_TIMEOUT].Set(CounterDataErasureTimeout); -} - -TRootDataErasureManager::TQueue::TConfig TRootDataErasureManager::ConvertConfig(const NKikimrConfig::TDataErasureConfig& config) { - TQueue::TConfig queueConfig; - queueConfig.IsCircular = false; - queueConfig.MaxRate = config.GetMaxRate(); - queueConfig.InflightLimit = config.GetInflightLimit(); - queueConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); - - return queueConfig; -} - -struct TSchemeShard::TTxDataErasureManagerInit : public TSchemeShard::TRwTxBase { - TTxDataErasureManagerInit(TSelf* self) - : TRwTxBase(self) - {} - - TTxType GetTxType() const override { return TXTYPE_DATA_ERASURE_INIT; } - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxDataErasureManagerInit Execute at schemeshard: " << Self->TabletID()); - NIceDb::TNiceDb db(txc.DB); - Self->DataErasureManager->SetStatus(EDataErasureStatus::COMPLETED); - db.Table<Schema::DataErasureGenerations>().Key(0).Update<Schema::DataErasureGenerations::Status, - Schema::DataErasureGenerations::StartTime>(Self->DataErasureManager->GetStatus(), AppData(ctx)->TimeProvider->Now().MicroSeconds()); - } - - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxDataErasureManagerInit Complete at schemeshard: " << Self->TabletID()); - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxDataErasureManagerInit() { - return new TTxDataErasureManagerInit(this); -} - -struct TSchemeShard::TTxRunDataErasure : public TSchemeShard::TRwTxBase { - bool IsNewDataErasure; - bool NeedSendRequestToBSC = false; - - TTxRunDataErasure(TSelf *self, bool isNewDataErasure) - : TRwTxBase(self) - , IsNewDataErasure(isNewDataErasure) - {} - - TTxType GetTxType() const override { return TXTYPE_RUN_DATA_ERASURE; } - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunDataErasure Execute at schemeshard: " << Self->TabletID()); - - NIceDb::TNiceDb db(txc.DB); - auto& dataErasureManager = Self->DataErasureManager; - if (IsNewDataErasure) { - dataErasureManager->ClearOperationQueue(); - - dataErasureManager->ClearWaitingDataErasureRequests(db); - dataErasureManager->IncGeneration(); - dataErasureManager->Run(db); - } - if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::IN_PROGRESS_BSC) { - NeedSendRequestToBSC = true; - } - } - - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunDataErasure Complete at schemeshard: " << Self->TabletID() - << ", NeedSendRequestToBSC# " << (NeedSendRequestToBSC ? "true" : "false")); - - if (NeedSendRequestToBSC) { - Self->DataErasureManager->SendRequestToBSC(); - } - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunDataErasure(bool isNewDataErasure) { - return new TTxRunDataErasure(this, isNewDataErasure); -} - -struct TSchemeShard::TTxCompleteDataErasureTenant : public TSchemeShard::TRwTxBase { - const TEvSchemeShard::TEvTenantDataErasureResponse::TPtr Ev; - bool NeedSendRequestToBSC = false; - - TTxCompleteDataErasureTenant(TSelf* self, const TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev) - : TRwTxBase(self) - , Ev(std::move(ev)) - {} - - TTxType GetTxType() const override { return TXTYPE_COMPLETE_DATA_ERASURE_TENANT; } - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureTenant Execute at schemeshard: " << Self->TabletID()); - - const auto& record = Ev->Get()->Record; - auto& manager = Self->DataErasureManager; - const ui64 completedGeneration = record.GetGeneration(); - if (completedGeneration != manager->GetGeneration()) { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureTenant Unknown generation#" << completedGeneration << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); - return; - } - - NIceDb::TNiceDb db(txc.DB); - auto pathId = TPathId( - record.GetPathId().GetOwnerId(), - record.GetPathId().GetLocalId()); - manager->OnDone(pathId, db); - if (manager->GetStatus() == EDataErasureStatus::IN_PROGRESS_BSC) { - NeedSendRequestToBSC = true; - } - } - - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureTenant Complete at schemeshard: " << Self->TabletID() - << ", NeedSendRequestToBSC# " << (NeedSendRequestToBSC ? "true" : "false")); - if (NeedSendRequestToBSC) { - Self->DataErasureManager->SendRequestToBSC(); - } - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureTenant(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev) { - return new TTxCompleteDataErasureTenant(this, ev); -} - -struct TSchemeShard::TTxCompleteDataErasureBSC : public TSchemeShard::TRwTxBase { - const TEvBlobStorage::TEvControllerShredResponse::TPtr Ev; - bool NeedScheduleRequestToBSC = false; - - TTxCompleteDataErasureBSC(TSelf* self, const TEvBlobStorage::TEvControllerShredResponse::TPtr& ev) - : TRwTxBase(self) - , Ev(std::move(ev)) - {} - - TTxType GetTxType() const override { return TXTYPE_COMPLETE_DATA_ERASURE_BSC; } - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureBSC Execute at schemeshard: " << Self->TabletID()); - - const auto& record = Ev->Get()->Record; - auto& manager = Self->DataErasureManager; - NIceDb::TNiceDb db(txc.DB); - if (ui64 currentBscGeneration = record.GetCurrentGeneration(); currentBscGeneration > manager->GetGeneration()) { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureBSC Unknown generation#" << currentBscGeneration << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); - manager->SyncBscGeneration(db, currentBscGeneration); - manager->SendRequestToBSC(); - return; - } - - if (record.GetCompleted()) { - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Data shred in BSC is completed"); - manager->Complete(); - db.Table<Schema::DataErasureGenerations>().Key(Self->DataErasureManager->GetGeneration()).Update<Schema::DataErasureGenerations::Status>(Self->DataErasureManager->GetStatus()); - } else { - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Progress data shred in BSC " << static_cast<double>(record.GetProgress10k()) / 100 << "%"); - NeedScheduleRequestToBSC = true; - } - } - - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureBSC Complete at schemeshard: " << Self->TabletID() - << ", NeedScheduleRequestToBSC# " << (NeedScheduleRequestToBSC ? "true" : "false")); - - if (NeedScheduleRequestToBSC) { - Self->DataErasureManager->ScheduleRequestToBSC(); - } - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureBSC(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev) { - return new TTxCompleteDataErasureBSC(this, ev); -} - -} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__root_shred_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__root_shred_manager.cpp new file mode 100644 index 00000000000..f5341292f19 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__root_shred_manager.cpp @@ -0,0 +1,677 @@ +#include "schemeshard__shred_manager.h" + +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> + +namespace NKikimr::NSchemeShard { + +TRootShredManager::TStarter::TStarter(TRootShredManager* const manager) + : Manager(manager) +{} + +NOperationQueue::EStartStatus TRootShredManager::TStarter::StartOperation(const TPathId& pathId) { + return Manager->StartShred(pathId); +} + +void TRootShredManager::TStarter::OnTimeout(const TPathId& pathId) { + Manager->OnTimeout(pathId); +} + +TRootShredManager::TRootShredManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config) + : TShredManager(schemeShard) + , Starter(this) + , Queue(new TQueue(ConvertConfig(config), Starter)) + , ShredInterval(TDuration::Seconds(config.GetDataErasureIntervalSeconds())) + , ShredBSCInterval(TDuration::Seconds(config.GetBlobStorageControllerRequestIntervalSeconds())) + , CurrentWakeupInterval(ShredInterval) + , BSC(MakeBSControllerID()) + , IsManualStartup((ShredInterval.Seconds() == 0 ? true : false)) +{ + const auto ctx = SchemeShard->ActorContext(); + ctx.RegisterWithSameMailbox(Queue); + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Created: Timeout# " << config.GetTimeoutSeconds() + << ", Rate# " << Queue->GetRate() + << ", InflightLimit# " << config.GetInflightLimit() + << ", ShredInterval# " << ShredInterval + << ", ShredBSCInterval# " << ShredBSCInterval + << ", CurrentWakeupInterval# " << CurrentWakeupInterval + << ", IsManualStartup# " << (IsManualStartup ? "true" : "false")); +} + +void TRootShredManager::UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) { + TRootShredManager::TQueue::TConfig queueConfig = ConvertConfig(config); + Queue->UpdateConfig(queueConfig); + ShredInterval = TDuration::Seconds(config.GetDataErasureIntervalSeconds()); + ShredBSCInterval = TDuration::Seconds(config.GetBlobStorageControllerRequestIntervalSeconds()); + CurrentWakeupInterval = ShredInterval; + BSC = TTabletId(MakeBSControllerID()); + IsManualStartup = (ShredInterval.Seconds() == 0 ? true : false); + + const auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Config updated: Timeout# " << queueConfig.Timeout + << ", Rate# " << Queue->GetRate() + << ", InflightLimit# " << queueConfig.InflightLimit + << ", ShredInterval# " << ShredInterval + << ", ShredBSCInterval# " << ShredBSCInterval + << ", CurrentWakeupInterval# " << CurrentWakeupInterval + << ", IsManualStartup# " << (IsManualStartup ? "true" : "false")); +} + +void TRootShredManager::Start() { + TShredManager::Start(); + const auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Start: Status# " << static_cast<ui32>(Status)); + + Queue->Start(); + if (Status == EShredStatus::UNSPECIFIED) { + SchemeShard->MarkFirstRunRootShredManager(); + ScheduleShredWakeup(); + } else if (Status == EShredStatus::COMPLETED) { + ScheduleShredWakeup(); + } else { + ClearOperationQueue(); + Continue(); + } +} + +void TRootShredManager::Stop() { + TShredManager::Stop(); + const auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Stop"); + + Queue->Stop(); +} + +void TRootShredManager::ClearOperationQueue() { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Clear operation queue and active pipes"); + + Queue->Clear(); + ActivePipes.clear(); +} + +void TRootShredManager::ClearWaitingShredRequests(NIceDb::TNiceDb& db) { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Clear WaitingShredTenants: Size# " << WaitingShredTenants.size()); + + for (const auto& [pathId, status] : WaitingShredTenants) { + db.Table<Schema::WaitingShredTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); + } + ClearWaitingShredRequests(); +} + +void TRootShredManager::ClearWaitingShredRequests() { + WaitingShredTenants.clear(); +} + +void TRootShredManager::Run(NIceDb::TNiceDb& db) { + CounterShredOk = 0; + CounterShredTimeout = 0; + Status = EShredStatus::IN_PROGRESS; + StartTime = AppData(SchemeShard->ActorContext())->TimeProvider->Now(); + for (auto& [pathId, subdomain] : SchemeShard->SubDomains) { + auto path = TPath::Init(pathId, SchemeShard); + if (path->IsRoot()) { + continue; + } + if (subdomain->GetTenantSchemeShardID() == InvalidTabletId) { // no tenant schemeshard + continue; + } + Enqueue(pathId); + WaitingShredTenants[pathId] = EShredStatus::IN_PROGRESS; + db.Table<Schema::WaitingShredTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Update<Schema::WaitingShredTenants::Status>(WaitingShredTenants[pathId]); + } + if (WaitingShredTenants.empty()) { + Status = EShredStatus::IN_PROGRESS_BSC; + } + db.Table<Schema::ShredGenerations>().Key(Generation).Update<Schema::ShredGenerations::Status, + Schema::ShredGenerations::StartTime>(Status, StartTime.MicroSeconds()); + + const auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Run: Queue.Size# " << Queue->Size() + << ", WaitingShredTenants.size# " << WaitingShredTenants.size() + << ", Status# " << static_cast<ui32>(Status)); +} + +void TRootShredManager::Continue() { + if (Status == EShredStatus::IN_PROGRESS) { + for (const auto& [pathId, status] : WaitingShredTenants) { + if (status == EShredStatus::IN_PROGRESS) { + Enqueue(pathId); + } + } + } else if (Status == EShredStatus::IN_PROGRESS_BSC) { + SendRequestToBSC(); + } + + const auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Continue: Queue.Size# " << Queue->Size() + << ", Status# " << static_cast<ui32>(Status)); +} + +void TRootShredManager::ScheduleShredWakeup() { + if (IsManualStartup || IsShredWakeupScheduled) { + return; + } + + const auto ctx = SchemeShard->ActorContext(); + ctx.Schedule(CurrentWakeupInterval, new TEvSchemeShard::TEvWakeupToRunShred); + IsShredWakeupScheduled = true; + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] ScheduleShredWakeup: Interval# " << CurrentWakeupInterval << ", Timestamp# " << AppData(ctx)->TimeProvider->Now()); +} + +void TRootShredManager::WakeupToRunShred(TEvSchemeShard::TEvWakeupToRunShred::TPtr& ev, const NActors::TActorContext& ctx) { + Y_UNUSED(ev); + IsShredWakeupScheduled = false; + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] WakeupToRunShred: Timestamp# " << AppData(ctx)->TimeProvider->Now()); + SchemeShard->RunShred(true); +} + +NOperationQueue::EStartStatus TRootShredManager::StartShred(const TPathId& pathId) { + UpdateMetrics(); + + auto ctx = SchemeShard->ActorContext(); + auto it = SchemeShard->SubDomains.find(pathId); + if (it == SchemeShard->SubDomains.end()) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Start] Failed to resolve subdomain info " + "for pathId# " << pathId + << " at schemeshard# " << SchemeShard->TabletID()); + + return NOperationQueue::EStartStatus::EOperationRemove; + } + + const auto& tenantSchemeShardId = it->second->GetTenantSchemeShardID(); + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Start] Shred " + "for pathId# " << pathId + << ", tenant schemeshard# " << tenantSchemeShardId + << ", next wakeup# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + + std::unique_ptr<TEvSchemeShard::TEvTenantShredRequest> request( + new TEvSchemeShard::TEvTenantShredRequest(Generation)); + + ActivePipes[pathId] = SchemeShard->PipeClientCache->Send( + ctx, + ui64(tenantSchemeShardId), + request.release()); + + return NOperationQueue::EStartStatus::EOperationRunning; +} + +void TRootShredManager::OnTimeout(const TPathId& pathId) { + CounterShredTimeout++; + UpdateMetrics(); + + ActivePipes.erase(pathId); + + auto ctx = SchemeShard->ActorContext(); + if (!SchemeShard->SubDomains.contains(pathId)) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Timeout] Failed to resolve subdomain info " + "for path# " << pathId + << " at schemeshard# " << SchemeShard->TabletID()); + return; + } + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Timeout] Shred timeouted " + "for pathId# " << pathId + << ", next wakeup in# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + + // retry + Enqueue(pathId); +} + +void TRootShredManager::Enqueue(const TPathId& pathId) { + auto ctx = SchemeShard->ActorContext(); + + if (Queue->Enqueue(pathId)) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] [Enqueue] Enqueued pathId# " << pathId << " at schemeshard " << SchemeShard->TabletID()); + UpdateMetrics(); + } else { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] [Enqueue] Skipped or already exists pathId# " << pathId << " at schemeshard " << SchemeShard->TabletID()); + } +} + +void TRootShredManager::HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { + if (tabletId == BSC) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] HandleDisconnect resend request to BSC at schemeshard " << SchemeShard->TabletID()); + + SendRequestToBSC(); + return; + } + + const auto shardIdx = SchemeShard->GetShardIdx(tabletId); + if (!SchemeShard->ShardInfos.contains(shardIdx)) { + return; + } + + const auto& pathId = SchemeShard->ShardInfos.at(shardIdx).PathId; + if (!SchemeShard->TTLEnabledTables.contains(pathId)) { + return; + } + + const auto it = ActivePipes.find(pathId); + if (it == ActivePipes.end()) { + return; + } + + if (it->second != clientId) { + return; + } + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Disconnect] Shred disconnect " + "to tablet: " << tabletId + << ", at schemeshard: " << SchemeShard->TabletID()); + + ActivePipes.erase(pathId); + StartShred(pathId); +} + +void TRootShredManager::OnDone(const TPathId& pathId, NIceDb::TNiceDb& db) { + auto duration = Queue->OnDone(pathId); + + auto ctx = SchemeShard->ActorContext(); + if (!SchemeShard->SubDomains.contains(pathId)) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Finished] Failed to resolve subdomain info " + "for pathId# " << pathId + << " in# " << duration.MilliSeconds() << " ms" + << ", next wakeup in# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + } else { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Finished] Shred completed " + "for pathId# " << pathId + << " in# " << duration.MilliSeconds() << " ms" + << ", next wakeup# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + } + + ActivePipes.erase(pathId); + auto it = WaitingShredTenants.find(pathId); + if (it != WaitingShredTenants.end()) { + db.Table<Schema::WaitingShredTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); + WaitingShredTenants.erase(it); + } + + CounterShredOk++; + UpdateMetrics(); + + if (WaitingShredTenants.empty()) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Shred in tenants is completed. Send request to BS controller"); + Status = EShredStatus::IN_PROGRESS_BSC; + db.Table<Schema::ShredGenerations>().Key(Generation).Update<Schema::ShredGenerations::Status>(Status); + } +} + +void TRootShredManager::OnDone(const TTabletId&, NIceDb::TNiceDb&) { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [OnDone] Cannot execute in root schemeshard: " << SchemeShard->TabletID()); +} + +void TRootShredManager::ScheduleRequestToBSC() { + if (IsRequestToBSCScheduled) { + return; + } + + auto ctx = SchemeShard->ActorContext(); + ctx.Schedule(ShredBSCInterval, new TEvSchemeShard::TEvWakeupToRunShredBSC); + IsRequestToBSCScheduled = true; + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] ScheduleRequestToBSC: Interval# " << ShredBSCInterval); +} + +void TRootShredManager::SendRequestToBSC() { + auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] SendRequestToBSC: Generation# " << Generation); + + IsRequestToBSCScheduled = false; + std::unique_ptr<TEvBlobStorage::TEvControllerShredRequest> request( + new TEvBlobStorage::TEvControllerShredRequest(Generation)); + SchemeShard->PipeClientCache->Send(ctx, MakeBSControllerID(), request.release()); +} + +void TRootShredManager::Complete() { + Status = EShredStatus::COMPLETED; + auto ctx = SchemeShard->ActorContext(); + FinishTime = AppData(ctx)->TimeProvider->Now(); + TDuration shredDuration = FinishTime - StartTime; + if (shredDuration > ShredInterval) { + if (!IsManualStartup) { + SchemeShard->RunShred(true); + } + } else { + CurrentWakeupInterval = ShredInterval - shredDuration; + ScheduleShredWakeup(); + } + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Complete: Generation# " << Generation + << ", duration# " << shredDuration.Seconds() << " s"); +} + +bool TRootShredManager::Restore(NIceDb::TNiceDb& db) { + { + auto rowset = db.Table<Schema::ShredGenerations>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + if (rowset.EndOfSet()) { + Status = EShredStatus::UNSPECIFIED; + } else { + Generation = 0; + Status = EShredStatus::UNSPECIFIED; + while (!rowset.EndOfSet()) { + ui64 generation = rowset.GetValue<Schema::ShredGenerations::Generation>(); + if (generation >= Generation) { + Generation = generation; + StartTime = TInstant::FromValue(rowset.GetValue<Schema::ShredGenerations::StartTime>()); + Status = rowset.GetValue<Schema::ShredGenerations::Status>(); + } + + if (!rowset.Next()) { + return false; + } + } + if (Status == EShredStatus::UNSPECIFIED || Status == EShredStatus::COMPLETED) { + auto ctx = SchemeShard->ActorContext(); + TDuration interval = AppData(ctx)->TimeProvider->Now() - StartTime; + if (interval > ShredInterval) { + CurrentWakeupInterval = TDuration::Zero(); + } else { + CurrentWakeupInterval = ShredInterval - interval; + } + } + } + } + + ui32 numberShredTenantsInRunning = 0; + { + auto rowset = db.Table<Schema::WaitingShredTenants>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + TOwnerId ownerPathId = rowset.GetValue<Schema::WaitingShredTenants::OwnerPathId>(); + TLocalPathId localPathId = rowset.GetValue<Schema::WaitingShredTenants::LocalPathId>(); + TPathId pathId(ownerPathId, localPathId); + Y_VERIFY_S(SchemeShard->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); + TPathElement::TPtr path = SchemeShard->PathsById.at(pathId); + Y_VERIFY_S(path->IsDomainRoot(), "Path is not a subdomain, pathId: " << pathId); + + Y_ABORT_UNLESS(SchemeShard->SubDomains.contains(pathId)); + + EShredStatus status = rowset.GetValue<Schema::WaitingShredTenants::Status>(); + WaitingShredTenants[pathId] = status; + if (status == EShredStatus::IN_PROGRESS) { + numberShredTenantsInRunning++; + } + + if (!rowset.Next()) { + return false; + } + } + if (Status == EShredStatus::IN_PROGRESS && (WaitingShredTenants.empty() || numberShredTenantsInRunning == 0)) { + Status = EShredStatus::IN_PROGRESS_BSC; + } + } + + auto ctx = SchemeShard->ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootShredManager] Restore: Generation# " << Generation + << ", Status# " << static_cast<ui32>(Status) + << ", WakeupInterval# " << CurrentWakeupInterval.Seconds() << " s" + << ", NumberShredTenantsInRunning# " << numberShredTenantsInRunning); + + return true; +} + +bool TRootShredManager::Remove(const TPathId& pathId) { + auto it = WaitingShredTenants.find(pathId); + if (it != WaitingShredTenants.end()) { + Queue->Remove(pathId); + ActivePipes.erase(pathId); + WaitingShredTenants.erase(it); + + if (WaitingShredTenants.empty()) { + Status = EShredStatus::IN_PROGRESS_BSC; + SendRequestToBSC(); + } + return true; + } + return false; +} + +bool TRootShredManager::Remove(const TShardIdx&) { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [Remove] Cannot execute in root schemeshard: " << SchemeShard->TabletID()); + return false; +} + +void TRootShredManager::HandleNewPartitioning(const std::vector<TShardIdx>&, NIceDb::TNiceDb&) { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootShredManager] [HandleNewPartitioning] Cannot execute in root schemeshard: " << SchemeShard->TabletID()); +} + +void TRootShredManager::SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) { + db.Table<Schema::ShredGenerations>().Key(GetGeneration()).Delete(); + SetGeneration(currentBscGeneration + 1); + db.Table<Schema::ShredGenerations>().Key(GetGeneration()).Update<Schema::ShredGenerations::Status, + Schema::ShredGenerations::StartTime>(GetStatus(), StartTime.MicroSeconds()); +} + +void TRootShredManager::UpdateMetrics() { + SchemeShard->TabletCounters->Simple()[COUNTER_SHRED_QUEUE_SIZE].Set(Queue->Size()); + SchemeShard->TabletCounters->Simple()[COUNTER_SHRED_QUEUE_RUNNING].Set(Queue->RunningSize()); + SchemeShard->TabletCounters->Simple()[COUNTER_SHRED_OK].Set(CounterShredOk); + SchemeShard->TabletCounters->Simple()[COUNTER_SHRED_TIMEOUT].Set(CounterShredTimeout); +} + +TRootShredManager::TQueue::TConfig TRootShredManager::ConvertConfig(const NKikimrConfig::TDataErasureConfig& config) { + TQueue::TConfig queueConfig; + queueConfig.IsCircular = false; + queueConfig.MaxRate = config.GetMaxRate(); + queueConfig.InflightLimit = config.GetInflightLimit(); + queueConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); + + return queueConfig; +} + +struct TSchemeShard::TTxShredManagerInit : public TSchemeShard::TRwTxBase { + TTxShredManagerInit(TSelf* self) + : TRwTxBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_SHRED_INIT; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxShredManagerInit Execute at schemeshard: " << Self->TabletID()); + NIceDb::TNiceDb db(txc.DB); + Self->ShredManager->SetStatus(EShredStatus::COMPLETED); + db.Table<Schema::ShredGenerations>().Key(0).Update<Schema::ShredGenerations::Status, + Schema::ShredGenerations::StartTime>(Self->ShredManager->GetStatus(), AppData(ctx)->TimeProvider->Now().MicroSeconds()); + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxShredManagerInit Complete at schemeshard: " << Self->TabletID()); + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxShredManagerInit() { + return new TTxShredManagerInit(this); +} + +struct TSchemeShard::TTxRunShred : public TSchemeShard::TRwTxBase { + bool IsNewShred; + bool NeedSendRequestToBSC = false; + + TTxRunShred(TSelf *self, bool isNewShred) + : TRwTxBase(self) + , IsNewShred(isNewShred) + {} + + TTxType GetTxType() const override { return TXTYPE_RUN_SHRED; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxRunShred Execute at schemeshard: " << Self->TabletID()); + + NIceDb::TNiceDb db(txc.DB); + auto& shredManager = Self->ShredManager; + if (IsNewShred) { + shredManager->ClearOperationQueue(); + + shredManager->ClearWaitingShredRequests(db); + shredManager->IncGeneration(); + shredManager->Run(db); + } + if (Self->ShredManager->GetStatus() == EShredStatus::IN_PROGRESS_BSC) { + NeedSendRequestToBSC = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxRunShred Complete at schemeshard: " << Self->TabletID() + << ", NeedSendRequestToBSC# " << (NeedSendRequestToBSC ? "true" : "false")); + + if (NeedSendRequestToBSC) { + Self->ShredManager->SendRequestToBSC(); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunShred(bool isNewShred) { + return new TTxRunShred(this, isNewShred); +} + +struct TSchemeShard::TTxCompleteShredTenant : public TSchemeShard::TRwTxBase { + const TEvSchemeShard::TEvTenantShredResponse::TPtr Ev; + bool NeedSendRequestToBSC = false; + + TTxCompleteShredTenant(TSelf* self, const TEvSchemeShard::TEvTenantShredResponse::TPtr& ev) + : TRwTxBase(self) + , Ev(std::move(ev)) + {} + + TTxType GetTxType() const override { return TXTYPE_COMPLETE_SHRED_TENANT; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteShredTenant Execute at schemeshard: " << Self->TabletID()); + + const auto& record = Ev->Get()->Record; + auto& manager = Self->ShredManager; + const ui64 completedGeneration = record.GetGeneration(); + if (completedGeneration != manager->GetGeneration()) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteShredTenant Unknown generation#" << completedGeneration << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); + return; + } + + NIceDb::TNiceDb db(txc.DB); + auto pathId = TPathId( + record.GetPathId().GetOwnerId(), + record.GetPathId().GetLocalId()); + manager->OnDone(pathId, db); + if (manager->GetStatus() == EShredStatus::IN_PROGRESS_BSC) { + NeedSendRequestToBSC = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteShredTenant Complete at schemeshard: " << Self->TabletID() + << ", NeedSendRequestToBSC# " << (NeedSendRequestToBSC ? "true" : "false")); + if (NeedSendRequestToBSC) { + Self->ShredManager->SendRequestToBSC(); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteShredTenant(TEvSchemeShard::TEvTenantShredResponse::TPtr& ev) { + return new TTxCompleteShredTenant(this, ev); +} + +struct TSchemeShard::TTxCompleteShredBSC : public TSchemeShard::TRwTxBase { + const TEvBlobStorage::TEvControllerShredResponse::TPtr Ev; + bool NeedScheduleRequestToBSC = false; + + TTxCompleteShredBSC(TSelf* self, const TEvBlobStorage::TEvControllerShredResponse::TPtr& ev) + : TRwTxBase(self) + , Ev(std::move(ev)) + {} + + TTxType GetTxType() const override { return TXTYPE_COMPLETE_SHRED_BSC; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteShredBSC Execute at schemeshard: " << Self->TabletID()); + + const auto& record = Ev->Get()->Record; + auto& manager = Self->ShredManager; + NIceDb::TNiceDb db(txc.DB); + if (ui64 currentBscGeneration = record.GetCurrentGeneration(); currentBscGeneration > manager->GetGeneration()) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteShredBSC Unknown generation#" << currentBscGeneration << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); + manager->SyncBscGeneration(db, currentBscGeneration); + manager->SendRequestToBSC(); + return; + } + + if (record.GetCompleted()) { + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteShredBSC: Data shred in BSC is completed"); + manager->Complete(); + db.Table<Schema::ShredGenerations>().Key(Self->ShredManager->GetGeneration()).Update<Schema::ShredGenerations::Status>(Self->ShredManager->GetStatus()); + } else { + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteShredBSC: Progress data shred in BSC " << static_cast<double>(record.GetProgress10k()) / 100 << "%"); + NeedScheduleRequestToBSC = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteShredBSC Complete at schemeshard: " << Self->TabletID() + << ", NeedScheduleRequestToBSC# " << (NeedScheduleRequestToBSC ? "true" : "false")); + + if (NeedScheduleRequestToBSC) { + Self->ShredManager->ScheduleRequestToBSC(); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteShredBSC(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev) { + return new TTxCompleteShredBSC(this, ev); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__shred_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__shred_manager.cpp new file mode 100644 index 00000000000..1ef32a4f87e --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__shred_manager.cpp @@ -0,0 +1,46 @@ +#include "schemeshard__shred_manager.h" + +namespace NKikimr::NSchemeShard { + +TShredManager::TShredManager(TSchemeShard* const schemeShard) + : SchemeShard(schemeShard) +{} + +EShredStatus TShredManager::GetStatus() const { + return Status; +} + +void TShredManager::SetStatus(const EShredStatus& status) { + Status = status; +} + +void TShredManager::IncGeneration() { + ++Generation; +} + +void TShredManager::SetGeneration(ui64 generation) { + Generation = generation; +} + +ui64 TShredManager::GetGeneration() const { + return Generation; +} + +void TShredManager::Clear() { + ClearOperationQueue(); + ClearWaitingShredRequests(); +} + +void TShredManager::Start() { + Running = true; +} + +void TShredManager::Stop() { + Running = false; +} + +bool TShredManager::IsRunning() const { + return Running; +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h b/ydb/core/tx/schemeshard/schemeshard__shred_manager.h index c9fe7335e7a..ab5178a1177 100644 --- a/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h +++ b/ydb/core/tx/schemeshard/schemeshard__shred_manager.h @@ -18,28 +18,28 @@ namespace NKikimr::NSchemeShard { class TSchemeShard; -class TDataErasureManager { +class TShredManager { protected: TSchemeShard* const SchemeShard; - EDataErasureStatus Status = EDataErasureStatus::UNSPECIFIED; + EShredStatus Status = EShredStatus::UNSPECIFIED; ui64 Generation = 0; bool Running = false; - ui64 CounterDataErasureOk = 0; - ui64 CounterDataErasureTimeout = 0; + ui64 CounterShredOk = 0; + ui64 CounterShredTimeout = 0; public: - TDataErasureManager(TSchemeShard* const schemeShard); + TShredManager(TSchemeShard* const schemeShard); - virtual ~TDataErasureManager() = default; + virtual ~TShredManager() = default; virtual void UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) = 0; virtual void Start(); virtual void Stop(); virtual void ClearOperationQueue() = 0; - virtual void ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) = 0; - virtual void ClearWaitingDataErasureRequests() = 0; - virtual void WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) = 0; + virtual void ClearWaitingShredRequests(NIceDb::TNiceDb& db) = 0; + virtual void ClearWaitingShredRequests() = 0; + virtual void WakeupToRunShred(TEvSchemeShard::TEvWakeupToRunShred::TPtr& ev, const NActors::TActorContext& ctx) = 0; virtual void Run(NIceDb::TNiceDb& db) = 0; virtual void Continue() = 0; virtual void HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) = 0; @@ -51,13 +51,13 @@ public: virtual bool Restore(NIceDb::TNiceDb& db) = 0; virtual bool Remove(const TPathId& pathId) = 0; virtual bool Remove(const TShardIdx& shardIdx) = 0; - virtual void HandleNewPartitioning(const std::vector<TShardIdx>& dataErasureShards, NIceDb::TNiceDb& db) = 0; + virtual void HandleNewPartitioning(const std::vector<TShardIdx>& shredShards, NIceDb::TNiceDb& db) = 0; virtual void SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) = 0; void Clear(); - EDataErasureStatus GetStatus() const; - void SetStatus(const EDataErasureStatus& status); + EShredStatus GetStatus() const; + void SetStatus(const EShredStatus& status); void IncGeneration(); void SetGeneration(ui64 generation); @@ -66,38 +66,38 @@ public: bool IsRunning() const; }; -//////////////////// TRootDataErasureManager //////////////////// +//////////////////// TRootShredManager //////////////////// -class TRootDataErasureManager : public TDataErasureManager { +class TRootShredManager : public TShredManager { private: using TQueue = NOperationQueue::TOperationQueueWithTimer< TPathId, TFifoQueue<TPathId>, - TEvPrivate::EvRunDataErasure, + TEvPrivate::EvRunShred, NKikimrServices::FLAT_TX_SCHEMESHARD, - NKikimrServices::TActivity::DATA_ERASURE>; + NKikimrServices::TActivity::SCHEMESHARD_SHRED>; class TStarter : public TQueue::IStarter { public: - TStarter(TRootDataErasureManager* const manager); + TStarter(TRootShredManager* const manager); NOperationQueue::EStartStatus StartOperation(const TPathId&) override; void OnTimeout(const TPathId&) override; private: - TRootDataErasureManager* const Manager; + TRootShredManager* const Manager; }; private: TStarter Starter; TQueue* Queue = nullptr; - THashMap<TPathId, EDataErasureStatus> WaitingDataErasureTenants; + THashMap<TPathId, EShredStatus> WaitingShredTenants; THashMap<TPathId, TActorId> ActivePipes; - TDuration DataErasureInterval; - TDuration DataErasureBSCInterval; + TDuration ShredInterval; + TDuration ShredBSCInterval; TDuration CurrentWakeupInterval; - bool IsDataErasureWakeupScheduled = false; + bool IsShredWakeupScheduled = false; bool IsRequestToBSCScheduled = false; TInstant StartTime; TInstant FinishTime; @@ -106,15 +106,15 @@ private: bool IsManualStartup = false; public: - TRootDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config); + TRootShredManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config); void UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) override; void Start() override; void Stop() override; void ClearOperationQueue() override; - void ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) override; - void ClearWaitingDataErasureRequests() override; - void WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) override; + void ClearWaitingShredRequests(NIceDb::TNiceDb& db) override; + void ClearWaitingShredRequests() override; + void WakeupToRunShred(TEvSchemeShard::TEvWakeupToRunShred::TPtr& ev, const NActors::TActorContext& ctx) override; void Run(NIceDb::TNiceDb& db) override; void Continue() override; void HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) override; @@ -126,57 +126,57 @@ public: bool Restore(NIceDb::TNiceDb& db) override; bool Remove(const TPathId& pathId) override; bool Remove(const TShardIdx& shardIdx) override; - void HandleNewPartitioning(const std::vector<TShardIdx>& dataErasureShards, NIceDb::TNiceDb& db) override; + void HandleNewPartitioning(const std::vector<TShardIdx>& shredShards, NIceDb::TNiceDb& db) override; void SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) override; private: static TQueue::TConfig ConvertConfig(const NKikimrConfig::TDataErasureConfig& config); - void ScheduleDataErasureWakeup(); - NOperationQueue::EStartStatus StartDataErasure(const TPathId& pathId); + void ScheduleShredWakeup(); + NOperationQueue::EStartStatus StartShred(const TPathId& pathId); void OnTimeout(const TPathId& pathId); void Enqueue(const TPathId& pathId); void UpdateMetrics(); }; -//////////////////// TTenantDataErasureManager //////////////////// +//////////////////// TTenantShredManager //////////////////// -class TTenantDataErasureManager : public TDataErasureManager { +class TTenantShredManager : public TShredManager { private: using TQueue = NOperationQueue::TOperationQueueWithTimer< TShardIdx, TFifoQueue<TShardIdx>, - TEvPrivate::EvRunTenantDataErasure, + TEvPrivate::EvRunTenantShred, NKikimrServices::FLAT_TX_SCHEMESHARD, - NKikimrServices::TActivity::TENANT_DATA_ERASURE>; + NKikimrServices::TActivity::SCHEMESHARD_TENANT_SHRED>; class TStarter : public TQueue::IStarter { public: - TStarter(TTenantDataErasureManager* const manager); + TStarter(TTenantShredManager* const manager); NOperationQueue::EStartStatus StartOperation(const TShardIdx& shardIdx) override; void OnTimeout(const TShardIdx& shardIdx) override; private: - TTenantDataErasureManager* const Manager; + TTenantShredManager* const Manager; }; private: TStarter Starter; TQueue* Queue = nullptr; - THashMap<TShardIdx, EDataErasureStatus> WaitingDataErasureShards; + THashMap<TShardIdx, EShredStatus> WaitingShredShards; THashMap<TShardIdx, TActorId> ActivePipes; public: - TTenantDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config); + TTenantShredManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config); void UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) override; void Start() override; void Stop() override; void ClearOperationQueue() override; - void ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) override; - void ClearWaitingDataErasureRequests() override; - void WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) override; + void ClearWaitingShredRequests(NIceDb::TNiceDb& db) override; + void ClearWaitingShredRequests() override; + void WakeupToRunShred(TEvSchemeShard::TEvWakeupToRunShred::TPtr& ev, const NActors::TActorContext& ctx) override; void Run(NIceDb::TNiceDb& db) override; void Continue() override; void HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) override; @@ -188,13 +188,13 @@ public: bool Restore(NIceDb::TNiceDb& db) override; bool Remove(const TPathId& pathId) override; bool Remove(const TShardIdx& shardIdx) override; - void HandleNewPartitioning(const std::vector<TShardIdx>& dataErasureShards, NIceDb::TNiceDb& db) override; + void HandleNewPartitioning(const std::vector<TShardIdx>& shredShards, NIceDb::TNiceDb& db) override; void SyncBscGeneration(NIceDb::TNiceDb& db, ui64 currentBscGeneration) override; private: static TQueue::TConfig ConvertConfig(const NKikimrConfig::TDataErasureConfig& config); - NOperationQueue::EStartStatus StartDataErasure(const TShardIdx& shardIdx); + NOperationQueue::EStartStatus StartShred(const TShardIdx& shardIdx); void OnTimeout(const TShardIdx& shardIdx); void Enqueue(const TShardIdx& shardIdx); void UpdateMetrics(); diff --git a/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__tenant_shred_manager.cpp index 56077ac61bd..1f56ee324b0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure_manager.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__tenant_shred_manager.cpp @@ -1,4 +1,4 @@ -#include "schemeshard__data_erasure_manager.h" +#include "schemeshard__shred_manager.h" #include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/core/tx/schemeshard/schemeshard_impl.h> @@ -8,8 +8,8 @@ namespace NKikimr::NSchemeShard { namespace { void SendResponseToRootSchemeShard(TSchemeShard* const schemeShard, const TActorContext& ctx) { - std::unique_ptr<TEvSchemeShard::TEvTenantDataErasureResponse> response( - new TEvSchemeShard::TEvTenantDataErasureResponse(schemeShard->ParentDomainId, schemeShard->DataErasureManager->GetGeneration(), NKikimrScheme::TEvTenantDataErasureResponse::COMPLETED)); + std::unique_ptr<TEvSchemeShard::TEvTenantShredResponse> response( + new TEvSchemeShard::TEvTenantShredResponse(schemeShard->ParentDomainId, schemeShard->ShredManager->GetGeneration(), NKikimrScheme::TEvTenantShredResponse::COMPLETED)); const ui64 rootSchemeshard = schemeShard->ParentDomainId.OwnerId; schemeShard->PipeClientCache->Send( @@ -20,147 +20,147 @@ void SendResponseToRootSchemeShard(TSchemeShard* const schemeShard, const TActor } // namespace -TTenantDataErasureManager::TStarter::TStarter(TTenantDataErasureManager* const manager) +TTenantShredManager::TStarter::TStarter(TTenantShredManager* const manager) : Manager(manager) {} -NOperationQueue::EStartStatus TTenantDataErasureManager::TStarter::StartOperation(const TShardIdx& shardIdx) { - return Manager->StartDataErasure(shardIdx); +NOperationQueue::EStartStatus TTenantShredManager::TStarter::StartOperation(const TShardIdx& shardIdx) { + return Manager->StartShred(shardIdx); } -void TTenantDataErasureManager::TStarter::OnTimeout(const TShardIdx& shardIdx) { +void TTenantShredManager::TStarter::OnTimeout(const TShardIdx& shardIdx) { Manager->OnTimeout(shardIdx); } -TTenantDataErasureManager::TTenantDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config) - : TDataErasureManager(schemeShard) +TTenantShredManager::TTenantShredManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config) + : TShredManager(schemeShard) , Starter(this) , Queue(new TQueue(ConvertConfig(config), Starter)) { const auto ctx = SchemeShard->ActorContext(); ctx.RegisterWithSameMailbox(Queue); - const auto& tenantDataErasureConfig = config.GetTenantDataErasureConfig(); + const auto& tenantShredConfig = config.GetTenantDataErasureConfig(); LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Created: Timeout# " << tenantDataErasureConfig.GetTimeoutSeconds() + "[TenantShredManager] Created: Timeout# " << tenantShredConfig.GetTimeoutSeconds() << ", Rate# " << Queue->GetRate() - << ", InflightLimit# " << tenantDataErasureConfig.GetInflightLimit()); + << ", InflightLimit# " << tenantShredConfig.GetInflightLimit()); } -void TTenantDataErasureManager::UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) { - TTenantDataErasureManager::TQueue::TConfig queueConfig = ConvertConfig(config); +void TTenantShredManager::UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) { + TTenantShredManager::TQueue::TConfig queueConfig = ConvertConfig(config); Queue->UpdateConfig(queueConfig); const auto ctx = SchemeShard->ActorContext(); LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Config updated: Timeout# " << queueConfig.Timeout + "[TenantShredManager] Config updated: Timeout# " << queueConfig.Timeout << ", Rate# " << Queue->GetRate() << ", InflightLimit# " << queueConfig.InflightLimit); } -void TTenantDataErasureManager::Start() { - TDataErasureManager::Start(); +void TTenantShredManager::Start() { + TShredManager::Start(); const auto ctx = SchemeShard->ActorContext(); LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Start: Status# " << static_cast<ui32>(Status)); + "[TenantShredManager] Start: Status# " << static_cast<ui32>(Status)); Queue->Start(); - if (Status == EDataErasureStatus::COMPLETED) { + if (Status == EShredStatus::COMPLETED) { SendResponseToRootSchemeShard(); - } else if (Status == EDataErasureStatus::IN_PROGRESS) { + } else if (Status == EShredStatus::IN_PROGRESS) { ClearOperationQueue(); Continue(); } } -void TTenantDataErasureManager::Stop() { - TDataErasureManager::Stop(); +void TTenantShredManager::Stop() { + TShredManager::Stop(); const auto ctx = SchemeShard->ActorContext(); LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Stop"); + "[TenantShredManager] Stop"); Queue->Stop(); } -void TTenantDataErasureManager::ClearOperationQueue() { +void TTenantShredManager::ClearOperationQueue() { const auto ctx = SchemeShard->ActorContext(); LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Clear operation queue and active pipes"); + "[TenantShredManager] Clear operation queue and active pipes"); Queue->Clear(); ActivePipes.clear(); } -void TTenantDataErasureManager::WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) { +void TTenantShredManager::WakeupToRunShred(TEvSchemeShard::TEvWakeupToRunShred::TPtr& ev, const NActors::TActorContext& ctx) { Y_UNUSED(ev); - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [WakeupToRunDataErasure] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [WakeupToRunShred] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); } -void TTenantDataErasureManager::ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) { +void TTenantShredManager::ClearWaitingShredRequests(NIceDb::TNiceDb& db) { const auto ctx = SchemeShard->ActorContext(); LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Clear WaitingDataErasureShards: Size# " << WaitingDataErasureShards.size()); + "[TenantShredManager] Clear WaitingShredShards: Size# " << WaitingShredShards.size()); - for (const auto& [shardIdx, status] : WaitingDataErasureShards) { - db.Table<Schema::WaitingDataErasureShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Delete(); + for (const auto& [shardIdx, status] : WaitingShredShards) { + db.Table<Schema::WaitingShredShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Delete(); } - ClearWaitingDataErasureRequests(); + ClearWaitingShredRequests(); } -void TTenantDataErasureManager::ClearWaitingDataErasureRequests() { - WaitingDataErasureShards.clear(); +void TTenantShredManager::ClearWaitingShredRequests() { + WaitingShredShards.clear(); } -void TTenantDataErasureManager::Run(NIceDb::TNiceDb& db) { - CounterDataErasureOk = 0; - CounterDataErasureTimeout = 0; - Status = EDataErasureStatus::IN_PROGRESS; +void TTenantShredManager::Run(NIceDb::TNiceDb& db) { + CounterShredOk = 0; + CounterShredTimeout = 0; + Status = EShredStatus::IN_PROGRESS; for (const auto& [shardIdx, shardInfo] : SchemeShard->ShardInfos) { switch (shardInfo.TabletType) { case NKikimr::NSchemeShard::ETabletType::DataShard: case NKikimr::NSchemeShard::ETabletType::PersQueue: { Enqueue(shardIdx); - WaitingDataErasureShards[shardIdx] = EDataErasureStatus::IN_PROGRESS; - db.Table<Schema::WaitingDataErasureShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update<Schema::WaitingDataErasureShards::Status>(WaitingDataErasureShards[shardIdx]); + WaitingShredShards[shardIdx] = EShredStatus::IN_PROGRESS; + db.Table<Schema::WaitingShredShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update<Schema::WaitingShredShards::Status>(WaitingShredShards[shardIdx]); break; } default: break; } } - if (WaitingDataErasureShards.empty()) { - Status = EDataErasureStatus::COMPLETED; + if (WaitingShredShards.empty()) { + Status = EShredStatus::COMPLETED; } - db.Table<Schema::TenantDataErasureGenerations>().Key(Generation).Update<Schema::TenantDataErasureGenerations::Status>(Status); + db.Table<Schema::TenantShredGenerations>().Key(Generation).Update<Schema::TenantShredGenerations::Status>(Status); const auto ctx = SchemeShard->ActorContext(); LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Run: Queue.Size# " << Queue->Size() - << ", WaitingDataErasureShards.size# " << WaitingDataErasureShards.size() + "[TenantShredManager] Run: Queue.Size# " << Queue->Size() + << ", WaitingShredShards.size# " << WaitingShredShards.size() << ", Status# " << static_cast<ui32>(Status)); } -void TTenantDataErasureManager::Continue() { - for (const auto& [shardIdx, status] : WaitingDataErasureShards) { - if (status == EDataErasureStatus::IN_PROGRESS) { +void TTenantShredManager::Continue() { + for (const auto& [shardIdx, status] : WaitingShredShards) { + if (status == EShredStatus::IN_PROGRESS) { Enqueue(shardIdx); // forward generation } } const auto ctx = SchemeShard->ActorContext(); LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Continue: Queue.Size# " << Queue->Size() + "[TenantShredManager] Continue: Queue.Size# " << Queue->Size() << ", Status# " << static_cast<ui32>(Status)); } -NOperationQueue::EStartStatus TTenantDataErasureManager::StartDataErasure(const TShardIdx& shardIdx) { +NOperationQueue::EStartStatus TTenantShredManager::StartShred(const TShardIdx& shardIdx) { UpdateMetrics(); auto ctx = SchemeShard->ActorContext(); auto it = SchemeShard->ShardInfos.find(shardIdx); if (it == SchemeShard->ShardInfos.end()) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Start] Failed to resolve shard info " - "for data erasure# " << shardIdx + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [Start] Failed to resolve shard info " + "for shred# " << shardIdx << " at schemeshard# " << SchemeShard->TabletID()); return NOperationQueue::EStartStatus::EOperationRemove; @@ -169,7 +169,7 @@ NOperationQueue::EStartStatus TTenantDataErasureManager::StartDataErasure(const const auto& tabletId = it->second.TabletID; const auto& pathId = it->second.PathId; - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Start] Data erasure " + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [Start] Shred " "for pathId# " << pathId << ", tabletId# " << tabletId << ", next wakeup# " << Queue->GetWakeupDelta() << ", rate# " << Queue->GetRate() @@ -201,8 +201,8 @@ NOperationQueue::EStartStatus TTenantDataErasureManager::StartDataErasure(const return NOperationQueue::EStartStatus::EOperationRunning; } -void TTenantDataErasureManager::OnTimeout(const TShardIdx& shardIdx) { - CounterDataErasureTimeout++; +void TTenantShredManager::OnTimeout(const TShardIdx& shardIdx) { + CounterShredTimeout++; UpdateMetrics(); ActivePipes.erase(shardIdx); @@ -210,8 +210,8 @@ void TTenantDataErasureManager::OnTimeout(const TShardIdx& shardIdx) { auto ctx = SchemeShard->ActorContext(); auto it = SchemeShard->ShardInfos.find(shardIdx); if (it == SchemeShard->ShardInfos.end()) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Timeout] Failed to resolve shard info " - "for timeout data erasure# " << shardIdx + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [Timeout] Failed to resolve shard info " + "for timeout shred# " << shardIdx << " at schemeshard# " << SchemeShard->TabletID()); return; } @@ -219,7 +219,7 @@ void TTenantDataErasureManager::OnTimeout(const TShardIdx& shardIdx) { const auto& tabletId = it->second.TabletID; const auto& pathId = it->second.PathId; - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Timeout] Data erasure timeout " + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [Timeout] Shred timeout " "for pathId# " << pathId << ", tabletId# " << tabletId << ", next wakeup# " << Queue->GetWakeupDelta() << ", in queue# " << Queue->Size() << " shards" @@ -230,24 +230,24 @@ void TTenantDataErasureManager::OnTimeout(const TShardIdx& shardIdx) { Enqueue(shardIdx); } -void TTenantDataErasureManager::Enqueue(const TShardIdx& shardIdx) { +void TTenantShredManager::Enqueue(const TShardIdx& shardIdx) { auto ctx = SchemeShard->ActorContext(); if (Queue->Enqueue(shardIdx)) { LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [Enqueue] Enqueued shard# " << shardIdx << " at schemeshard " << SchemeShard->TabletID()); - WaitingDataErasureShards[shardIdx] = EDataErasureStatus::IN_PROGRESS; + "[TenantShredManager] [Enqueue] Enqueued shard# " << shardIdx << " at schemeshard " << SchemeShard->TabletID()); + WaitingShredShards[shardIdx] = EShredStatus::IN_PROGRESS; UpdateMetrics(); } else { LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [Enqueue] Skipped or already exists shard# " << shardIdx << " at schemeshard " << SchemeShard->TabletID()); + "[TenantShredManager] [Enqueue] Skipped or already exists shard# " << shardIdx << " at schemeshard " << SchemeShard->TabletID()); } } -void TTenantDataErasureManager::HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { +void TTenantShredManager::HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { if (tabletId == TTabletId(SchemeShard->ParentDomainId.OwnerId)) { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] HandleDisconnect resend response to root schemeshard at schemeshard " << SchemeShard->TabletID()); + "[TenantShredManager] HandleDisconnect resend response to root schemeshard at schemeshard " << SchemeShard->TabletID()); SendResponseToRootSchemeShard(); return; @@ -267,21 +267,21 @@ void TTenantDataErasureManager::HandleDisconnect(TTabletId tabletId, const TActo return; } - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Disconnect] Data erasure disconnect " + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [Disconnect] Shred disconnect " "to tablet: " << tabletId << ", at schemeshard: " << SchemeShard->TabletID()); ActivePipes.erase(it); - StartDataErasure(shardIdx); + StartShred(shardIdx); } -void TTenantDataErasureManager::OnDone(const TPathId&, NIceDb::TNiceDb&) { +void TTenantShredManager::OnDone(const TPathId&, NIceDb::TNiceDb&) { auto ctx = SchemeShard->ActorContext(); LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [OnDone] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); + "[TenantShredManager] [OnDone] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); } -void TTenantDataErasureManager::OnDone(const TTabletId& tabletId, NIceDb::TNiceDb& db) { +void TTenantShredManager::OnDone(const TTabletId& tabletId, NIceDb::TNiceDb& db) { const TShardIdx shardIdx = SchemeShard->GetShardIdx(tabletId); const auto it = SchemeShard->ShardInfos.find(shardIdx); @@ -289,7 +289,7 @@ void TTenantDataErasureManager::OnDone(const TTabletId& tabletId, NIceDb::TNiceD auto ctx = SchemeShard->ActorContext(); if (shardIdx == InvalidShardIdx) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Finished] Failed to resolve shard info " + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [Finished] Failed to resolve shard info " "for pathId# " << (it != SchemeShard->ShardInfos.end() ? it->second.PathId.ToString() : "") << ", tabletId# " << tabletId << " in# " << duration.MilliSeconds() << " ms" << ", next wakeup in# " << Queue->GetWakeupDelta() @@ -298,7 +298,7 @@ void TTenantDataErasureManager::OnDone(const TTabletId& tabletId, NIceDb::TNiceD << ", running# " << Queue->RunningSize() << " shards" << " at schemeshard " << SchemeShard->TabletID()); } else { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Finished] Data erasure is completed " + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShredManager] [Finished] Shred is completed " "for pathId# " << (it != SchemeShard->ShardInfos.end() ? it->second.PathId.ToString() : "") << ", tabletId# " << tabletId << ", shardIdx# " << shardIdx << " in# " << duration.MilliSeconds() << " ms" @@ -311,55 +311,55 @@ void TTenantDataErasureManager::OnDone(const TTabletId& tabletId, NIceDb::TNiceD ActivePipes.erase(shardIdx); { - auto it = WaitingDataErasureShards.find(shardIdx); - if (it != WaitingDataErasureShards.end()) { - db.Table<Schema::WaitingDataErasureShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Delete(); - WaitingDataErasureShards.erase(it); + auto it = WaitingShredShards.find(shardIdx); + if (it != WaitingShredShards.end()) { + db.Table<Schema::WaitingShredShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Delete(); + WaitingShredShards.erase(it); } } - CounterDataErasureOk++; + CounterShredOk++; UpdateMetrics(); - if (WaitingDataErasureShards.empty()) { + if (WaitingShredShards.empty()) { LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Data erasure in shards is completed. Send response to root schemeshard"); + "[TenantShredManager] Shred in shards is completed. Send response to root schemeshard"); Complete(); - db.Table<Schema::TenantDataErasureGenerations>().Key(Generation).Update<Schema::TenantDataErasureGenerations::Status>(Status); + db.Table<Schema::TenantShredGenerations>().Key(Generation).Update<Schema::TenantShredGenerations::Status>(Status); } } -void TTenantDataErasureManager::ScheduleRequestToBSC() { +void TTenantShredManager::ScheduleRequestToBSC() { auto ctx = SchemeShard->ActorContext(); LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [ScheduleRequestToBSC] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); + "[TenantShredManager] [ScheduleRequestToBSC] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); } -void TTenantDataErasureManager::SendRequestToBSC() { +void TTenantShredManager::SendRequestToBSC() { auto ctx = SchemeShard->ActorContext(); LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [SendRequestToBSC] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); + "[TenantShredManager] [SendRequestToBSC] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); } -void TTenantDataErasureManager::Complete() { - Status = EDataErasureStatus::COMPLETED; +void TTenantShredManager::Complete() { + Status = EShredStatus::COMPLETED; auto ctx = SchemeShard->ActorContext(); LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Complete: Generation# " << Generation); + "[TenantShredManager] Complete: Generation# " << Generation); } -bool TTenantDataErasureManager::Restore(NIceDb::TNiceDb& db) { +bool TTenantShredManager::Restore(NIceDb::TNiceDb& db) { { - auto rowset = db.Table<Schema::TenantDataErasureGenerations>().Range().Select(); + auto rowset = db.Table<Schema::TenantShredGenerations>().Range().Select(); if (!rowset.IsReady()) { return false; } while (!rowset.EndOfSet()) { - ui64 generation = rowset.GetValue<Schema::TenantDataErasureGenerations::Generation>(); + ui64 generation = rowset.GetValue<Schema::TenantShredGenerations::Generation>(); if (generation >= Generation) { Generation = generation; - Status = rowset.GetValue<Schema::TenantDataErasureGenerations::Status>(); + Status = rowset.GetValue<Schema::TenantShredGenerations::Status>(); } if (!rowset.Next()) { @@ -368,58 +368,58 @@ bool TTenantDataErasureManager::Restore(NIceDb::TNiceDb& db) { } } - ui64 numberDataErasureShardsInRunning = 0; + ui64 numberShredShardsInRunning = 0; { - auto rowset = db.Table<Schema::WaitingDataErasureShards>().Range().Select(); + auto rowset = db.Table<Schema::WaitingShredShards>().Range().Select(); if (!rowset.IsReady()) { return false; } while (!rowset.EndOfSet()) { - TOwnerId ownerId = rowset.GetValue<Schema::WaitingDataErasureShards::OwnerShardIdx>(); - TLocalShardIdx localShardId = rowset.GetValue<Schema::WaitingDataErasureShards::LocalShardIdx>(); + TOwnerId ownerId = rowset.GetValue<Schema::WaitingShredShards::OwnerShardIdx>(); + TLocalShardIdx localShardId = rowset.GetValue<Schema::WaitingShredShards::LocalShardIdx>(); TShardIdx shardId(ownerId, localShardId); - EDataErasureStatus status = rowset.GetValue<Schema::WaitingDataErasureShards::Status>(); - WaitingDataErasureShards[shardId] = status; - if (status == EDataErasureStatus::IN_PROGRESS) { - numberDataErasureShardsInRunning++; + EShredStatus status = rowset.GetValue<Schema::WaitingShredShards::Status>(); + WaitingShredShards[shardId] = status; + if (status == EShredStatus::IN_PROGRESS) { + numberShredShardsInRunning++; } if (!rowset.Next()) { return false; } } - if (Status == EDataErasureStatus::IN_PROGRESS && (WaitingDataErasureShards.empty() || numberDataErasureShardsInRunning == 0)) { - Status = EDataErasureStatus::COMPLETED; + if (Status == EShredStatus::IN_PROGRESS && (WaitingShredShards.empty() || numberShredShardsInRunning == 0)) { + Status = EShredStatus::COMPLETED; } } auto ctx = SchemeShard->ActorContext(); LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] Restore: Generation# " << Generation + "[TenantShredManager] Restore: Generation# " << Generation << ", Status# " << static_cast<ui32>(Status) - << ", NumberDataErasureShardsInRunning# " << numberDataErasureShardsInRunning); + << ", NumberShredShardsInRunning# " << numberShredShardsInRunning); return true; } -bool TTenantDataErasureManager::Remove(const TPathId&) { +bool TTenantShredManager::Remove(const TPathId&) { auto ctx = SchemeShard->ActorContext(); LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [Remove] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); + "[TenantShredManager] [Remove] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); return false; } -bool TTenantDataErasureManager::Remove(const TShardIdx& shardIdx) { - auto it = WaitingDataErasureShards.find(shardIdx); - if (it != WaitingDataErasureShards.end()) { +bool TTenantShredManager::Remove(const TShardIdx& shardIdx) { + auto it = WaitingShredShards.find(shardIdx); + if (it != WaitingShredShards.end()) { Queue->Remove(shardIdx); ActivePipes.erase(shardIdx); - WaitingDataErasureShards.erase(it); - if (WaitingDataErasureShards.empty()) { + WaitingShredShards.erase(it); + if (WaitingShredShards.empty()) { auto ctx = SchemeShard->ActorContext(); LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [Remove] Data erasure in shards is completed. Send response to root schemeshard"); + "[TenantShredManager] [Remove] Shred in shards is completed. Send response to root schemeshard"); Complete(); } return true; @@ -427,41 +427,41 @@ bool TTenantDataErasureManager::Remove(const TShardIdx& shardIdx) { return false; } -void TTenantDataErasureManager::HandleNewPartitioning(const std::vector<TShardIdx>& dataErasureShards, NIceDb::TNiceDb& db) { - Status = EDataErasureStatus::IN_PROGRESS; - for (const auto& shardIdx : dataErasureShards) { +void TTenantShredManager::HandleNewPartitioning(const std::vector<TShardIdx>& shredShards, NIceDb::TNiceDb& db) { + Status = EShredStatus::IN_PROGRESS; + for (const auto& shardIdx : shredShards) { Enqueue(shardIdx); // forward generation - db.Table<Schema::WaitingDataErasureShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update<Schema::WaitingDataErasureShards::Status>(WaitingDataErasureShards[shardIdx]); + db.Table<Schema::WaitingShredShards>().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update<Schema::WaitingShredShards::Status>(WaitingShredShards[shardIdx]); } - if (WaitingDataErasureShards.empty()) { - Status = EDataErasureStatus::COMPLETED; + if (WaitingShredShards.empty()) { + Status = EShredStatus::COMPLETED; } - db.Table<Schema::TenantDataErasureGenerations>().Key(Generation).Update<Schema::TenantDataErasureGenerations::Status>(Status); + db.Table<Schema::TenantShredGenerations>().Key(Generation).Update<Schema::TenantShredGenerations::Status>(Status); const auto ctx = SchemeShard->ActorContext(); LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] HandleNewPartitioning: Queue.Size# " << Queue->Size() - << ", WaitingDataErasureShards.size# " << WaitingDataErasureShards.size() + "[TenantShredManager] HandleNewPartitioning: Queue.Size# " << Queue->Size() + << ", WaitingShredShards.size# " << WaitingShredShards.size() << ", Status# " << static_cast<ui32>(Status)); } -void TTenantDataErasureManager::SyncBscGeneration(NIceDb::TNiceDb&, ui64) { +void TTenantShredManager::SyncBscGeneration(NIceDb::TNiceDb&, ui64) { auto ctx = SchemeShard->ActorContext(); LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] [SyncBscGeneration] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); + "[TenantShredManager] [SyncBscGeneration] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); } -void TTenantDataErasureManager::UpdateMetrics() { - SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_SIZE].Set(Queue->Size()); - SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_RUNNING].Set(Queue->RunningSize()); - SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_OK].Set(CounterDataErasureOk); - SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_TIMEOUT].Set(CounterDataErasureTimeout); +void TTenantShredManager::UpdateMetrics() { + SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_SHRED_QUEUE_SIZE].Set(Queue->Size()); + SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_SHRED_QUEUE_RUNNING].Set(Queue->RunningSize()); + SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_SHRED_OK].Set(CounterShredOk); + SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_SHRED_TIMEOUT].Set(CounterShredTimeout); } -void TTenantDataErasureManager::SendResponseToRootSchemeShard() { +void TTenantShredManager::SendResponseToRootSchemeShard() { auto ctx = SchemeShard->ActorContext(); LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasureManager] SendResponseToRootSchemeShard: Generation# " << Generation + "[TenantShredManager] SendResponseToRootSchemeShard: Generation# " << Generation << ", Status# " << static_cast<ui32>(Status) << ", RootSchemeshard# " << SchemeShard->ParentDomainId.OwnerId); @@ -469,54 +469,54 @@ void TTenantDataErasureManager::SendResponseToRootSchemeShard() { } -TTenantDataErasureManager::TQueue::TConfig TTenantDataErasureManager::ConvertConfig(const NKikimrConfig::TDataErasureConfig& config) { +TTenantShredManager::TQueue::TConfig TTenantShredManager::ConvertConfig(const NKikimrConfig::TDataErasureConfig& config) { TQueue::TConfig queueConfig; - const auto& tenantDataErasureConfig = config.GetTenantDataErasureConfig(); + const auto& tenantShredConfig = config.GetTenantDataErasureConfig(); queueConfig.IsCircular = false; - queueConfig.MaxRate = tenantDataErasureConfig.GetMaxRate(); - queueConfig.InflightLimit = tenantDataErasureConfig.GetInflightLimit(); - queueConfig.Timeout = TDuration::Seconds(tenantDataErasureConfig.GetTimeoutSeconds()); + queueConfig.MaxRate = tenantShredConfig.GetMaxRate(); + queueConfig.InflightLimit = tenantShredConfig.GetInflightLimit(); + queueConfig.Timeout = TDuration::Seconds(tenantShredConfig.GetTimeoutSeconds()); return queueConfig; } -struct TSchemeShard::TTxRunTenantDataErasure : public TSchemeShard::TRwTxBase { - TEvSchemeShard::TEvTenantDataErasureRequest::TPtr Ev; +struct TSchemeShard::TTxRunTenantShred : public TSchemeShard::TRwTxBase { + TEvSchemeShard::TEvTenantShredRequest::TPtr Ev; bool NeedResponseComplete = false; - TTxRunTenantDataErasure(TSelf *self, TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev) + TTxRunTenantShred(TSelf *self, TEvSchemeShard::TEvTenantShredRequest::TPtr& ev) : TRwTxBase(self) , Ev(std::move(ev)) {} - TTxType GetTxType() const override { return TXTYPE_RUN_DATA_ERASURE_TENANT; } + TTxType GetTxType() const override { return TXTYPE_RUN_SHRED_TENANT; } void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunTenantDataErasure Execute at schemestard: " << Self->TabletID()); + "TTxRunTenantShred Execute at schemestard: " << Self->TabletID()); if (Self->IsDomainSchemeShard) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Request] Cannot run data erasure on root schemeshard"); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantShred] [Request] Cannot run shred on root schemeshard"); return; } NIceDb::TNiceDb db(txc.DB); const auto& record = Ev->Get()->Record; - auto& dataErasureManager = Self->DataErasureManager; - if (dataErasureManager->GetGeneration() < record.GetGeneration()) { - dataErasureManager->SetGeneration(record.GetGeneration()); - dataErasureManager->ClearOperationQueue(); - dataErasureManager->ClearWaitingDataErasureRequests(db); - dataErasureManager->Run(db); + auto& shredManager = Self->ShredManager; + if (shredManager->GetGeneration() < record.GetGeneration()) { + shredManager->SetGeneration(record.GetGeneration()); + shredManager->ClearOperationQueue(); + shredManager->ClearWaitingShredRequests(db); + shredManager->Run(db); } - if (Self->DataErasureManager->GetGeneration() == record.GetGeneration() && Self->DataErasureManager->GetStatus() == EDataErasureStatus::COMPLETED) { + if (Self->ShredManager->GetGeneration() == record.GetGeneration() && Self->ShredManager->GetStatus() == EShredStatus::COMPLETED) { NeedResponseComplete = true; } } void DoComplete(const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunTenantDataErasure Complete at schemestard: " << Self->TabletID() + "TTxRunTenantShred Complete at schemestard: " << Self->TabletID() << ", NeedResponseComplete# " << (NeedResponseComplete ? "true" : "false")); if (NeedResponseComplete) { @@ -525,49 +525,49 @@ struct TSchemeShard::TTxRunTenantDataErasure : public TSchemeShard::TRwTxBase { } }; -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunTenantDataErasure(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev) { - return new TTxRunTenantDataErasure(this, ev); +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunTenantShred(TEvSchemeShard::TEvTenantShredRequest::TPtr& ev) { + return new TTxRunTenantShred(this, ev); } template <typename TEvType> -struct TSchemeShard::TTxCompleteDataErasureShard : public TSchemeShard::TRwTxBase { +struct TSchemeShard::TTxCompleteShredShard : public TSchemeShard::TRwTxBase { TEvType Ev; bool NeedResponseComplete = false; - TTxCompleteDataErasureShard(TSelf *self, TEvType& ev) + TTxCompleteShredShard(TSelf *self, TEvType& ev) : TRwTxBase(self) , Ev(std::move(ev)) {} - TTxType GetTxType() const override { return TXTYPE_COMPLETE_DATA_ERASURE_SHARD; } + TTxType GetTxType() const override { return TXTYPE_COMPLETE_SHRED_SHARD; } void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureShard Execute at schemestard: " << Self->TabletID()); + "TTxCompleteShredShard Execute at schemestard: " << Self->TabletID()); if (!IsSuccess(Ev)) { HandleBadStatus(Ev, ctx); - return; // will be retried after timeout in the queue in TTenantDataErasureManager::OnTimeout() + return; // will be retried after timeout in the queue in TTenantShredManager::OnTimeout() } const ui64 cleanupGeneration = GetCleanupGeneration(Ev); - auto& manager = Self->DataErasureManager; + auto& manager = Self->ShredManager; if (cleanupGeneration != manager->GetGeneration()) { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureShard: Unknown generation#" << cleanupGeneration + "TTxCompleteShredShard: Unknown generation#" << cleanupGeneration << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); return; } NIceDb::TNiceDb db(txc.DB); manager->OnDone(TTabletId(GetTabletId(Ev)), db); - if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::COMPLETED) { + if (Self->ShredManager->GetStatus() == EShredStatus::COMPLETED) { NeedResponseComplete = true; } } void DoComplete(const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureShard Complete at schemestard: " << Self->TabletID() + "TTxCompleteShredShard Complete at schemestard: " << Self->TabletID() << ", NeedResponseComplete# " << (NeedResponseComplete ? "true" : "false")); if (NeedResponseComplete) { @@ -584,7 +584,7 @@ private: void HandleBadStatus(TEvDataShard::TEvVacuumResult::TPtr& ev, const TActorContext& ctx) const { const auto& record = ev->Get()->Record; LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureShard: data erasure failed at DataShard#" << record.GetTabletId() + "TTxCompleteShredShard: shred failed at DataShard#" << record.GetTabletId() << " with status: " << NKikimrTxDataShard::TEvVacuumResult::EStatus_Name(record.GetStatus()) << ", schemestard: " << Self->TabletID()); } @@ -607,7 +607,7 @@ private: void HandleBadStatus(TEvKeyValue::TEvVacuumResponse::TPtr& ev, const TActorContext& ctx) const { const auto& record = ev->Get()->Record; LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasureShard: data erasure failed at KeyValue#" << record.tablet_id() + "TTxCompleteShredShard: shred failed at KeyValue#" << record.tablet_id() << " with status: " << NKikimrKeyValue::VacuumResponse::Status_Name(record.status()) << ", schemestard: " << Self->TabletID()); } @@ -624,32 +624,32 @@ private: }; template <typename TEvType> -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard(TEvType& ev) { - return new TTxCompleteDataErasureShard(this, ev); +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteShredShard(TEvType& ev) { + return new TTxCompleteShredShard(this, ev); } -template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard<TEvDataShard::TEvVacuumResult::TPtr>(TEvDataShard::TEvVacuumResult::TPtr& ev); -template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard<TEvKeyValue::TEvVacuumResponse::TPtr>(TEvKeyValue::TEvVacuumResponse::TPtr& ev); +template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteShredShard<TEvDataShard::TEvVacuumResult::TPtr>(TEvDataShard::TEvVacuumResult::TPtr& ev); +template NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteShredShard<TEvKeyValue::TEvVacuumResponse::TPtr>(TEvKeyValue::TEvVacuumResponse::TPtr& ev); -struct TSchemeShard::TTxAddNewShardToDataErasure : public TSchemeShard::TRwTxBase { - TEvPrivate::TEvAddNewShardToDataErasure::TPtr Ev; +struct TSchemeShard::TTxAddNewShardToShred : public TSchemeShard::TRwTxBase { + TEvPrivate::TEvAddNewShardToShred::TPtr Ev; bool NeedResponseComplete = false; - TTxAddNewShardToDataErasure(TSelf *self, TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev) + TTxAddNewShardToShred(TSelf *self, TEvPrivate::TEvAddNewShardToShred::TPtr& ev) : TRwTxBase(self) , Ev(std::move(ev)) {} - TTxType GetTxType() const override { return TXTYPE_ADD_SHARDS_DATA_ERASURE; } + TTxType GetTxType() const override { return TXTYPE_ADD_SHARDS_SHRED; } void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxAddEntryToDataErasure Execute at schemestard: " << Self->TabletID()); + "TTxAddEntryToShred Execute at schemestard: " << Self->TabletID()); NIceDb::TNiceDb db(txc.DB); - Self->DataErasureManager->HandleNewPartitioning(Ev->Get()->Shards, db); - if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::COMPLETED) { - if (Self->DataErasureManager->IsRunning()) { + Self->ShredManager->HandleNewPartitioning(Ev->Get()->Shards, db); + if (Self->ShredManager->GetStatus() == EShredStatus::COMPLETED) { + if (Self->ShredManager->IsRunning()) { NeedResponseComplete = true; } } @@ -657,41 +657,41 @@ struct TSchemeShard::TTxAddNewShardToDataErasure : public TSchemeShard::TRwTxBas void DoComplete(const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxAddEntryToDataErasure Complete at schemestard: " << Self->TabletID()); + "TTxAddEntryToShred Complete at schemestard: " << Self->TabletID()); if (NeedResponseComplete) { NKikimr::NSchemeShard::SendResponseToRootSchemeShard(Self, ctx); } } }; -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxAddNewShardToDataErasure(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev) { - return new TTxAddNewShardToDataErasure(this, ev); +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxAddNewShardToShred(TEvPrivate::TEvAddNewShardToShred::TPtr& ev) { + return new TTxAddNewShardToShred(this, ev); } -struct TSchemeShard::TTxCancelDataErasureShards : public TSchemeShard::TRwTxBase { - const std::vector<TShardIdx> DataErasureShards; +struct TSchemeShard::TTxCancelShredShards : public TSchemeShard::TRwTxBase { + const std::vector<TShardIdx> ShredShards; bool NeedResponseComplete = false; - TTxCancelDataErasureShards(TSelf *self, const std::vector<TShardIdx>& dataErasureShards) + TTxCancelShredShards(TSelf *self, const std::vector<TShardIdx>& shredShards) : TRwTxBase(self) - , DataErasureShards(std::move(dataErasureShards)) + , ShredShards(std::move(shredShards)) {} - TTxType GetTxType() const override { return TXTYPE_CANCEL_SHARDS_DATA_ERASURE; } + TTxType GetTxType() const override { return TXTYPE_CANCEL_SHARDS_SHRED; } void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCancelDataErasureShards Execute at schemestard: " << Self->TabletID()); + "TTxCancelShredShards Execute at schemestard: " << Self->TabletID()); NIceDb::TNiceDb db(txc.DB); - for (const auto& shard : DataErasureShards) { - if (Self->DataErasureManager->Remove(shard)) { - db.Table<Schema::WaitingDataErasureShards>().Key(shard.GetOwnerId(), shard.GetLocalId()).Delete(); + for (const auto& shard : ShredShards) { + if (Self->ShredManager->Remove(shard)) { + db.Table<Schema::WaitingShredShards>().Key(shard.GetOwnerId(), shard.GetLocalId()).Delete(); } } - if (Self->DataErasureManager->GetStatus() == EDataErasureStatus::COMPLETED) { - db.Table<Schema::TenantDataErasureGenerations>().Key(Self->DataErasureManager->GetGeneration()).Update<Schema::TenantDataErasureGenerations::Status>(Self->DataErasureManager->GetStatus()); - if (Self->DataErasureManager->IsRunning()) { + if (Self->ShredManager->GetStatus() == EShredStatus::COMPLETED) { + db.Table<Schema::TenantShredGenerations>().Key(Self->ShredManager->GetGeneration()).Update<Schema::TenantShredGenerations::Status>(Self->ShredManager->GetStatus()); + if (Self->ShredManager->IsRunning()) { NeedResponseComplete = true; } } @@ -699,15 +699,15 @@ struct TSchemeShard::TTxCancelDataErasureShards : public TSchemeShard::TRwTxBase void DoComplete(const TActorContext& ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCancelDataErasureShards Complete at schemestard: " << Self->TabletID()); + "TTxCancelShredShards Complete at schemestard: " << Self->TabletID()); if (NeedResponseComplete) { NKikimr::NSchemeShard::SendResponseToRootSchemeShard(Self, ctx); } } }; -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCancelDataErasureShards(const std::vector<TShardIdx>& oldShards) { - return new TTxCancelDataErasureShards(this, oldShards); +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCancelShredShards(const std::vector<TShardIdx>& oldShards) { + return new TTxCancelShredShards(this, oldShards); } } // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 1ba34f4a18a..edfdf9b533e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -5,7 +5,7 @@ #include "olap/bg_tasks/adapter/adapter.h" #include "olap/bg_tasks/events/global.h" #include "schemeshard.h" -#include "schemeshard__data_erasure_manager.h" +#include "schemeshard__shred_manager.h" #include "schemeshard_svp_migration.h" #include <ydb/core/base/appdata.h> @@ -228,7 +228,7 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActiva StartStopCompactionQueues(); BackgroundCleaningQueue->Start(); - StartStopDataErasure(); + StartStopShred(); ctx.Send(TxAllocatorClient, MakeHolder<TEvTxAllocatorClient::TEvAllocate>(InitiateCachedTxIdsCount)); @@ -572,8 +572,8 @@ void TSchemeShard::Clear() { UpdateBorrowedCompactionQueueMetrics(); } - if (DataErasureManager) { - DataErasureManager->Clear(); + if (ShredManager) { + ShredManager->Clear(); } ClearTempDirsState(); @@ -2454,8 +2454,8 @@ void TSchemeShard::PersistRemoveSubDomain(NIceDb::TNiceDb& db, const TPathId& pa db.Table<Schema::StoragePools>().Key(pathId.LocalPathId, pool.GetName(), pool.GetKind()).Delete(); } - if (DataErasureManager->Remove(pathId)) { - db.Table<Schema::WaitingDataErasureTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); + if (ShredManager->Remove(pathId)) { + db.Table<Schema::WaitingShredTenants>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); } db.Table<Schema::SubDomains>().Key(pathId.LocalPathId).Delete(); @@ -4929,7 +4929,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { EnableVectorIndex = appData->FeatureFlags.GetEnableVectorIndex(); EnableResourcePoolsOnServerless = appData->FeatureFlags.GetEnableResourcePoolsOnServerless(); EnableExternalDataSourcesOnServerless = appData->FeatureFlags.GetEnableExternalDataSourcesOnServerless(); - EnableDataErasure = appData->FeatureFlags.GetEnableDataErasure(); + EnableShred = appData->FeatureFlags.GetEnableDataErasure(); EnableExternalSourceSchemaInference = appData->FeatureFlags.GetEnableExternalSourceSchemaInference(); ConfigureCompactionQueues(appData->CompactionConfig, ctx); @@ -4939,7 +4939,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { MaxRestoreBuildIndexShardsInFlight = appData->SchemeShardConfig.GetMaxRestoreBuildIndexShardsInFlight(); ConfigureBackgroundCleaningQueue(appData->BackgroundCleaningConfig, ctx); - ConfigureDataErasureManager(appData->DataErasureConfig); + ConfigureShredManager(appData->ShredConfig); ConfigureExternalSources(appData->QueryServiceConfig, ctx); if (appData->ChannelProfiles) { @@ -5254,17 +5254,17 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvInterconnect::TEvNodeDisconnected, Handle); HFuncTraced(TEvPrivate::TEvRetryNodeSubscribe, Handle); - //data erasure - HFuncTraced(TEvSchemeShard::TEvWakeupToRunDataErasure, DataErasureManager->WakeupToRunDataErasure); - HFuncTraced(TEvSchemeShard::TEvTenantDataErasureRequest, Handle); + // shred + HFuncTraced(TEvSchemeShard::TEvWakeupToRunShred, ShredManager->WakeupToRunShred); + HFuncTraced(TEvSchemeShard::TEvTenantShredRequest, Handle); HFuncTraced(TEvDataShard::TEvVacuumResult, Handle); HFuncTraced(TEvKeyValue::TEvVacuumResponse, Handle); - HFuncTraced(TEvPrivate::TEvAddNewShardToDataErasure, Handle); - HFuncTraced(TEvSchemeShard::TEvTenantDataErasureResponse, Handle); - HFuncTraced(TEvSchemeShard::TEvDataErasureInfoRequest, Handle); - HFuncTraced(TEvSchemeShard::TEvDataErasureManualStartupRequest, Handle); + HFuncTraced(TEvPrivate::TEvAddNewShardToShred, Handle); + HFuncTraced(TEvSchemeShard::TEvTenantShredResponse, Handle); + HFuncTraced(TEvSchemeShard::TEvShredInfoRequest, Handle); + HFuncTraced(TEvSchemeShard::TEvShredManualStartupRequest, Handle); HFuncTraced(TEvBlobStorage::TEvControllerShredResponse, Handle); - HFuncTraced(TEvSchemeShard::TEvWakeupToRunDataErasureBSC, Handle); + HFuncTraced(TEvSchemeShard::TEvWakeupToRunShredBSC, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { @@ -5935,7 +5935,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc BorrowedCompactionHandleDisconnect(tabletId, clientId); ConditionalEraseHandleDisconnect(tabletId, clientId, ctx); - DataErasureManager->HandleDisconnect(tabletId, clientId, ctx); + ShredManager->HandleDisconnect(tabletId, clientId, ctx); RestartPipeTx(tabletId, ctx); } @@ -5986,7 +5986,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc BorrowedCompactionHandleDisconnect(tabletId, clientId); ConditionalEraseHandleDisconnect(tabletId, clientId, ctx); - DataErasureManager->HandleDisconnect(tabletId, clientId, ctx); + ShredManager->HandleDisconnect(tabletId, clientId, ctx); RestartPipeTx(tabletId, ctx); } @@ -7410,7 +7410,6 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T newPartitioningSet.reserve(newPartitioning.size()); const auto& oldPartitioning = tableInfo->GetPartitions(); - std::vector<TShardIdx> newDataErasureShards; for (const auto& p: newPartitioning) { if (!oldPartitioning.empty()) newPartitioningSet.insert(p.ShardIdx); @@ -7420,7 +7419,6 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T if (it != partitionStats.end()) { EnqueueBackgroundCompaction(p.ShardIdx, it->second); UpdateShardMetrics(p.ShardIdx, it->second); - newDataErasureShards.push_back(p.ShardIdx); } } @@ -7535,8 +7533,8 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi } if (appConfig.HasDataErasureConfig()) { - const auto& dataErasureConfig = appConfig.GetDataErasureConfig(); - ConfigureDataErasureManager(dataErasureConfig); + const auto& shredConfig = appConfig.GetDataErasureConfig(); + ConfigureShredManager(shredConfig); } if (appConfig.HasSchemeShardConfig()) { @@ -7569,7 +7567,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi BackgroundCleaningQueue->Start(); } - StartStopDataErasure(); + StartStopShred(); } } @@ -7597,7 +7595,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu EnableResourcePoolsOnServerless = featureFlags.GetEnableResourcePoolsOnServerless(); EnableVectorIndex = featureFlags.GetEnableVectorIndex(); EnableExternalDataSourcesOnServerless = featureFlags.GetEnableExternalDataSourcesOnServerless(); - EnableDataErasure = featureFlags.GetEnableDataErasure(); + EnableShred = featureFlags.GetEnableDataErasure(); EnableExternalSourceSchemaInference = featureFlags.GetEnableExternalSourceSchemaInference(); } @@ -7961,63 +7959,63 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvListUsers::TPtr &ev, const TActorCo Execute(CreateTxListUsers(ev), ctx); } -void TSchemeShard::Handle(TEvSchemeShard::TEvDataErasureInfoRequest::TPtr& ev, const TActorContext& ctx) { +void TSchemeShard::Handle(TEvSchemeShard::TEvShredInfoRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, - "Handle TEvDataErasureInfoRequest, at schemeshard: " << TabletID()); + "Handle TEvShredInfoRequest, at schemeshard: " << TabletID()); - NKikimrScheme::TEvDataErasureInfoResponse::EStatus status = NKikimrScheme::TEvDataErasureInfoResponse::UNSPECIFIED; + NKikimrScheme::TEvShredInfoResponse::EStatus status = NKikimrScheme::TEvShredInfoResponse::UNSPECIFIED; - switch (DataErasureManager->GetStatus()) { - case EDataErasureStatus::UNSPECIFIED: - status = NKikimrScheme::TEvDataErasureInfoResponse::UNSPECIFIED; + switch (ShredManager->GetStatus()) { + case EShredStatus::UNSPECIFIED: + status = NKikimrScheme::TEvShredInfoResponse::UNSPECIFIED; break; - case EDataErasureStatus::COMPLETED: - status = NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED; + case EShredStatus::COMPLETED: + status = NKikimrScheme::TEvShredInfoResponse::COMPLETED; break; - case EDataErasureStatus::IN_PROGRESS: - status = NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_TENANT; + case EShredStatus::IN_PROGRESS: + status = NKikimrScheme::TEvShredInfoResponse::IN_PROGRESS_TENANT; break; - case EDataErasureStatus::IN_PROGRESS_BSC: - status = NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_BSC; + case EShredStatus::IN_PROGRESS_BSC: + status = NKikimrScheme::TEvShredInfoResponse::IN_PROGRESS_BSC; break; } - ctx.Send(ev->Sender, new TEvSchemeShard::TEvDataErasureInfoResponse(DataErasureManager->GetGeneration(), status)); + ctx.Send(ev->Sender, new TEvSchemeShard::TEvShredInfoResponse(ShredManager->GetGeneration(), status)); } -void TSchemeShard::Handle(TEvSchemeShard::TEvDataErasureManualStartupRequest::TPtr&, const TActorContext&) { - RunDataErasure(true); +void TSchemeShard::Handle(TEvSchemeShard::TEvShredManualStartupRequest::TPtr&, const TActorContext&) { + RunShred(true); } -void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev, const TActorContext& ctx) { +void TSchemeShard::Handle(TEvSchemeShard::TEvTenantShredRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, - "Handle TEvTenantDataErasureRequest, at schemeshard: " << TabletID()); - Execute(CreateTxRunTenantDataErasure(ev), ctx); + "Handle TEvTenantShredRequest, at schemeshard: " << TabletID()); + Execute(CreateTxRunTenantShred(ev), ctx); } void TSchemeShard::Handle(TEvDataShard::TEvVacuumResult::TPtr& ev, const TActorContext& ctx) { - Execute(CreateTxCompleteDataErasureShard<TEvDataShard::TEvVacuumResult::TPtr>(ev), ctx); + Execute(CreateTxCompleteShredShard<TEvDataShard::TEvVacuumResult::TPtr>(ev), ctx); } void TSchemeShard::Handle(TEvKeyValue::TEvVacuumResponse::TPtr& ev, const TActorContext& ctx) { - Execute(this->CreateTxCompleteDataErasureShard(ev), ctx); + Execute(this->CreateTxCompleteShredShard(ev), ctx); } -void TSchemeShard::Handle(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev, const TActorContext& ctx) { - Execute(CreateTxAddNewShardToDataErasure(ev), ctx); +void TSchemeShard::Handle(TEvPrivate::TEvAddNewShardToShred::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxAddNewShardToShred(ev), ctx); } -void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx) { - Execute(CreateTxCompleteDataErasureTenant(ev), ctx); +void TSchemeShard::Handle(TEvSchemeShard::TEvTenantShredResponse::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxCompleteShredTenant(ev), ctx); } void TSchemeShard::Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, "Handle TEvControllerShredResponse, at schemeshard: " << TabletID()); - Execute(CreateTxCompleteDataErasureBSC(ev), ctx); + Execute(CreateTxCompleteShredBSC(ev), ctx); } -void TSchemeShard::Handle(TEvSchemeShard::TEvWakeupToRunDataErasureBSC::TPtr&, const TActorContext&) { - DataErasureManager->SendRequestToBSC(); +void TSchemeShard::Handle(TEvSchemeShard::TEvWakeupToRunShredBSC::TPtr&, const TActorContext&) { + ShredManager->SendRequestToBSC(); } void TSchemeShard::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext&) { @@ -8211,36 +8209,36 @@ TDuration TSchemeShard::SendBaseStatsToSA() { + RandomNumber<ui64>(SendStatsIntervalMaxSeconds - SendStatsIntervalMinSeconds)); } -THolder<TDataErasureManager> TSchemeShard::CreateDataErasureManager(const NKikimrConfig::TDataErasureConfig& config) { +THolder<TShredManager> TSchemeShard::CreateShredManager(const NKikimrConfig::TDataErasureConfig& config) { if (IsDomainSchemeShard) { - return MakeHolder<TRootDataErasureManager>(this, config); + return MakeHolder<TRootShredManager>(this, config); } else { - return MakeHolder<TTenantDataErasureManager>(this, config); + return MakeHolder<TTenantShredManager>(this, config); } } -void TSchemeShard::ConfigureDataErasureManager(const NKikimrConfig::TDataErasureConfig& config) { - if (DataErasureManager) { - DataErasureManager->UpdateConfig(config); +void TSchemeShard::ConfigureShredManager(const NKikimrConfig::TDataErasureConfig& config) { + if (ShredManager) { + ShredManager->UpdateConfig(config); } else { - DataErasureManager = CreateDataErasureManager(config); + ShredManager = CreateShredManager(config); } } -void TSchemeShard::StartStopDataErasure() { - if (EnableDataErasure) { - DataErasureManager->Start(); +void TSchemeShard::StartStopShred() { + if (EnableShred) { + ShredManager->Start(); } else { - DataErasureManager->Stop(); + ShredManager->Stop(); } } -void TSchemeShard::MarkFirstRunRootDataErasureManager() { - Execute(CreateTxDataErasureManagerInit(), this->ActorContext()); +void TSchemeShard::MarkFirstRunRootShredManager() { + Execute(CreateTxShredManagerInit(), this->ActorContext()); } -void TSchemeShard::RunDataErasure(bool isNewDataErasure) { - Execute(CreateTxRunDataErasure(isNewDataErasure), this->ActorContext()); +void TSchemeShard::RunShred(bool isNewShred) { + Execute(CreateTxRunShred(isNewShred), this->ActorContext()); } } // namespace NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index be1ef900ccf..6ded6075ba6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -84,7 +84,7 @@ namespace NSchemeShard { extern const ui64 NEW_TABLE_ALTER_VERSION; extern ui64 gVectorIndexSeed; // for tests only -class TDataErasureManager; +class TShredManager; // Forward declaration for incremental restore context struct TIncrementalRestoreState; @@ -356,7 +356,7 @@ public: bool EnableResourcePoolsOnServerless = false; bool EnableVectorIndex = false; bool EnableExternalDataSourcesOnServerless = false; - bool EnableDataErasure = false; + bool EnableShred = false; bool EnableExternalSourceSchemaInference = false; bool EnableMoveColumnTable = false; @@ -1079,32 +1079,32 @@ public: template <EventBasePtr TEvPtr> NTabletFlatExecutor::ITransaction* CreateTxOperationReply(TOperationId id, TEvPtr& ev); - struct TTxDataErasureManagerInit; - NTabletFlatExecutor::ITransaction* CreateTxDataErasureManagerInit(); + struct TTxShredManagerInit; + NTabletFlatExecutor::ITransaction* CreateTxShredManagerInit(); - struct TTxRunDataErasure; - NTabletFlatExecutor::ITransaction* CreateTxRunDataErasure(bool isNewDataErasure); + struct TTxRunShred; + NTabletFlatExecutor::ITransaction* CreateTxRunShred(bool isNewShred); - struct TTxAddNewShardToDataErasure; - NTabletFlatExecutor::ITransaction* CreateTxAddNewShardToDataErasure(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev); + struct TTxAddNewShardToShred; + NTabletFlatExecutor::ITransaction* CreateTxAddNewShardToShred(TEvPrivate::TEvAddNewShardToShred::TPtr& ev); - struct TTxCancelDataErasureShards; - NTabletFlatExecutor::ITransaction* CreateTxCancelDataErasureShards(const std::vector<TShardIdx>& oldShards); + struct TTxCancelShredShards; + NTabletFlatExecutor::ITransaction* CreateTxCancelShredShards(const std::vector<TShardIdx>& oldShards); - struct TTxRunTenantDataErasure; - NTabletFlatExecutor::ITransaction* CreateTxRunTenantDataErasure(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev); + struct TTxRunTenantShred; + NTabletFlatExecutor::ITransaction* CreateTxRunTenantShred(TEvSchemeShard::TEvTenantShredRequest::TPtr& ev); template <typename TEvType> - struct TTxCompleteDataErasureShard; + struct TTxCompleteShredShard; template <typename TEvType> - NTabletFlatExecutor::ITransaction* CreateTxCompleteDataErasureShard(TEvType& ev); + NTabletFlatExecutor::ITransaction* CreateTxCompleteShredShard(TEvType& ev); - struct TTxCompleteDataErasureTenant; - NTabletFlatExecutor::ITransaction* CreateTxCompleteDataErasureTenant(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev); + struct TTxCompleteShredTenant; + NTabletFlatExecutor::ITransaction* CreateTxCompleteShredTenant(TEvSchemeShard::TEvTenantShredResponse::TPtr& ev); - struct TTxCompleteDataErasureBSC; - NTabletFlatExecutor::ITransaction* CreateTxCompleteDataErasureBSC(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev); + struct TTxCompleteShredBSC; + NTabletFlatExecutor::ITransaction* CreateTxCompleteShredBSC(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev); void PublishToSchemeBoard(THashMap<TTxId, TDeque<TPathId>>&& paths, const TActorContext& ctx); void PublishToSchemeBoard(TTxId txId, TDeque<TPathId>&& paths, const TActorContext& ctx); @@ -1217,16 +1217,16 @@ public: void Handle(TEvDataShard::TEvMigrateSchemeShardResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx); void Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx); - void Handle(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSchemeShard::TEvTenantShredRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvVacuumResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvKeyValue::TEvVacuumResponse__HandlePtr& ev, const TActorContext& ctx); - void Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPrivate::TEvAddNewShardToDataErasure::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSchemeShard::TEvTenantShredResponse::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvAddNewShardToShred::TPtr& ev, const TActorContext& ctx); void Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvBlobDepot::TEvApplyConfigResult::TPtr& ev, const TActorContext& ctx); - void Handle(TEvSchemeShard::TEvDataErasureInfoRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvSchemeShard::TEvDataErasureManualStartupRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvSchemeShard::TEvWakeupToRunDataErasureBSC::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSchemeShard::TEvShredInfoRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSchemeShard::TEvShredManualStartupRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSchemeShard::TEvWakeupToRunShredBSC::TPtr& ev, const TActorContext& ctx); void Handle(TEvSchemeShard::TEvProcessingRequest::TPtr& ev, const TActorContext& ctx); @@ -1588,11 +1588,11 @@ public: void ConnectToSA(); TDuration SendBaseStatsToSA(); - THolder<TDataErasureManager> CreateDataErasureManager(const NKikimrConfig::TDataErasureConfig& config); - void ConfigureDataErasureManager(const NKikimrConfig::TDataErasureConfig& config); - void StartStopDataErasure(); - void MarkFirstRunRootDataErasureManager(); - void RunDataErasure(bool isNewDataErasure); + THolder<TShredManager> CreateShredManager(const NKikimrConfig::TDataErasureConfig& config); + void ConfigureShredManager(const NKikimrConfig::TDataErasureConfig& config); + void StartStopShred(); + void MarkFirstRunRootShredManager(); + void RunShred(bool isNewShred); public: void ChangeStreamShardsCount(i64 delta) override; @@ -1616,7 +1616,7 @@ public: NLogin::TLoginProvider LoginProvider; TActorId LoginHelper; - THolder<TDataErasureManager> DataErasureManager = nullptr; + THolder<TShredManager> ShredManager = nullptr; private: void OnDetach(const TActorContext &ctx) override; diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 84da21ab6b7..26743136f50 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -45,9 +45,9 @@ namespace TEvPrivate { EvSendBaseStatsToSA, EvRunBackgroundCleaning, EvRetryNodeSubscribe, - EvRunDataErasure, - EvRunTenantDataErasure, - EvAddNewShardToDataErasure, + EvRunShred, + EvRunTenantShred, + EvAddNewShardToShred, EvVerifyPassword, EvLoginFinalize, EvEnd @@ -300,10 +300,10 @@ namespace TEvPrivate { { } }; - struct TEvAddNewShardToDataErasure : public TEventLocal<TEvAddNewShardToDataErasure, EvAddNewShardToDataErasure> { + struct TEvAddNewShardToShred : public TEventLocal<TEvAddNewShardToShred, EvAddNewShardToShred> { const std::vector<TShardIdx> Shards; - TEvAddNewShardToDataErasure(std::vector<TShardIdx>&& shards) + TEvAddNewShardToShred(std::vector<TShardIdx>&& shards) : Shards(std::move(shards)) {} }; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index b8042fcd9ec..73683d51cac 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -2044,7 +2044,7 @@ struct Schema : NIceDb::Schema { struct DataErasureGenerations : Table<115> { struct Generation : Column<1, NScheme::NTypeIds::Uint64> {}; - struct Status : Column<2, NScheme::NTypeIds::Uint32> { using Type = EDataErasureStatus; }; + struct Status : Column<2, NScheme::NTypeIds::Uint32> { using Type = EShredStatus; }; struct StartTime : Column<3, NScheme::NTypeIds::Timestamp> {}; using TKey = TableKey<Generation>; @@ -2054,11 +2054,12 @@ struct Schema : NIceDb::Schema { StartTime >; }; + using ShredGenerations = DataErasureGenerations; struct WaitingDataErasureTenants : Table<116> { struct OwnerPathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; struct LocalPathId : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; - struct Status : Column<3, NScheme::NTypeIds::Uint32> { using Type = EDataErasureStatus; }; + struct Status : Column<3, NScheme::NTypeIds::Uint32> { using Type = EShredStatus; }; using TKey = TableKey<OwnerPathId, LocalPathId>; using TColumns = TableColumns< @@ -2067,10 +2068,11 @@ struct Schema : NIceDb::Schema { Status >; }; + using WaitingShredTenants = WaitingDataErasureTenants; struct TenantDataErasureGenerations : Table<117> { struct Generation : Column<1, NScheme::NTypeIds::Uint64> {}; - struct Status : Column<2, NScheme::NTypeIds::Uint32> { using Type = EDataErasureStatus; }; + struct Status : Column<2, NScheme::NTypeIds::Uint32> { using Type = EShredStatus; }; using TKey = TableKey<Generation>; using TColumns = TableColumns< @@ -2078,11 +2080,12 @@ struct Schema : NIceDb::Schema { Status >; }; + using TenantShredGenerations = TenantDataErasureGenerations; struct WaitingDataErasureShards : Table<118> { struct OwnerShardIdx : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; struct LocalShardIdx : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalShardIdx; }; - struct Status : Column<3, NScheme::NTypeIds::Uint32> { using Type = EDataErasureStatus; }; + struct Status : Column<3, NScheme::NTypeIds::Uint32> { using Type = EShredStatus; }; using TKey = TableKey<OwnerShardIdx, LocalShardIdx>; using TColumns = TableColumns< @@ -2091,6 +2094,7 @@ struct Schema : NIceDb::Schema { Status >; }; + using WaitingShredShards = WaitingDataErasureShards; struct SysView : Table<119> { struct PathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; diff --git a/ydb/core/tx/schemeshard/schemeshard_types.h b/ydb/core/tx/schemeshard/schemeshard_types.h index 7ff4a938f0f..5e7d9edc5f4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_types.h @@ -165,7 +165,7 @@ struct TTempDirInfo { TActorId TempDirOwnerActorId; }; -enum class EDataErasureStatus : ui32 { +enum class EShredStatus : ui32 { UNSPECIFIED = 0, COMPLETED = 1, IN_PROGRESS = 2, diff --git a/ydb/core/tx/schemeshard/ut_helpers/data_erasure_helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/shred_helpers.cpp index 98d5db71a2d..e0259f0d36d 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/data_erasure_helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/shred_helpers.cpp @@ -1,4 +1,4 @@ -#include "data_erasure_helpers.h" +#include "shred_helpers.h" #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> diff --git a/ydb/core/tx/schemeshard/ut_helpers/data_erasure_helpers.h b/ydb/core/tx/schemeshard/ut_helpers/shred_helpers.h index 7f4f33e69dc..7f4f33e69dc 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/data_erasure_helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/shred_helpers.h diff --git a/ydb/core/tx/schemeshard/ut_helpers/ya.make b/ydb/core/tx/schemeshard/ut_helpers/ya.make index f4b58113d22..6cd3f83cb85 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ya.make +++ b/ydb/core/tx/schemeshard/ut_helpers/ya.make @@ -27,13 +27,13 @@ PEERDIR( ) SRCS( - data_erasure_helpers.cpp export_reboots_common.cpp failing_mtpq.cpp helpers.cpp helpers.h ls_checks.cpp ls_checks.h + shred_helpers.cpp test_env.cpp test_env.h test_with_reboots.h diff --git a/ydb/core/tx/schemeshard/ut_data_erasure/ut_data_erasure.cpp b/ydb/core/tx/schemeshard/ut_shred/ut_shred.cpp index bae059d7e66..03df36de6b0 100644 --- a/ydb/core/tx/schemeshard/ut_data_erasure/ut_data_erasure.cpp +++ b/ydb/core/tx/schemeshard/ut_shred/ut_shred.cpp @@ -4,7 +4,7 @@ #include <ydb/core/testlib/actors/block_events.h> #include <ydb/core/testlib/basics/runtime.h> #include <ydb/core/testlib/storage_helpers.h> -#include <ydb/core/tx/schemeshard/ut_helpers/data_erasure_helpers.h> +#include <ydb/core/tx/schemeshard/ut_helpers/shred_helpers.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <library/cpp/testing/unittest/registar.h> @@ -28,9 +28,9 @@ namespace { }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); - auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(50); - dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(10); + auto& shredConfig = runtime.GetAppData().ShredConfig; + shredConfig.SetDataErasureIntervalSeconds(50); + shredConfig.SetBlobStorageControllerRequestIntervalSeconds(10); return env; } @@ -56,26 +56,26 @@ namespace { UNIT_ASSERT(BlobStorageContains(dsProxies, valueToDelete)); } - void CheckDataErasureStatus(TTestBasicRuntime& runtime, TActorId sender, TVector<TIntrusivePtr<NFake::TProxyDS>>& dsProxies, const TString& valueToDelete, bool completed) { - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureInfoRequest>(); + void CheckShredStatus(TTestBasicRuntime& runtime, TActorId sender, TVector<TIntrusivePtr<NFake::TProxyDS>>& dsProxies, const TString& valueToDelete, bool completed) { + auto request = MakeHolder<TEvSchemeShard::TEvShredInfoRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; - auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvDataErasureInfoResponse>(handle); + auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvShredInfoResponse>(handle); UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 1, response->Record.GetGeneration()); if (completed) { - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::COMPLETED); UNIT_ASSERT(!BlobStorageContains(dsProxies, valueToDelete)); } else { - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_TENANT); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::IN_PROGRESS_TENANT); UNIT_ASSERT(BlobStorageContains(dsProxies, valueToDelete)); } } } -Y_UNIT_TEST_SUITE(TestDataErasure) { - void SimpleDataErasureTest(const TSchemeObject& createSchemeObject, ui64 currentBscGeneration = 0) { +Y_UNIT_TEST_SUITE(TestShred) { + void SimpleShredTest(const TSchemeObject& createSchemeObject, ui64 currentBscGeneration = 0) { TTestBasicRuntime runtime; TTestEnv env(runtime); @@ -88,12 +88,12 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); - auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(3); - dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + auto& shredConfig = runtime.GetAppData().ShredConfig; + shredConfig.SetDataErasureIntervalSeconds(3); + shredConfig.SetBlobStorageControllerRequestIntervalSeconds(1); auto sender = runtime.AllocateEdgeActor(); - // Change BSC counter value between data erasure iterations + // Change BSC counter value between shred iterations if (currentBscGeneration > 1) { auto request = MakeHolder<TEvBlobStorage::TEvControllerShredRequest>(currentBscGeneration); runtime.SendToPipe(MakeBSControllerID(), sender, request.Release(), 0, GetPipeConfigWithRetries()); @@ -109,37 +109,37 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, (currentBscGeneration > 1 ? 4 : 3))); runtime.DispatchEvents(options); - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureInfoRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredInfoRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; - auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvDataErasureInfoResponse>(handle); + auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvShredInfoResponse>(handle); if (currentBscGeneration > 1) { UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), currentBscGeneration + 1, response->Record.GetGeneration()); } else { UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 1, response->Record.GetGeneration()); } - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::COMPLETED); } Y_UNIT_TEST(SimpleTestForTables) { - SimpleDataErasureTest({.Table = true, .Topic = false}); + SimpleShredTest({.Table = true, .Topic = false}); } Y_UNIT_TEST(SimpleTestForTopic) { - SimpleDataErasureTest({.Table = false, .Topic = true}); + SimpleShredTest({.Table = false, .Topic = true}); } Y_UNIT_TEST(SimpleTestForAllSupportedObjects) { - SimpleDataErasureTest({.Table = true, .Topic = true}); + SimpleShredTest({.Table = true, .Topic = true}); } Y_UNIT_TEST(SchemeShardCounterDoesNotConsistWithBscCounter) { - SimpleDataErasureTest({.Table = true, .Topic = false}, /*currentBscGeneration*/ 47); + SimpleShredTest({.Table = true, .Topic = false}, /*currentBscGeneration*/ 47); } - void DataErasureRun3Cycles(const TSchemeObject& createSchemeObject) { + void ShredRun3Cycles(const TSchemeObject& createSchemeObject) { TTestBasicRuntime runtime; TTestEnv env(runtime); @@ -152,9 +152,9 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); - auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(3); - dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + auto& shredConfig = runtime.GetAppData().ShredConfig; + shredConfig.SetDataErasureIntervalSeconds(3); + shredConfig.SetBlobStorageControllerRequestIntervalSeconds(1); auto sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); @@ -168,29 +168,29 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 9)); runtime.DispatchEvents(options); - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureInfoRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredInfoRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; - auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvDataErasureInfoResponse>(handle); + auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvShredInfoResponse>(handle); UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 3, response->Record.GetGeneration()); - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::COMPLETED); } Y_UNIT_TEST(Run3CyclesForTables) { - DataErasureRun3Cycles({.Table = true, .Topic = false}); + ShredRun3Cycles({.Table = true, .Topic = false}); } Y_UNIT_TEST(Run3CyclesForTopics) { - DataErasureRun3Cycles({.Table = false, .Topic = true}); + ShredRun3Cycles({.Table = false, .Topic = true}); } Y_UNIT_TEST(Run3CyclesForAllSupportedObjects) { - DataErasureRun3Cycles({.Table = true, .Topic = true}); + ShredRun3Cycles({.Table = true, .Topic = true}); } - Y_UNIT_TEST(DataErasureManualLaunch) { + Y_UNIT_TEST(ShredManualLaunch) { TTestBasicRuntime runtime; TTestEnv env(runtime); @@ -203,9 +203,9 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); - auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(0); // do not schedule - dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + auto& shredConfig = runtime.GetAppData().ShredConfig; + shredConfig.SetDataErasureIntervalSeconds(0); // do not schedule + shredConfig.SetBlobStorageControllerRequestIntervalSeconds(1); auto sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); @@ -216,7 +216,7 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { CreateTestExtSubdomain(runtime, env, &txId, "Database2"); { - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureManualStartupRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredManualStartupRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); } @@ -224,14 +224,14 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 3)); runtime.DispatchEvents(options); - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureInfoRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredInfoRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; - auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvDataErasureInfoResponse>(handle); + auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvShredInfoResponse>(handle); UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 1, response->Record.GetGeneration()); - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::COMPLETED); } Y_UNIT_TEST(ManualLaunch3Cycles) { @@ -247,9 +247,9 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); - auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(0); // do not schedule - dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + auto& shredConfig = runtime.GetAppData().ShredConfig; + shredConfig.SetDataErasureIntervalSeconds(0); // do not schedule + shredConfig.SetBlobStorageControllerRequestIntervalSeconds(1); auto sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); @@ -259,10 +259,10 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { CreateTestExtSubdomain(runtime, env, &txId, "Database1"); CreateTestExtSubdomain(runtime, env, &txId, "Database2"); - auto RunDataErasure = [&runtime] (ui32 expectedGeneration) { + auto runShred = [&runtime](ui32 expectedGeneration) { auto sender = runtime.AllocateEdgeActor(); { - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureManualStartupRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredManualStartupRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); } @@ -270,19 +270,19 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 3)); runtime.DispatchEvents(options); - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureInfoRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredInfoRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; - auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvDataErasureInfoResponse>(handle); + auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvShredInfoResponse>(handle); UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), expectedGeneration, response->Record.GetGeneration()); - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::COMPLETED); }; - RunDataErasure(1); - RunDataErasure(2); - RunDataErasure(3); + runShred(1); + runShred(2); + runShred(3); } Y_UNIT_TEST(ManualLaunch3CyclesWithNotConsistentCountersInSchemeShardAndBSC) { @@ -298,9 +298,9 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); - auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(0); // do not schedule - dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + auto& shredConfig = runtime.GetAppData().ShredConfig; + shredConfig.SetDataErasureIntervalSeconds(0); // do not schedule + shredConfig.SetBlobStorageControllerRequestIntervalSeconds(1); auto sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); @@ -310,10 +310,10 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { CreateTestExtSubdomain(runtime, env, &txId, "Database1"); CreateTestExtSubdomain(runtime, env, &txId, "Database2"); - auto RunDataErasure = [&runtime] (ui32 expectedGeneration, ui32 requiredCountShredResponses) { + auto runShred = [&runtime](ui32 expectedGeneration, ui32 requiredCountShredResponses) { auto sender = runtime.AllocateEdgeActor(); { - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureManualStartupRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredManualStartupRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); } @@ -321,32 +321,32 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, requiredCountShredResponses)); runtime.DispatchEvents(options); - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureInfoRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredInfoRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; - auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvDataErasureInfoResponse>(handle); + auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvShredInfoResponse>(handle); UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), expectedGeneration, response->Record.GetGeneration()); - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::COMPLETED); }; - RunDataErasure(1, 3); - // Change BSC counter value between data erasure iterations + runShred(1, 3); + // Change BSC counter value between shred iterations { auto request = MakeHolder<TEvBlobStorage::TEvControllerShredRequest>(50); runtime.SendToPipe(MakeBSControllerID(), sender, request.Release(), 0, GetPipeConfigWithRetries()); } - RunDataErasure(51, 4); - // Change BSC counter value between data erasure iterations + runShred(51, 4); + // Change BSC counter value between shred iterations { auto request = MakeHolder<TEvBlobStorage::TEvControllerShredRequest>(100); runtime.SendToPipe(MakeBSControllerID(), sender, request.Release(), 0, GetPipeConfigWithRetries()); } - RunDataErasure(101, 4); + runShred(101, 4); } - Y_UNIT_TEST(DataErasureWithCopyTable) { + Y_UNIT_TEST(ShredWithCopyTable) { TTestBasicRuntime runtime; TVector<TIntrusivePtr<NFake::TProxyDS>> dsProxies { MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(0)), @@ -370,7 +370,7 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { runtime.WaitFor("borrow return", [&borrowReturns]{ return borrowReturns.size() >= 1; }); // data cleanup should not be finished due to holded borrow returns - CheckDataErasureStatus(runtime, sender, dsProxies, value, false); + CheckShredStatus(runtime, sender, dsProxies, value, false); // return borrow borrowReturns.Stop().Unblock(); @@ -380,10 +380,10 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { runtime.DispatchEvents(options); // data cleanup should be finished after returned borrows - CheckDataErasureStatus(runtime, sender, dsProxies, value, true); + CheckShredStatus(runtime, sender, dsProxies, value, true); } - Y_UNIT_TEST(DataErasureWithSplit) { + Y_UNIT_TEST(ShredWithSplit) { TTestBasicRuntime runtime; TVector<TIntrusivePtr<NFake::TProxyDS>> dsProxies { MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(0)), @@ -430,8 +430,8 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { runtime.WaitFor("borrow return", [&borrowReturns]{ return borrowReturns.size() >= 1; }); - // DataErasure should be in progress because of SplitTable and DataCleanup have been suspended - CheckDataErasureStatus(runtime, sender, dsProxies, valueToDelete, false); + // Shred should be in progress because of SplitTable and DataCleanup have been suspended + CheckShredStatus(runtime, sender, dsProxies, valueToDelete, false); auto shards2 = GetTableShards(runtime, schemeshardId, "/MyRoot/Database1/Simple"); UNIT_ASSERT_VALUES_EQUAL(shards2.size(), 2); @@ -444,10 +444,10 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { runtime.DispatchEvents(options); // now data cleanup should be finished - CheckDataErasureStatus(runtime, sender, dsProxies, valueToDelete, true); + CheckShredStatus(runtime, sender, dsProxies, valueToDelete, true); } - Y_UNIT_TEST(DataErasureWithMerge) { + Y_UNIT_TEST(ShredWithMerge) { TTestBasicRuntime runtime; TVector<TIntrusivePtr<NFake::TProxyDS>> dsProxies { MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(0)), @@ -501,8 +501,8 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { runtime.WaitFor("borrow return", [&borrowReturns]{ return borrowReturns.size() >= 1; }); - // DataErasure should be in progress because of SplitTable and DataCleanup have been suspended - CheckDataErasureStatus(runtime, sender, dsProxies, valueToDelete, false); + // Shred should be in progress because of SplitTable and DataCleanup have been suspended + CheckShredStatus(runtime, sender, dsProxies, valueToDelete, false); auto shards2 = GetTableShards(runtime, schemeshardId, "/MyRoot/Database1/Simple"); UNIT_ASSERT_VALUES_EQUAL(shards2.size(), 1); @@ -515,6 +515,6 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { runtime.DispatchEvents(options); // now data cleanup should be finished - CheckDataErasureStatus(runtime, sender, dsProxies, valueToDelete, true); + CheckShredStatus(runtime, sender, dsProxies, valueToDelete, true); } } diff --git a/ydb/core/tx/schemeshard/ut_data_erasure/ya.make b/ydb/core/tx/schemeshard/ut_shred/ya.make index 26656e5d129..18ca51ee658 100644 --- a/ydb/core/tx/schemeshard/ut_data_erasure/ya.make +++ b/ydb/core/tx/schemeshard/ut_shred/ya.make @@ -22,7 +22,7 @@ PEERDIR( ) SRCS( - ut_data_erasure.cpp + ut_shred.cpp ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ut_data_erasure_reboots.cpp b/ydb/core/tx/schemeshard/ut_shred_reboots/ut_shred_reboots.cpp index ceefb5f78a3..da972a794c2 100644 --- a/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ut_data_erasure_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_shred_reboots/ut_shred_reboots.cpp @@ -2,7 +2,7 @@ #include <ydb/core/mind/bscontroller/bsc.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/testlib/basics/runtime.h> -#include <ydb/core/tx/schemeshard/ut_helpers/data_erasure_helpers.h> +#include <ydb/core/tx/schemeshard/ut_helpers/shred_helpers.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <library/cpp/testing/unittest/registar.h> @@ -11,11 +11,11 @@ using namespace NKikimr; using namespace NSchemeShard; using namespace NSchemeShardUT_Private; -Y_UNIT_TEST_SUITE(DataErasureReboots) { +Y_UNIT_TEST_SUITE(ShredReboots) { Y_UNIT_TEST(Fake) { } - Y_UNIT_TEST(SimpleDataErasureTest) { + Y_UNIT_TEST(SimpleShredTest) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { @@ -28,9 +28,9 @@ Y_UNIT_TEST_SUITE(DataErasureReboots) { }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); - auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(0); // do not schedule - dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + auto& shredConfig = runtime.GetAppData().ShredConfig; + shredConfig.SetDataErasureIntervalSeconds(0); // do not schedule + shredConfig.SetBlobStorageControllerRequestIntervalSeconds(1); ui64 txId = 100; @@ -42,21 +42,21 @@ Y_UNIT_TEST_SUITE(DataErasureReboots) { { TInactiveZone inactive(activeZone); { - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureManualStartupRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredManualStartupRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); } TDispatchOptions options; options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 3)); runtime.DispatchEvents(options); - auto request = MakeHolder<TEvSchemeShard::TEvDataErasureInfoRequest>(); + auto request = MakeHolder<TEvSchemeShard::TEvShredInfoRequest>(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; - auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvDataErasureInfoResponse>(handle); + auto response = runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvShredInfoResponse>(handle); UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 1, response->Record.GetGeneration()); - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvShredInfoResponse::COMPLETED); } }); } diff --git a/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ya.make b/ydb/core/tx/schemeshard/ut_shred_reboots/ya.make index 3a823fc3440..a624c419ee6 100644 --- a/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ya.make +++ b/ydb/core/tx/schemeshard/ut_shred_reboots/ya.make @@ -20,7 +20,7 @@ PEERDIR( ) SRCS( - ut_data_erasure_reboots.cpp + ut_shred_reboots.cpp ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 5402ac65830..e2df3a2681e 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -13,8 +13,8 @@ RECURSE_FOR_TESTS( ut_column_build ut_compaction ut_continuous_backup - ut_data_erasure - ut_data_erasure_reboots + ut_shred + ut_shred_reboots ut_export ut_export_reboots_s3 ut_external_data_source @@ -77,7 +77,7 @@ SRCS( schemeshard__borrowed_compaction.cpp schemeshard__clean_pathes.cpp schemeshard__conditional_erase.cpp - schemeshard__data_erasure_manager.cpp + schemeshard__shred_manager.cpp schemeshard__delete_tablet_reply.cpp schemeshard__describe_scheme.cpp schemeshard__find_subdomain_path_id.cpp @@ -206,13 +206,13 @@ SRCS( schemeshard__operation_upgrade_subdomain.cpp schemeshard__pq_stats.cpp schemeshard__publish_to_scheme_board.cpp - schemeshard__root_data_erasure_manager.cpp + schemeshard__root_shred_manager.cpp schemeshard__serverless_storage_billing.cpp schemeshard__state_changed_reply.cpp schemeshard__sync_update_tenants.cpp schemeshard__table_stats.cpp schemeshard__table_stats_histogram.cpp - schemeshard__tenant_data_erasure_manager.cpp + schemeshard__tenant_shred_manager.cpp schemeshard__unmark_restore_tables.cpp schemeshard__upgrade_access_database.cpp schemeshard__upgrade_schema.cpp diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index d59c5d3b1bd..5ab44d65919 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1092,8 +1092,8 @@ message TActivity { HTTP_MON_AUTHORIZED_ACTOR_REQUEST = 655; IMPORT_SCHEME_QUERY_EXECUTOR = 656; REPLICATION_TRANSFER_WRITER = 657; - DATA_ERASURE = 658; - TENANT_DATA_ERASURE = 659; + SCHEMESHARD_SHRED = 658; + SCHEMESHARD_TENANT_SHRED = 659; BS_SYNC_BROKER = 660; BS_PROXY_CHECKINTEGRITY_ACTOR = 661; CMS_CONFIG_INFO_COLLECTOR = 662; |