aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-11-17 22:06:52 +0300
committermonster <monster@ydb.tech>2022-11-17 22:06:52 +0300
commit6b0ef1e419310324ad779d3adf521b92a6c78ad7 (patch)
treea8ef368b9f3a04c4894d29f5e8579c93ec0d234b
parent00b076591d06d78ed0e9021dce5981eaa6f56de7 (diff)
downloadydb-6b0ef1e419310324ad779d3adf521b92a6c78ad7.tar.gz
autocreating missing sysviewprocessors
-rw-r--r--ydb/core/cms/console/console__create_tenant.cpp2
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/core/protos/services.proto1
-rw-r--r--ydb/core/sys_view/processor/tx_init_schema.cpp2
-rw-r--r--ydb/core/sys_view/service/sysview_service.cpp13
-rw-r--r--ydb/core/sys_view/ut_common.cpp6
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp35
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp169
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_svp_migration.h16
11 files changed, 232 insertions, 17 deletions
diff --git a/ydb/core/cms/console/console__create_tenant.cpp b/ydb/core/cms/console/console__create_tenant.cpp
index 2f8676de35..469261f25a 100644
--- a/ydb/core/cms/console/console__create_tenant.cpp
+++ b/ydb/core/cms/console/console__create_tenant.cpp
@@ -126,7 +126,7 @@ public:
Tenant->IsExternalSubdomain = Self->FeatureFlags.GetEnableExternalSubdomains();
Tenant->IsExternalHive = Self->FeatureFlags.GetEnableExternalHive();
- Tenant->IsExternalSysViewProcessor = Self->FeatureFlags.GetEnablePersistentQueryStats();
+ Tenant->IsExternalSysViewProcessor = Self->FeatureFlags.GetEnableSystemViews();
if (rec.options().disable_external_subdomain()) {
Tenant->IsExternalSubdomain = false;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 8b1c4e6ad1..a3be7cd884 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -690,7 +690,7 @@ message TFeatureFlags {
optional bool EnableOfflineSlaves = 22 [default = true]; // deprecated: always true
optional bool CheckDatabaseAccessPermission = 23 [default = false];
optional bool AllowOnlineIndexBuild = 24 [default = true]; // deprecated: always true
- optional bool EnablePersistentQueryStats = 25 [default = false];
+ optional bool EnablePersistentQueryStats = 25 [default = true];
optional bool DisableDataShardBarrier = 26 [default = false];
optional bool EnablePutBatchingForBlobStorage = 27 [default = true];
optional bool EnableKqpWideFlow = 28 [default = true]; // deprecated: always true
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 2c515042c2..1f1fb531d7 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -942,5 +942,6 @@ message TActivity {
HTTP_MON_SERVICE_MON_REQUEST = 594;
HTTP_MON_SERVICE_NODE_PROXY = 595;
AUDIT_WRITER_ACTOR = 596;
+ SCHEMESHARD_SVP_MIGRATOR = 597;
};
};
diff --git a/ydb/core/sys_view/processor/tx_init_schema.cpp b/ydb/core/sys_view/processor/tx_init_schema.cpp
index 8a1df28d68..4e1ed9f6cc 100644
--- a/ydb/core/sys_view/processor/tx_init_schema.cpp
+++ b/ydb/core/sys_view/processor/tx_init_schema.cpp
@@ -40,7 +40,7 @@ struct TSysViewProcessor::TTxInitSchema : public TTxBase {
void Complete(const TActorContext& ctx) override {
SVLOG_D("[" << Self->TabletID() << "] TTxInitSchema::Complete");
- if (!AppData()->FeatureFlags.GetEnablePersistentQueryStats()) {
+ if (!AppData()->FeatureFlags.GetEnableSystemViews()) {
SVLOG_D("[" << Self->TabletID() << "] tablet is offline");
Self->SignalTabletActive(ctx);
Self->Become(&TThis::StateOffline);
diff --git a/ydb/core/sys_view/service/sysview_service.cpp b/ydb/core/sys_view/service/sysview_service.cpp
index 92e826cd7f..76995c0c99 100644
--- a/ydb/core/sys_view/service/sysview_service.cpp
+++ b/ydb/core/sys_view/service/sysview_service.cpp
@@ -348,10 +348,8 @@ public:
ScanLimiter = MakeIntrusive<TScanLimiter>(ConcurrentScansLimit);
- if (AppData()->FeatureFlags.GetEnablePersistentQueryStats()) {
- IntervalEnd = GetNextIntervalEnd();
- Schedule(IntervalEnd, new TEvPrivate::TEvProcessInterval(IntervalEnd));
- }
+ IntervalEnd = GetNextIntervalEnd();
+ Schedule(IntervalEnd, new TEvPrivate::TEvProcessInterval(IntervalEnd));
if (AppData()->FeatureFlags.GetEnableDbCounters()) {
auto intervalSize = ProcessCountersInterval.MicroSeconds();
@@ -668,11 +666,6 @@ private:
void Handle(TEvSysView::TEvGetIntervalMetricsRequest::TPtr& ev) {
auto response = MakeHolder<TEvSysView::TEvGetIntervalMetricsResponse>();
- if (!AppData()->FeatureFlags.GetEnablePersistentQueryStats()) {
- Send(ev->Sender, std::move(response), 0, ev->Cookie);
- return;
- }
-
const auto& record = ev->Get()->Record;
response->Record.SetIntervalEndUs(record.GetIntervalEndUs());
const auto& database = record.GetDatabase();
@@ -916,7 +909,7 @@ private:
<< ", query hash# " << stats->GetQueryTextHash()
<< ", cpu time# " << stats->GetTotalCpuTimeUs());
- if (AppData()->FeatureFlags.GetEnablePersistentQueryStats() && !database.empty()) {
+ if (!database.empty()) {
auto queryEnd = TInstant::MilliSeconds(stats->GetEndTimeMs());
if (queryEnd < IntervalEnd - TotalInterval) {
return;
diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp
index 726c531a94..898e4cb8c2 100644
--- a/ydb/core/sys_view/ut_common.cpp
+++ b/ydb/core/sys_view/ut_common.cpp
@@ -42,10 +42,8 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool
featureFlags.SetEnableBackgroundCompaction(false);
Settings->SetFeatureFlags(featureFlags);
- if (enableSVP) {
- Settings->SetEnablePersistentQueryStats(true);
- Settings->SetEnableDbCounters(true);
- }
+ Settings->SetEnablePersistentQueryStats(enableSVP);
+ Settings->SetEnableDbCounters(enableSVP);
for (ui32 i : xrange(storagePools)) {
TString poolName = Sprintf("test%d", i);
diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt
index e533a8a913..33100184ee 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.txt
@@ -206,6 +206,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_element.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_ui64id.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_utils.cpp
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 2c75f1c6e9..3e8fe2c4b2 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1,5 +1,6 @@
#include "schemeshard.h"
#include "schemeshard_impl.h"
+#include "schemeshard_svp_migration.h"
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet/tablet_counters_aggregator.h>
@@ -83,6 +84,36 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx,
Execute(CreateTxCleanBlockStoreVolumes(std::move(blockStoreVolumesToClean)), ctx);
}
+ if (IsDomainSchemeShard) {
+ std::queue<TSVPMigrationInfo> migrations;
+ for (auto& [pathId, subdomain] : SubDomains) {
+ if (subdomain->GetTenantSchemeShardID() == InvalidTabletId) { // no tenant schemeshard
+ continue;
+ }
+ if (subdomain->GetTenantSysViewProcessorID() != InvalidTabletId) { // tenant has SVP
+ continue;
+ }
+
+ auto path = TPath::Init(pathId, this);
+ if (path->IsRoot()) { // do not migrate main domain
+ continue;
+ }
+
+ auto workingDir = path.Parent().PathString();
+ auto dbName = path.LeafName();
+ TSVPMigrationInfo migration{workingDir, dbName};
+ migrations.push(std::move(migration));
+
+ LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - creating SVP"
+ << ", working dir: " << workingDir
+ << ", db name: " << dbName
+ << ", at schemeshard: " << TabletID());
+ }
+
+ SVPMigrator = Register(CreateSVPMigrator(TabletID(), SelfId(), std::move(migrations)).Release());
+ }
+
ResumeExports(exportIds, ctx);
ResumeImports(importsIds, ctx);
@@ -3917,6 +3948,10 @@ void TSchemeShard::Die(const TActorContext &ctx) {
ctx.Send(TxAllocatorClient, new TEvents::TEvPoisonPill());
ctx.Send(SysPartitionStatsCollector, new TEvents::TEvPoisonPill());
+ if (SVPMigrator) {
+ ctx.Send(SVPMigrator, new TEvents::TEvPoisonPill());
+ }
+
ShardDeleter.Shutdown(ctx);
ParentDomainLink.Shutdown(ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index e91e33afe4..d7cf400f92 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -265,6 +265,8 @@ public:
TActorId SysPartitionStatsCollector;
+ TActorId SVPMigrator;
+
TDuration StatsMaxExecuteTime;
TDuration StatsBatchTimeout;
ui32 StatsMaxBatchSize = 0;
diff --git a/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp b/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp
new file mode 100644
index 0000000000..ba76703036
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard_svp_migration.cpp
@@ -0,0 +1,169 @@
+#include "schemeshard_svp_migration.h"
+#include "schemeshard_impl.h"
+
+#include <ydb/core/tx/tx_proxy/proxy.h>
+
+namespace NKikimr::NSchemeShard {
+
+class TSVPMigrator : public TActorBootstrapped<TSVPMigrator> {
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::SCHEMESHARD_SVP_MIGRATOR;
+ }
+
+ TSVPMigrator(ui64 ssTabletId, TActorId ssActorId, std::queue<TSVPMigrationInfo>&& migrations)
+ : SSTabletId(ssTabletId)
+ , SSActorId(ssActorId)
+ , Queue(std::move(migrations))
+ {}
+
+ void Bootstrap() {
+ Schedule(TDuration::Seconds(15), new TEvents::TEvWakeup);
+ Become(&TSVPMigrator::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ switch(ev->GetTypeRewrite()) {
+ hFunc(TEvents::TEvWakeup, Handle);
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle)
+ hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
+ hFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, Handle);
+ IgnoreFunc(TEvSchemeShard::TEvNotifyTxCompletionRegistered);
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ default:
+ LOG_CRIT(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TSVPMigrator StateWork unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ }
+ }
+
+private:
+ void RequestTxId() {
+ LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - send TEvAllocateTxId"
+ << ", working dir " << Current.WorkingDir
+ << ", db name: " << Current.DbName
+ << ", at schemeshard: " << SSTabletId);
+
+ Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+ }
+
+ void SendModifyScheme(ui64 txId) {
+ auto request = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>();
+ auto& record = request->Record;
+ record.SetTxId(txId);
+
+ auto& modifyScheme = *record.AddTransaction();
+ modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterExtSubDomain);
+ modifyScheme.SetWorkingDir(Current.WorkingDir);
+ modifyScheme.SetFailOnExist(false);
+
+ auto& modifySubDomain = *modifyScheme.MutableSubDomain();
+ modifySubDomain.SetName(Current.DbName);
+ modifySubDomain.SetExternalSysViewProcessor(true);
+
+ LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - send TEvModifySchemeTransaction"
+ << ", working dir " << Current.WorkingDir
+ << ", db name: " << Current.DbName
+ << ", at schemeshard: " << SSTabletId);
+
+ Send(SSActorId, request.Release());
+ }
+
+ void SubscribeToCompletion(ui64 txId) {
+ auto request = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>();
+ request->Record.SetTxId(txId);
+
+ LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - send TEvNotifyTxCompletion"
+ << ", txId " << txId
+ << ", at schemeshard: " << SSTabletId);
+
+ Send(SSActorId, request.Release());
+ }
+
+ void StartNextMigration() {
+ Current = {};
+ if (Queue.empty()) {
+ PassAway();
+ return;
+ }
+ Current = std::move(Queue.front());
+ Queue.pop();
+ RequestTxId();
+ }
+
+ void Handle(TEvents::TEvWakeup::TPtr&) {
+ LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - start processing migrations"
+ << ", queue size: " << Queue.size()
+ << ", at schemeshard: " << SSTabletId);
+
+ StartNextMigration();
+ }
+
+ void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ auto txId = ev->Get()->TxId;
+
+ LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - handle TEvAllocateTxIdResult"
+ << ", txId: " << txId
+ << ", at schemeshard: " << SSTabletId);
+
+ SendModifyScheme(txId);
+ }
+
+ void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+ auto status = record.GetStatus();
+ auto txId = record.GetTxId();
+
+ LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - handle TEvModifySchemeTransactionResult"
+ << ", status: " << status
+ << ", txId: " << txId
+ << ", at schemeshard: " << SSTabletId);
+
+ switch (status) {
+ case NKikimrScheme::StatusSuccess:
+ StartNextMigration();
+ break;
+ case NKikimrScheme::StatusAccepted:
+ SubscribeToCompletion(record.GetTxId());
+ break;
+ default:
+ LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - migration failed"
+ << ", status: " << status
+ << ", reason: " << record.GetReason()
+ << ", txId: " << txId
+ << ", at schemeshard: " << SSTabletId);
+
+ StartNextMigration();
+ break;
+ }
+ }
+
+ void Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
+ LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "SVPMigrator - handle TEvNotifyTxCompletionResult"
+ << ", txId: " << ev->Get()->Record.GetTxId()
+ << ", at schemeshard: " << SSTabletId);
+
+ StartNextMigration();
+ }
+
+private:
+ const ui64 SSTabletId;
+ const TActorId SSActorId;
+ std::queue<TSVPMigrationInfo> Queue;
+ TSVPMigrationInfo Current;
+};
+
+THolder<IActor> CreateSVPMigrator(ui64 ssTabletId, TActorId ssActorId,
+ std::queue<TSVPMigrationInfo>&& migrations)
+{
+ return MakeHolder<TSVPMigrator>(ssTabletId, ssActorId, std::move(migrations));
+}
+
+} // namespace NKikimr::NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard_svp_migration.h b/ydb/core/tx/schemeshard/schemeshard_svp_migration.h
new file mode 100644
index 0000000000..b71466cd3f
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard_svp_migration.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+#include <queue>
+
+namespace NKikimr::NSchemeShard {
+
+struct TSVPMigrationInfo {
+ TString WorkingDir;
+ TString DbName;
+};
+
+THolder<NActors::IActor> CreateSVPMigrator(ui64 ssTabletId, NActors::TActorId ssActorId,
+ std::queue<TSVPMigrationInfo>&& migrations);
+
+} // namespace NKikimr::NSchemeShard