diff options
author | monster <monster@ydb.tech> | 2022-11-17 22:06:52 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-11-17 22:06:52 +0300 |
commit | 6b0ef1e419310324ad779d3adf521b92a6c78ad7 (patch) | |
tree | a8ef368b9f3a04c4894d29f5e8579c93ec0d234b | |
parent | 00b076591d06d78ed0e9021dce5981eaa6f56de7 (diff) | |
download | ydb-6b0ef1e419310324ad779d3adf521b92a6c78ad7.tar.gz |
autocreating missing sysviewprocessors
-rw-r--r-- | ydb/core/cms/console/console__create_tenant.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 2 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 1 | ||||
-rw-r--r-- | ydb/core/sys_view/processor/tx_init_schema.cpp | 2 | ||||
-rw-r--r-- | ydb/core/sys_view/service/sysview_service.cpp | 13 | ||||
-rw-r--r-- | ydb/core/sys_view/ut_common.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp | 169 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_svp_migration.h | 16 |
11 files changed, 232 insertions, 17 deletions
diff --git a/ydb/core/cms/console/console__create_tenant.cpp b/ydb/core/cms/console/console__create_tenant.cpp index 2f8676de35..469261f25a 100644 --- a/ydb/core/cms/console/console__create_tenant.cpp +++ b/ydb/core/cms/console/console__create_tenant.cpp @@ -126,7 +126,7 @@ public: Tenant->IsExternalSubdomain = Self->FeatureFlags.GetEnableExternalSubdomains(); Tenant->IsExternalHive = Self->FeatureFlags.GetEnableExternalHive(); - Tenant->IsExternalSysViewProcessor = Self->FeatureFlags.GetEnablePersistentQueryStats(); + Tenant->IsExternalSysViewProcessor = Self->FeatureFlags.GetEnableSystemViews(); if (rec.options().disable_external_subdomain()) { Tenant->IsExternalSubdomain = false; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8b1c4e6ad1..a3be7cd884 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -690,7 +690,7 @@ message TFeatureFlags { optional bool EnableOfflineSlaves = 22 [default = true]; // deprecated: always true optional bool CheckDatabaseAccessPermission = 23 [default = false]; optional bool AllowOnlineIndexBuild = 24 [default = true]; // deprecated: always true - optional bool EnablePersistentQueryStats = 25 [default = false]; + optional bool EnablePersistentQueryStats = 25 [default = true]; optional bool DisableDataShardBarrier = 26 [default = false]; optional bool EnablePutBatchingForBlobStorage = 27 [default = true]; optional bool EnableKqpWideFlow = 28 [default = true]; // deprecated: always true diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 2c515042c2..1f1fb531d7 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -942,5 +942,6 @@ message TActivity { HTTP_MON_SERVICE_MON_REQUEST = 594; HTTP_MON_SERVICE_NODE_PROXY = 595; AUDIT_WRITER_ACTOR = 596; + SCHEMESHARD_SVP_MIGRATOR = 597; }; }; diff --git a/ydb/core/sys_view/processor/tx_init_schema.cpp b/ydb/core/sys_view/processor/tx_init_schema.cpp index 8a1df28d68..4e1ed9f6cc 100644 --- a/ydb/core/sys_view/processor/tx_init_schema.cpp +++ b/ydb/core/sys_view/processor/tx_init_schema.cpp @@ -40,7 +40,7 @@ struct TSysViewProcessor::TTxInitSchema : public TTxBase { void Complete(const TActorContext& ctx) override { SVLOG_D("[" << Self->TabletID() << "] TTxInitSchema::Complete"); - if (!AppData()->FeatureFlags.GetEnablePersistentQueryStats()) { + if (!AppData()->FeatureFlags.GetEnableSystemViews()) { SVLOG_D("[" << Self->TabletID() << "] tablet is offline"); Self->SignalTabletActive(ctx); Self->Become(&TThis::StateOffline); diff --git a/ydb/core/sys_view/service/sysview_service.cpp b/ydb/core/sys_view/service/sysview_service.cpp index 92e826cd7f..76995c0c99 100644 --- a/ydb/core/sys_view/service/sysview_service.cpp +++ b/ydb/core/sys_view/service/sysview_service.cpp @@ -348,10 +348,8 @@ public: ScanLimiter = MakeIntrusive<TScanLimiter>(ConcurrentScansLimit); - if (AppData()->FeatureFlags.GetEnablePersistentQueryStats()) { - IntervalEnd = GetNextIntervalEnd(); - Schedule(IntervalEnd, new TEvPrivate::TEvProcessInterval(IntervalEnd)); - } + IntervalEnd = GetNextIntervalEnd(); + Schedule(IntervalEnd, new TEvPrivate::TEvProcessInterval(IntervalEnd)); if (AppData()->FeatureFlags.GetEnableDbCounters()) { auto intervalSize = ProcessCountersInterval.MicroSeconds(); @@ -668,11 +666,6 @@ private: void Handle(TEvSysView::TEvGetIntervalMetricsRequest::TPtr& ev) { auto response = MakeHolder<TEvSysView::TEvGetIntervalMetricsResponse>(); - if (!AppData()->FeatureFlags.GetEnablePersistentQueryStats()) { - Send(ev->Sender, std::move(response), 0, ev->Cookie); - return; - } - const auto& record = ev->Get()->Record; response->Record.SetIntervalEndUs(record.GetIntervalEndUs()); const auto& database = record.GetDatabase(); @@ -916,7 +909,7 @@ private: << ", query hash# " << stats->GetQueryTextHash() << ", cpu time# " << stats->GetTotalCpuTimeUs()); - if (AppData()->FeatureFlags.GetEnablePersistentQueryStats() && !database.empty()) { + if (!database.empty()) { auto queryEnd = TInstant::MilliSeconds(stats->GetEndTimeMs()); if (queryEnd < IntervalEnd - TotalInterval) { return; diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp index 726c531a94..898e4cb8c2 100644 --- a/ydb/core/sys_view/ut_common.cpp +++ b/ydb/core/sys_view/ut_common.cpp @@ -42,10 +42,8 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool featureFlags.SetEnableBackgroundCompaction(false); Settings->SetFeatureFlags(featureFlags); - if (enableSVP) { - Settings->SetEnablePersistentQueryStats(true); - Settings->SetEnableDbCounters(true); - } + Settings->SetEnablePersistentQueryStats(enableSVP); + Settings->SetEnableDbCounters(enableSVP); for (ui32 i : xrange(storagePools)) { TString poolName = Sprintf("test%d", i); diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt index e533a8a913..33100184ee 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.txt @@ -206,6 +206,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_element.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_ui64id.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_utils.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 2c75f1c6e9..3e8fe2c4b2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1,5 +1,6 @@ #include "schemeshard.h" #include "schemeshard_impl.h" +#include "schemeshard_svp_migration.h" #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet/tablet_counters_aggregator.h> @@ -83,6 +84,36 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, Execute(CreateTxCleanBlockStoreVolumes(std::move(blockStoreVolumesToClean)), ctx); } + if (IsDomainSchemeShard) { + std::queue<TSVPMigrationInfo> migrations; + for (auto& [pathId, subdomain] : SubDomains) { + if (subdomain->GetTenantSchemeShardID() == InvalidTabletId) { // no tenant schemeshard + continue; + } + if (subdomain->GetTenantSysViewProcessorID() != InvalidTabletId) { // tenant has SVP + continue; + } + + auto path = TPath::Init(pathId, this); + if (path->IsRoot()) { // do not migrate main domain + continue; + } + + auto workingDir = path.Parent().PathString(); + auto dbName = path.LeafName(); + TSVPMigrationInfo migration{workingDir, dbName}; + migrations.push(std::move(migration)); + + LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - creating SVP" + << ", working dir: " << workingDir + << ", db name: " << dbName + << ", at schemeshard: " << TabletID()); + } + + SVPMigrator = Register(CreateSVPMigrator(TabletID(), SelfId(), std::move(migrations)).Release()); + } + ResumeExports(exportIds, ctx); ResumeImports(importsIds, ctx); @@ -3917,6 +3948,10 @@ void TSchemeShard::Die(const TActorContext &ctx) { ctx.Send(TxAllocatorClient, new TEvents::TEvPoisonPill()); ctx.Send(SysPartitionStatsCollector, new TEvents::TEvPoisonPill()); + if (SVPMigrator) { + ctx.Send(SVPMigrator, new TEvents::TEvPoisonPill()); + } + ShardDeleter.Shutdown(ctx); ParentDomainLink.Shutdown(ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index e91e33afe4..d7cf400f92 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -265,6 +265,8 @@ public: TActorId SysPartitionStatsCollector; + TActorId SVPMigrator; + TDuration StatsMaxExecuteTime; TDuration StatsBatchTimeout; ui32 StatsMaxBatchSize = 0; diff --git a/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp b/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp new file mode 100644 index 0000000000..ba76703036 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp @@ -0,0 +1,169 @@ +#include "schemeshard_svp_migration.h" +#include "schemeshard_impl.h" + +#include <ydb/core/tx/tx_proxy/proxy.h> + +namespace NKikimr::NSchemeShard { + +class TSVPMigrator : public TActorBootstrapped<TSVPMigrator> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::SCHEMESHARD_SVP_MIGRATOR; + } + + TSVPMigrator(ui64 ssTabletId, TActorId ssActorId, std::queue<TSVPMigrationInfo>&& migrations) + : SSTabletId(ssTabletId) + , SSActorId(ssActorId) + , Queue(std::move(migrations)) + {} + + void Bootstrap() { + Schedule(TDuration::Seconds(15), new TEvents::TEvWakeup); + Become(&TSVPMigrator::StateWork); + } + + STFUNC(StateWork) { + switch(ev->GetTypeRewrite()) { + hFunc(TEvents::TEvWakeup, Handle); + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle) + hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); + hFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, Handle); + IgnoreFunc(TEvSchemeShard::TEvNotifyTxCompletionRegistered); + cFunc(TEvents::TEvPoison::EventType, PassAway); + default: + LOG_CRIT(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TSVPMigrator StateWork unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + +private: + void RequestTxId() { + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - send TEvAllocateTxId" + << ", working dir " << Current.WorkingDir + << ", db name: " << Current.DbName + << ", at schemeshard: " << SSTabletId); + + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); + } + + void SendModifyScheme(ui64 txId) { + auto request = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(); + auto& record = request->Record; + record.SetTxId(txId); + + auto& modifyScheme = *record.AddTransaction(); + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterExtSubDomain); + modifyScheme.SetWorkingDir(Current.WorkingDir); + modifyScheme.SetFailOnExist(false); + + auto& modifySubDomain = *modifyScheme.MutableSubDomain(); + modifySubDomain.SetName(Current.DbName); + modifySubDomain.SetExternalSysViewProcessor(true); + + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - send TEvModifySchemeTransaction" + << ", working dir " << Current.WorkingDir + << ", db name: " << Current.DbName + << ", at schemeshard: " << SSTabletId); + + Send(SSActorId, request.Release()); + } + + void SubscribeToCompletion(ui64 txId) { + auto request = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(); + request->Record.SetTxId(txId); + + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - send TEvNotifyTxCompletion" + << ", txId " << txId + << ", at schemeshard: " << SSTabletId); + + Send(SSActorId, request.Release()); + } + + void StartNextMigration() { + Current = {}; + if (Queue.empty()) { + PassAway(); + return; + } + Current = std::move(Queue.front()); + Queue.pop(); + RequestTxId(); + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - start processing migrations" + << ", queue size: " << Queue.size() + << ", at schemeshard: " << SSTabletId); + + StartNextMigration(); + } + + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + auto txId = ev->Get()->TxId; + + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - handle TEvAllocateTxIdResult" + << ", txId: " << txId + << ", at schemeshard: " << SSTabletId); + + SendModifyScheme(txId); + } + + void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) { + auto& record = ev->Get()->Record; + auto status = record.GetStatus(); + auto txId = record.GetTxId(); + + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - handle TEvModifySchemeTransactionResult" + << ", status: " << status + << ", txId: " << txId + << ", at schemeshard: " << SSTabletId); + + switch (status) { + case NKikimrScheme::StatusSuccess: + StartNextMigration(); + break; + case NKikimrScheme::StatusAccepted: + SubscribeToCompletion(record.GetTxId()); + break; + default: + LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - migration failed" + << ", status: " << status + << ", reason: " << record.GetReason() + << ", txId: " << txId + << ", at schemeshard: " << SSTabletId); + + StartNextMigration(); + break; + } + } + + void Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "SVPMigrator - handle TEvNotifyTxCompletionResult" + << ", txId: " << ev->Get()->Record.GetTxId() + << ", at schemeshard: " << SSTabletId); + + StartNextMigration(); + } + +private: + const ui64 SSTabletId; + const TActorId SSActorId; + std::queue<TSVPMigrationInfo> Queue; + TSVPMigrationInfo Current; +}; + +THolder<IActor> CreateSVPMigrator(ui64 ssTabletId, TActorId ssActorId, + std::queue<TSVPMigrationInfo>&& migrations) +{ + return MakeHolder<TSVPMigrator>(ssTabletId, ssActorId, std::move(migrations)); +} + +} // namespace NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_svp_migration.h b/ydb/core/tx/schemeshard/schemeshard_svp_migration.h new file mode 100644 index 0000000000..b71466cd3f --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_svp_migration.h @@ -0,0 +1,16 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <queue> + +namespace NKikimr::NSchemeShard { + +struct TSVPMigrationInfo { + TString WorkingDir; + TString DbName; +}; + +THolder<NActors::IActor> CreateSVPMigrator(ui64 ssTabletId, NActors::TActorId ssActorId, + std::queue<TSVPMigrationInfo>&& migrations); + +} // namespace NKikimr::NSchemeShard |