diff options
author | monster <monster@ydb.tech> | 2023-11-13 12:16:42 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2023-11-13 12:59:33 +0300 |
commit | 8f71613ae4af935f07bce837ef1f9bdd96aca6c1 (patch) | |
tree | 12f02678a9199125ca4ef865304bad8622403ddf | |
parent | fb9621192115f1bbd72f93954ec2f66f628a0c84 (diff) | |
download | ydb-8f71613ae4af935f07bce837ef1f9bdd96aca6c1.tar.gz |
introduce Statistics Aggregator tablet KIKIMR-19862
71 files changed, 918 insertions, 9 deletions
diff --git a/.mapping.json b/.mapping.json index 2297dce1d81..5735b225fdd 100644 --- a/.mapping.json +++ b/.mapping.json @@ -5136,6 +5136,11 @@ "ydb/core/statistics/CMakeLists.linux-x86_64.txt":"", "ydb/core/statistics/CMakeLists.txt":"", "ydb/core/statistics/CMakeLists.windows-x86_64.txt":"", + "ydb/core/statistics/aggregator/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/statistics/aggregator/CMakeLists.linux-aarch64.txt":"", + "ydb/core/statistics/aggregator/CMakeLists.linux-x86_64.txt":"", + "ydb/core/statistics/aggregator/CMakeLists.txt":"", + "ydb/core/statistics/aggregator/CMakeLists.windows-x86_64.txt":"", "ydb/core/statistics/ut/CMakeLists.darwin-x86_64.txt":"", "ydb/core/statistics/ut/CMakeLists.linux-aarch64.txt":"", "ydb/core/statistics/ut/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/cms/console/console__create_tenant.cpp b/ydb/core/cms/console/console__create_tenant.cpp index ff91c0e4ffe..11728050cea 100644 --- a/ydb/core/cms/console/console__create_tenant.cpp +++ b/ydb/core/cms/console/console__create_tenant.cpp @@ -126,6 +126,7 @@ public: Tenant->IsExternalSubdomain = Self->FeatureFlags.GetEnableExternalSubdomains(); Tenant->IsExternalHive = Self->FeatureFlags.GetEnableExternalHive(); Tenant->IsExternalSysViewProcessor = Self->FeatureFlags.GetEnableSystemViews(); + Tenant->IsExternalStatisticsAggregator = Self->FeatureFlags.GetEnableStatistics(); if (rec.options().disable_external_subdomain()) { Tenant->IsExternalSubdomain = false; @@ -143,10 +144,12 @@ public: Tenant->IsExternalSubdomain = false; Tenant->IsExternalHive = false; Tenant->IsExternalSysViewProcessor = false; + Tenant->IsExternalStatisticsAggregator = false; } Tenant->IsExternalHive &= Tenant->IsExternalSubdomain; // external hive without external sub domain is pointless Tenant->IsExternalSysViewProcessor &= Tenant->IsExternalSubdomain; + Tenant->IsExternalStatisticsAggregator &= Tenant->IsExternalSubdomain; Tenant->StorageUnitsQuota = Self->Config.DefaultStorageUnitsQuota; Tenant->ComputationalUnitsQuota = Self->Config.DefaultComputationalUnitsQuota; diff --git a/ydb/core/cms/console/console__scheme.h b/ydb/core/cms/console/console__scheme.h index 18208f6d8f6..3516751b3fb 100644 --- a/ydb/core/cms/console/console__scheme.h +++ b/ydb/core/cms/console/console__scheme.h @@ -47,13 +47,14 @@ struct Schema : NIceDb::Schema { struct CreateIdempotencyKey : Column<25, NScheme::NTypeIds::Utf8> {}; struct AlterIdempotencyKey : Column<26, NScheme::NTypeIds::Utf8> {}; struct DatabaseQuotas : Column<27, NScheme::NTypeIds::String> {}; + struct IsExternalStatisticsAggregator : Column<28, NScheme::NTypeIds::Bool> {}; using TKey = TableKey<Path>; using TColumns = TableColumns<Path, State, Coordinators, Mediators, PlanResolution, Issue, TxId, UserToken, SubdomainVersion, ConfirmedSubdomain, TimeCastBucketsPerMediator, Attributes, Generation, SchemeShardId, PathId, ErrorCode, IsExternalSubDomain, IsExternalHive, AreResourcesShared, SharedDomainSchemeShardId, SharedDomainPathId, IsExternalSysViewProcessor, - SchemaOperationQuotas, CreateIdempotencyKey, AlterIdempotencyKey, DatabaseQuotas>; + SchemaOperationQuotas, CreateIdempotencyKey, AlterIdempotencyKey, DatabaseQuotas, IsExternalStatisticsAggregator>; }; struct TenantPools : Table<3> { diff --git a/ydb/core/cms/console/console_tenants_manager.cpp b/ydb/core/cms/console/console_tenants_manager.cpp index 31b5c001be3..955e8926f13 100644 --- a/ydb/core/cms/console/console_tenants_manager.cpp +++ b/ydb/core/cms/console/console_tenants_manager.cpp @@ -457,6 +457,9 @@ public: if (Tenant->IsExternalSysViewProcessor) { subdomain.SetExternalSysViewProcessor(true); } + if (Tenant->IsExternalStatisticsAggregator) { + subdomain.SetExternalStatisticsAggregator(true); + } } if (SharedTenant) { @@ -479,6 +482,9 @@ public: if (Tenant->IsExternalSysViewProcessor) { subdomain.SetExternalSysViewProcessor(true); } + if (Tenant->IsExternalStatisticsAggregator) { + subdomain.SetExternalStatisticsAggregator(true); + } } if (tablets) { subdomain.SetCoordinators(Tenant->Coordinators); @@ -1188,6 +1194,7 @@ TTenantsManager::TTenant::TTenant(const TString &path, , IsExternalSubdomain(false) , IsExternalHive(false) , IsExternalSysViewProcessor(false) + , IsExternalStatisticsAggregator(false) , AreResourcesShared(false) { } @@ -2265,6 +2272,7 @@ void TTenantsManager::DbAddTenant(TTenant::TPtr tenant, << " isExternalSubDomain=" << tenant->IsExternalSubdomain << " isExternalHive=" << tenant->IsExternalHive << " isExternalSysViewProcessor=" << tenant->IsExternalSysViewProcessor + << " isExternalStatisticsAggregator=" << tenant->IsExternalStatisticsAggregator << " areResourcesShared=" << tenant->AreResourcesShared << " sharedDomainId=" << tenant->SharedDomainId); @@ -2286,6 +2294,7 @@ void TTenantsManager::DbAddTenant(TTenant::TPtr tenant, NIceDb::TUpdate<Schema::Tenants::IsExternalSubDomain>(tenant->IsExternalSubdomain), NIceDb::TUpdate<Schema::Tenants::IsExternalHive>(tenant->IsExternalHive), NIceDb::TUpdate<Schema::Tenants::IsExternalSysViewProcessor>(tenant->IsExternalSysViewProcessor), + NIceDb::TUpdate<Schema::Tenants::IsExternalStatisticsAggregator>(tenant->IsExternalStatisticsAggregator), NIceDb::TUpdate<Schema::Tenants::AreResourcesShared>(tenant->AreResourcesShared), NIceDb::TUpdate<Schema::Tenants::CreateIdempotencyKey>(tenant->CreateIdempotencyKey)); @@ -2390,6 +2399,7 @@ bool TTenantsManager::DbLoadState(TTransactionContext &txc, const TActorContext bool isExternalSubDomain = tenantRowset.GetValueOrDefault<Schema::Tenants::IsExternalSubDomain>(false); bool isExternalHive = tenantRowset.GetValueOrDefault<Schema::Tenants::IsExternalHive>(false); bool isExternalSysViewProcessor = tenantRowset.GetValueOrDefault<Schema::Tenants::IsExternalSysViewProcessor>(false); + bool isExternalStatisticsAggregator = tenantRowset.GetValueOrDefault<Schema::Tenants::IsExternalStatisticsAggregator>(false); const bool areResourcesShared = tenantRowset.GetValueOrDefault<Schema::Tenants::AreResourcesShared>(false); TTenant::TPtr tenant = new TTenant(path, state, userToken); @@ -2409,6 +2419,7 @@ bool TTenantsManager::DbLoadState(TTransactionContext &txc, const TActorContext tenant->IsExternalSubdomain = isExternalSubDomain; tenant->IsExternalHive = isExternalHive; tenant->IsExternalSysViewProcessor = isExternalSysViewProcessor; + tenant->IsExternalStatisticsAggregator = isExternalStatisticsAggregator; tenant->AreResourcesShared = areResourcesShared; if (tenantRowset.HaveValue<Schema::Tenants::SchemaOperationQuotas>()) { diff --git a/ydb/core/cms/console/console_tenants_manager.h b/ydb/core/cms/console/console_tenants_manager.h index 4d4233813a2..6da937eb614 100644 --- a/ydb/core/cms/console/console_tenants_manager.h +++ b/ydb/core/cms/console/console_tenants_manager.h @@ -527,6 +527,7 @@ public: bool IsExternalSubdomain; bool IsExternalHive; bool IsExternalSysViewProcessor; + bool IsExternalStatisticsAggregator; bool AreResourcesShared; THashSet<TTenant::TPtr> HostedTenants; diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt index 8439de3a0c9..5439933cac8 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt @@ -90,6 +90,7 @@ target_link_libraries(run PUBLIC ydb-core-scheme_types ydb-core-security ydb-core-statistics + core-statistics-aggregator core-sys_view-processor core-sys_view-service ydb-core-tablet diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index fb358d77d5c..2e3e643f790 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -91,6 +91,7 @@ target_link_libraries(run PUBLIC ydb-core-scheme_types ydb-core-security ydb-core-statistics + core-statistics-aggregator core-sys_view-processor core-sys_view-service ydb-core-tablet diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt index fb358d77d5c..2e3e643f790 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt @@ -91,6 +91,7 @@ target_link_libraries(run PUBLIC ydb-core-scheme_types ydb-core-security ydb-core-statistics + core-statistics-aggregator core-sys_view-processor core-sys_view-service ydb-core-tablet diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt index 8439de3a0c9..5439933cac8 100644 --- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt @@ -90,6 +90,7 @@ target_link_libraries(run PUBLIC ydb-core-scheme_types ydb-core-security ydb-core-statistics + core-statistics-aggregator core-sys_view-processor core-sys_view-service ydb-core-tablet diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 13d147c1499..1b519ba80b3 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -119,6 +119,7 @@ #include <ydb/core/sys_view/processor/processor.h> #include <ydb/core/sys_view/service/sysview_service.h> #include <ydb/core/statistics/stat_service.h> +#include <ydb/core/statistics/aggregator/aggregator.h> #include <ydb/core/tablet/bootstrapper.h> #include <ydb/core/tablet/node_tablet_monitor.h> @@ -1075,6 +1076,7 @@ void TLocalServiceInitializer::InitializeServices( addToLocalConfig(TTabletTypes::SequenceShard, &NSequenceShard::CreateSequenceShard, TMailboxType::ReadAsFilled, appData->UserPoolId); addToLocalConfig(TTabletTypes::ReplicationController, &NReplication::CreateController, TMailboxType::ReadAsFilled, appData->UserPoolId); addToLocalConfig(TTabletTypes::BlobDepot, &NBlobDepot::CreateBlobDepot, TMailboxType::ReadAsFilled, appData->UserPoolId); + addToLocalConfig(TTabletTypes::StatisticsAggregator, &NStat::CreateStatisticsAggregator, TMailboxType::ReadAsFilled, appData->UserPoolId); TTenantPoolConfig::TPtr tenantPoolConfig = new TTenantPoolConfig(Config.GetTenantPoolConfig(), localConfig); diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index cf75b16e98b..80560dd8d1e 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -99,6 +99,7 @@ PEERDIR( ydb/core/scheme_types ydb/core/security ydb/core/statistics + ydb/core/statistics/aggregator ydb/core/sys_view/processor ydb/core/sys_view/service ydb/core/tablet diff --git a/ydb/core/mind/configured_tablet_bootstrapper.cpp b/ydb/core/mind/configured_tablet_bootstrapper.cpp index 4793f84cbd1..00e5c8da92e 100644 --- a/ydb/core/mind/configured_tablet_bootstrapper.cpp +++ b/ydb/core/mind/configured_tablet_bootstrapper.cpp @@ -23,6 +23,7 @@ #include <ydb/core/sys_view/processor/processor.h> #include <ydb/core/test_tablet/test_tablet.h> #include <ydb/core/blob_depot/blob_depot.h> +#include <ydb/core/statistics/aggregator/aggregator.h> #include <library/cpp/actors/core/hfunc.h> @@ -227,6 +228,9 @@ TIntrusivePtr<TTabletSetupInfo> MakeTabletSetupInfo( case TTabletTypes::BlobDepot: createFunc = &NBlobDepot::CreateBlobDepot; break; + case TTabletTypes::StatisticsAggregator: + createFunc = &NStat::CreateStatisticsAggregator; + break; default: return nullptr; } diff --git a/ydb/core/mind/hive/hive_statics.cpp b/ydb/core/mind/hive/hive_statics.cpp index 73abd086d26..b6ec140a1f4 100644 --- a/ydb/core/mind/hive/hive_statics.cpp +++ b/ydb/core/mind/hive/hive_statics.cpp @@ -384,7 +384,7 @@ void MakeTabletTypeSet(std::vector<TTabletTypes::EType>& list) { bool IsValidTabletType(TTabletTypes::EType type) { return (type > TTabletTypes::Unknown - && type < TTabletTypes::Reserved40 + && type < TTabletTypes::Reserved41 ); } diff --git a/ydb/core/mind/hive/monitoring.cpp b/ydb/core/mind/hive/monitoring.cpp index b20741d0403..18e885f3cd7 100644 --- a/ydb/core/mind/hive/monitoring.cpp +++ b/ydb/core/mind/hive/monitoring.cpp @@ -1278,6 +1278,8 @@ public: return "RC"; case TTabletTypes::BlobDepot: return "BD"; + case TTabletTypes::StatisticsAggregator: + return "SA"; default: return Sprintf("%d", (int)type); } diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt index 265534e9811..2f2a0766360 100644 --- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt @@ -1574,6 +1574,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1656,6 +1668,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_replication.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_schemeshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sequenceshard.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_statistics_aggregator.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sysview_processor.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_testshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_tx_proxy.proto diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt index b40a96debba..cff2686486b 100644 --- a/ydb/core/protos/CMakeLists.linux-aarch64.txt +++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt @@ -1574,6 +1574,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1657,6 +1669,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_replication.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_schemeshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sequenceshard.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_statistics_aggregator.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sysview_processor.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_testshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_tx_proxy.proto diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt index b40a96debba..cff2686486b 100644 --- a/ydb/core/protos/CMakeLists.linux-x86_64.txt +++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt @@ -1574,6 +1574,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1657,6 +1669,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_replication.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_schemeshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sequenceshard.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_statistics_aggregator.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sysview_processor.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_testshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_tx_proxy.proto diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt index 265534e9811..2f2a0766360 100644 --- a/ydb/core/protos/CMakeLists.windows-x86_64.txt +++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt @@ -1574,6 +1574,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1656,6 +1668,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_replication.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_schemeshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sequenceshard.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_statistics_aggregator.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_sysview_processor.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_testshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_tx_proxy.proto diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 73cf72a9cf3..9b0fb679496 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -186,6 +186,8 @@ enum ESimpleCounters { COUNTER_PQ_STATS_QUEUE_SIZE = 151 [(CounterOpts) = {Name: "PQStatsQueueSize"}]; COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES = 152 [(CounterOpts) = {Name: "DiskSpaceTopicsTotalBytes"}]; + + COUNTER_STATISTICS_AGGREGATOR_COUNT = 153 [(CounterOpts) = {Name: "StatisticsAggregatorCount"}]; } enum ECumulativeCounters { diff --git a/ydb/core/protos/counters_statistics_aggregator.proto b/ydb/core/protos/counters_statistics_aggregator.proto new file mode 100644 index 00000000000..ba44b3a2459 --- /dev/null +++ b/ydb/core/protos/counters_statistics_aggregator.proto @@ -0,0 +1,13 @@ +import "ydb/core/protos/counters.proto"; + +package NKikimr.NStat; + +option java_package = "ru.yandex.kikimr.proto"; + +option (TabletTypeName) = "StatisticsAggregator"; + +enum ETxTypes { + TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}]; + TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}]; + TXTYPE_CONFIGURE = 2 [(TxTypeOpts) = {Name: "TxConfigure"}]; +} diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto index ca31a4039da..aec23c52932 100644 --- a/ydb/core/protos/flat_tx_scheme.proto +++ b/ydb/core/protos/flat_tx_scheme.proto @@ -354,6 +354,7 @@ message TEvSyncTenantSchemeShard { optional uint64 TenantHive = 8; optional uint64 TenantSysViewProcessor = 9; + optional uint64 TenantStatisticsAggregator = 11; optional string TenantRootACL = 10; } @@ -374,6 +375,7 @@ message TEvUpdateTenantSchemeShard { optional uint64 TenantHive = 10; optional uint64 TenantSysViewProcessor = 11; + optional uint64 TenantStatisticsAggregator = 16; optional NKikimrSubDomains.TSchemeQuotas DeclaredSchemeQuotas = 12; optional Ydb.Cms.DatabaseQuotas DatabaseQuotas = 14; diff --git a/ydb/core/protos/statistics.proto b/ydb/core/protos/statistics.proto index 539f69c34e1..05e71ed292d 100644 --- a/ydb/core/protos/statistics.proto +++ b/ydb/core/protos/statistics.proto @@ -18,3 +18,7 @@ message TEvRegisterNode { optional uint32 NodeId = 1; optional bool HasStatistics = 2; } + +message TEvConfigureAggregator { + optional string Database = 1; +} diff --git a/ydb/core/protos/subdomains.proto b/ydb/core/protos/subdomains.proto index 83490ca6d26..88e6d0f564c 100644 --- a/ydb/core/protos/subdomains.proto +++ b/ydb/core/protos/subdomains.proto @@ -23,6 +23,7 @@ message TSubDomainSettings { optional TSchemeQuotas DeclaredSchemeQuotas = 11; optional Ydb.Cms.DatabaseQuotas DatabaseQuotas = 12; optional TAuditSettings AuditSettings = 13; + optional bool ExternalStatisticsAggregator = 14 [default = false]; } message TProcessingParams { @@ -35,6 +36,7 @@ message TProcessingParams { optional fixed64 SchemeShard = 6; optional fixed64 Hive = 7; optional fixed64 SysViewProcessor = 8; + optional fixed64 StatisticsAggregator = 10; //put there SubSchemeShard and SubHive at the future diff --git a/ydb/core/protos/tablet.proto b/ydb/core/protos/tablet.proto index 43ad30b1836..1723064942e 100644 --- a/ydb/core/protos/tablet.proto +++ b/ydb/core/protos/tablet.proto @@ -47,14 +47,15 @@ message TTabletTypes { SequenceShard = 37; ReplicationController = 38; BlobDepot = 39; + StatisticsAggregator = 40; // when adding a new tablet type and keeping parse compatibility with the old version // rename existing reserved item to desired one, and add new reserved item to // the end of reserved list - Reserved40 = 40; Reserved41 = 41; Reserved42 = 42; Reserved43 = 43; + Reserved44 = 44; UserTypeStart = 255; TypeInvalid = -1; diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index e1ce700db75..d853fa20961 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -51,6 +51,7 @@ SRCS( counters_replication.proto counters_schemeshard.proto counters_sequenceshard.proto + counters_statistics_aggregator.proto counters_sysview_processor.proto counters_testshard.proto counters_tx_proxy.proto diff --git a/ydb/core/statistics/CMakeLists.darwin-x86_64.txt b/ydb/core/statistics/CMakeLists.darwin-x86_64.txt index 120d373be38..ecb766d6e7f 100644 --- a/ydb/core/statistics/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/statistics/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(aggregator) add_subdirectory(ut) add_library(ydb-core-statistics) diff --git a/ydb/core/statistics/CMakeLists.linux-aarch64.txt b/ydb/core/statistics/CMakeLists.linux-aarch64.txt index 5dfcc3622af..685d0f05d12 100644 --- a/ydb/core/statistics/CMakeLists.linux-aarch64.txt +++ b/ydb/core/statistics/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(aggregator) add_subdirectory(ut) add_library(ydb-core-statistics) diff --git a/ydb/core/statistics/CMakeLists.linux-x86_64.txt b/ydb/core/statistics/CMakeLists.linux-x86_64.txt index 5dfcc3622af..685d0f05d12 100644 --- a/ydb/core/statistics/CMakeLists.linux-x86_64.txt +++ b/ydb/core/statistics/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(aggregator) add_subdirectory(ut) add_library(ydb-core-statistics) diff --git a/ydb/core/statistics/CMakeLists.windows-x86_64.txt b/ydb/core/statistics/CMakeLists.windows-x86_64.txt index 120d373be38..ecb766d6e7f 100644 --- a/ydb/core/statistics/CMakeLists.windows-x86_64.txt +++ b/ydb/core/statistics/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(aggregator) add_subdirectory(ut) add_library(ydb-core-statistics) diff --git a/ydb/core/statistics/aggregator/CMakeLists.darwin-x86_64.txt b/ydb/core/statistics/aggregator/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..fe736bf693e --- /dev/null +++ b/ydb/core/statistics/aggregator/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-statistics-aggregator) +target_compile_options(core-statistics-aggregator PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-statistics-aggregator PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-base + core-engine-minikql + ydb-core-protos + ydb-core-tablet + ydb-core-tablet_flat +) +target_sources(core-statistics-aggregator PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/schema.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_configure.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init_schema.cpp +) diff --git a/ydb/core/statistics/aggregator/CMakeLists.linux-aarch64.txt b/ydb/core/statistics/aggregator/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..5b8294cd590 --- /dev/null +++ b/ydb/core/statistics/aggregator/CMakeLists.linux-aarch64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-statistics-aggregator) +target_compile_options(core-statistics-aggregator PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-statistics-aggregator PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-base + core-engine-minikql + ydb-core-protos + ydb-core-tablet + ydb-core-tablet_flat +) +target_sources(core-statistics-aggregator PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/schema.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_configure.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init_schema.cpp +) diff --git a/ydb/core/statistics/aggregator/CMakeLists.linux-x86_64.txt b/ydb/core/statistics/aggregator/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..5b8294cd590 --- /dev/null +++ b/ydb/core/statistics/aggregator/CMakeLists.linux-x86_64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-statistics-aggregator) +target_compile_options(core-statistics-aggregator PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-statistics-aggregator PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-base + core-engine-minikql + ydb-core-protos + ydb-core-tablet + ydb-core-tablet_flat +) +target_sources(core-statistics-aggregator PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/schema.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_configure.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init_schema.cpp +) diff --git a/ydb/core/statistics/aggregator/CMakeLists.txt b/ydb/core/statistics/aggregator/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/statistics/aggregator/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/statistics/aggregator/CMakeLists.windows-x86_64.txt b/ydb/core/statistics/aggregator/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..fe736bf693e --- /dev/null +++ b/ydb/core/statistics/aggregator/CMakeLists.windows-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-statistics-aggregator) +target_compile_options(core-statistics-aggregator PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-statistics-aggregator PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-base + core-engine-minikql + ydb-core-protos + ydb-core-tablet + ydb-core-tablet_flat +) +target_sources(core-statistics-aggregator PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/aggregator_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/schema.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_configure.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/statistics/aggregator/tx_init_schema.cpp +) diff --git a/ydb/core/statistics/aggregator/aggregator.cpp b/ydb/core/statistics/aggregator/aggregator.cpp new file mode 100644 index 00000000000..9ec84bfe991 --- /dev/null +++ b/ydb/core/statistics/aggregator/aggregator.cpp @@ -0,0 +1,11 @@ +#include "aggregator.h" + +#include "aggregator_impl.h" + +namespace NKikimr::NStat { + +IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info) { + return new TStatisticsAggregator(tablet, info); +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/aggregator.h b/ydb/core/statistics/aggregator/aggregator.h new file mode 100644 index 00000000000..46fa18365ae --- /dev/null +++ b/ydb/core/statistics/aggregator/aggregator.h @@ -0,0 +1,10 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/base/blobstorage.h> + +namespace NKikimr::NStat { + +IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info); + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp new file mode 100644 index 00000000000..f1556276f43 --- /dev/null +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -0,0 +1,60 @@ +#include "aggregator_impl.h" + +#include <ydb/core/engine/minikql/flat_local_tx_factory.h> + +#include <library/cpp/monlib/service/pages/templates.h> + +namespace NKikimr::NStat { + +TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info) + : TActor(&TThis::StateInit) + , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) +{} + +void TStatisticsAggregator::OnDetach(const TActorContext& ctx) { + Die(ctx); +} + +void TStatisticsAggregator::OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorContext& ctx) { + Die(ctx); +} + +void TStatisticsAggregator::OnActivateExecutor(const TActorContext& ctx) { + SA_LOG_I("[" << TabletID() << "] OnActivateExecutor"); + + Execute(CreateTxInitSchema(), ctx); +} + +void TStatisticsAggregator::DefaultSignalTabletActive(const TActorContext& ctx) { + Y_UNUSED(ctx); +} + +void TStatisticsAggregator::Handle(TEvPrivate::TEvProcess::TPtr&) { + SA_LOG_D("[" << TabletID() << "] Handle TEvPrivate::TEvProcess"); +} + +void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) { + db.Table<Schema::SysParams>().Key(id).Update( + NIceDb::TUpdate<Schema::SysParams::Value>(value)); +} + +bool TStatisticsAggregator::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, + const TActorContext& ctx) +{ + if (!ev) { + return true; + } + + TStringStream str; + HTML(str) { + PRE() { + str << "---- StatisticsAggregator ----" << Endl << Endl; + str << "Database: " << Database << Endl; + } + } + + ctx.Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(str.Str())); + return true; +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h new file mode 100644 index 00000000000..18460fb4486 --- /dev/null +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -0,0 +1,85 @@ +#pragma once + +#include "schema.h" + +#include <ydb/core/protos/statistics.pb.h> +#include <ydb/core/protos/counters_statistics_aggregator.pb.h> + +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/statistics/common.h> +#include <ydb/core/statistics/events.h> + +#include <ydb/core/tablet_flat/tablet_flat_executed.h> + +namespace NKikimr::NStat { + +class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabletFlatExecutor::TTabletExecutedFlat { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::STATISTICS_AGGREGATOR; + } + + TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info); + +private: + using Schema = TAggregatorSchema; + using TTxBase = NTabletFlatExecutor::TTransactionBase<TStatisticsAggregator>; + + struct TTxInitSchema; + struct TTxInit; + struct TTxConfigure; + + struct TEvPrivate { + enum EEv { + EvProcess = EventSpaceBegin(TEvents::ES_PRIVATE), + + EvEnd + }; + + struct TEvProcess : public TEventLocal<TEvProcess, EvProcess> {}; + }; + +private: + void OnDetach(const TActorContext& ctx) override; + void OnTabletDead(TEvTablet::TEvTabletDead::TPtr& ev, const TActorContext& ctx) override; + void OnActivateExecutor(const TActorContext& ctx) override; + void DefaultSignalTabletActive(const TActorContext& ctx) override; + bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override; + + NTabletFlatExecutor::ITransaction* CreateTxInitSchema(); + NTabletFlatExecutor::ITransaction* CreateTxInit(); + + void Handle(TEvStatistics::TEvConfigureAggregator::TPtr& ev); + void Handle(TEvPrivate::TEvProcess::TPtr& ev); + + void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value); + + STFUNC(StateInit) { + switch(ev->GetTypeRewrite()) { + hFunc(TEvStatistics::TEvConfigureAggregator, Handle); + IgnoreFunc(TEvPrivate::TEvProcess); + default: + if (!HandleDefaultEvents(ev, SelfId())) { + LOG_CRIT(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "TStatisticsAggregator StateInit unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + } + + STFUNC(StateWork) { + switch(ev->GetTypeRewrite()) { + hFunc(TEvStatistics::TEvConfigureAggregator, Handle); + hFunc(TEvPrivate::TEvProcess, Handle); + default: + if (!HandleDefaultEvents(ev, SelfId())) { + LOG_CRIT(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "TStatisticsAggregator StateWork unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + } + +private: + TString Database; +}; + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/schema.cpp b/ydb/core/statistics/aggregator/schema.cpp new file mode 100644 index 00000000000..e9fd222405c --- /dev/null +++ b/ydb/core/statistics/aggregator/schema.cpp @@ -0,0 +1 @@ +#include "schema.h" diff --git a/ydb/core/statistics/aggregator/schema.h b/ydb/core/statistics/aggregator/schema.h new file mode 100644 index 00000000000..a0a616746dd --- /dev/null +++ b/ydb/core/statistics/aggregator/schema.h @@ -0,0 +1,28 @@ +#pragma once + +#include <ydb/core/tablet_flat/flat_cxx_database.h> + +namespace NKikimr::NStat { + +struct TAggregatorSchema : NIceDb::Schema { + struct SysParams : Table<1> { + struct Id : Column<1, NScheme::NTypeIds::Uint64> {}; + struct Value : Column<2, NScheme::NTypeIds::Utf8> {}; + + using TKey = TableKey<Id>; + using TColumns = TableColumns<Id, Value>; + }; + + using TTables = SchemaTables< + SysParams + >; + + using TSettings = SchemaSettings< + ExecutorLogBatching<true>, + ExecutorLogFlushPeriod<512> + >; + + static constexpr ui64 SysParam_Database = 1; +}; + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/tx_configure.cpp b/ydb/core/statistics/aggregator/tx_configure.cpp new file mode 100644 index 00000000000..391324b0b01 --- /dev/null +++ b/ydb/core/statistics/aggregator/tx_configure.cpp @@ -0,0 +1,44 @@ +#include "aggregator_impl.h" + +#include <ydb/core/tx/tx.h> + +namespace NKikimr::NStat { + +struct TStatisticsAggregator::TTxConfigure : public TTxBase { + NKikimrStat::TEvConfigureAggregator Record; + TActorId Sender; + + TTxConfigure(TSelf* self, NKikimrStat::TEvConfigureAggregator&& record, const NActors::TActorId& sender) + : TTxBase(self) + , Record(std::move(record)) + , Sender(sender) + {} + + TTxType GetTxType() const override { return TXTYPE_CONFIGURE; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + SA_LOG_D("[" << Self->TabletID() << "] TTxConfigure::Execute: " + << "database# " << Record.GetDatabase()); + + NIceDb::TNiceDb db(txc.DB); + + Self->Database = Record.GetDatabase(); + Self->PersistSysParam(db, Schema::SysParam_Database, Self->Database); + return true; + } + + void Complete(const TActorContext& ctx) override { + SA_LOG_D("[" << Self->TabletID() << "] TTxConfigure::Complete"); + + ctx.Send(Sender, new TEvSubDomain::TEvConfigureStatus( + NKikimrTx::TEvSubDomainConfigurationAck::SUCCESS, Self->TabletID())); + } +}; + +void TStatisticsAggregator::Handle(TEvStatistics::TEvConfigureAggregator::TPtr& ev) { + auto& record = ev->Get()->Record; + Execute(new TTxConfigure(this, std::move(record), ev->Sender), + TActivationContext::AsActorContext()); +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp new file mode 100644 index 00000000000..c4de87c000f --- /dev/null +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -0,0 +1,66 @@ +#include "aggregator_impl.h" + +namespace NKikimr::NStat { + +struct TStatisticsAggregator::TTxInit : public TTxBase { + explicit TTxInit(TSelf* self) + : TTxBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_INIT; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + SA_LOG_D("[" << Self->TabletID() << "] TTxInit::Execute"); + + NIceDb::TNiceDb db(txc.DB); + + { // precharge + auto sysParamsRowset = db.Table<Schema::SysParams>().Range().Select(); + + if (!sysParamsRowset.IsReady()) { + return false; + } + } + + // SysParams + { + auto rowset = db.Table<Schema::SysParams>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + ui64 id = rowset.GetValue<Schema::SysParams::Id>(); + TString value = rowset.GetValue<Schema::SysParams::Value>(); + + switch (id) { + case Schema::SysParam_Database: + Self->Database = value; + SA_LOG_D("[" << Self->TabletID() << "] Loading database: " << Self->Database); + break; + default: + SA_LOG_CRIT("[" << Self->TabletID() << "] Unexpected SysParam id: " << id); + } + + if (!rowset.Next()) { + return false; + } + } + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + SA_LOG_D("[" << Self->TabletID() << "] TTxInit::Complete"); + + Self->SignalTabletActive(ctx); + Self->Become(&TThis::StateWork); + } +}; + +NTabletFlatExecutor::ITransaction* TStatisticsAggregator::CreateTxInit() { + return new TTxInit(this); +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/tx_init_schema.cpp b/ydb/core/statistics/aggregator/tx_init_schema.cpp new file mode 100644 index 00000000000..2ba69793300 --- /dev/null +++ b/ydb/core/statistics/aggregator/tx_init_schema.cpp @@ -0,0 +1,31 @@ +#include "aggregator_impl.h" + +namespace NKikimr::NStat { + +struct TStatisticsAggregator::TTxInitSchema : public TTxBase { + explicit TTxInitSchema(TSelf* self) + : TTxBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_INIT_SCHEMA; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + SA_LOG_D("[" << Self->TabletID() << "] TTxInitSchema::Execute"); + + NIceDb::TNiceDb(txc.DB).Materialize<Schema>(); + + return true; + } + + void Complete(const TActorContext& ctx) override { + SA_LOG_D("[" << Self->TabletID() << "] TTxInitSchema::Complete"); + + Self->Execute(Self->CreateTxInit(), ctx); + } +}; + +NTabletFlatExecutor::ITransaction* TStatisticsAggregator::CreateTxInitSchema() { + return new TTxInitSchema(this); +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/ya.make b/ydb/core/statistics/aggregator/ya.make new file mode 100644 index 00000000000..1dec5b49538 --- /dev/null +++ b/ydb/core/statistics/aggregator/ya.make @@ -0,0 +1,25 @@ +LIBRARY() + +SRCS( + aggregator.h + aggregator.cpp + aggregator_impl.h + aggregator_impl.cpp + schema.h + schema.cpp + tx_configure.cpp + tx_init.cpp + tx_init_schema.cpp +) + +PEERDIR( + ydb/core/base + ydb/core/engine/minikql + ydb/core/protos + ydb/core/tablet + ydb/core/tablet_flat +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/statistics/common.h b/ydb/core/statistics/common.h new file mode 100644 index 00000000000..09683585f62 --- /dev/null +++ b/ydb/core/statistics/common.h @@ -0,0 +1,9 @@ +#pragma once + +#define SA_LOG_T(stream) LOG_TRACE_S((TlsActivationContext->AsActorContext()), NKikimrServices::STATISTICS, stream) +#define SA_LOG_D(stream) LOG_DEBUG_S((TlsActivationContext->AsActorContext()), NKikimrServices::STATISTICS, stream) +#define SA_LOG_I(stream) LOG_INFO_S((TlsActivationContext->AsActorContext()), NKikimrServices::STATISTICS, stream) +#define SA_LOG_N(stream) LOG_NOTICE_S((TlsActivationContext->AsActorContext()), NKikimrServices::STATISTICS, stream) +#define SA_LOG_W(stream) LOG_WARN_S((TlsActivationContext->AsActorContext()), NKikimrServices::STATISTICS, stream) +#define SA_LOG_E(stream) LOG_ERROR_S((TlsActivationContext->AsActorContext()), NKikimrServices::STATISTICS, stream) +#define SA_LOG_CRIT(stream) LOG_CRIT_S((TlsActivationContext->AsActorContext()), NKikimrServices::STATISTICS, stream) diff --git a/ydb/core/statistics/events.h b/ydb/core/statistics/events.h index a22fff1abab..49dc1750cbb 100644 --- a/ydb/core/statistics/events.h +++ b/ydb/core/statistics/events.h @@ -47,6 +47,8 @@ struct TEvStatistics { EvBroadcastStatistics, EvRegisterNode, + EvConfigureAggregator, + EvEnd }; @@ -70,6 +72,19 @@ struct TEvStatistics { NKikimrStat::TEvRegisterNode, EvRegisterNode> {}; + + struct TEvConfigureAggregator : public TEventPB< + TEvConfigureAggregator, + NKikimrStat::TEvConfigureAggregator, + EvConfigureAggregator> + { + TEvConfigureAggregator() = default; + + explicit TEvConfigureAggregator(const TString& database) { + Record.SetDatabase(database); + } + }; + }; } // NStat diff --git a/ydb/core/statistics/ya.make b/ydb/core/statistics/ya.make index d02d4a00b34..a015998d9ca 100644 --- a/ydb/core/statistics/ya.make +++ b/ydb/core/statistics/ya.make @@ -15,6 +15,10 @@ PEERDIR( END() +RECURSE( + aggregator +) + RECURSE_FOR_TESTS( ut ) diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt index 374379e37b9..4fe8905c0e7 100644 --- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt @@ -58,6 +58,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-persqueue ydb-core-protos ydb-core-security + core-statistics-aggregator core-sys_view-processor core-sys_view-service core-testlib-actors diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt index 43e9c7e52ce..8c9742a39c8 100644 --- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt +++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt @@ -59,6 +59,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-persqueue ydb-core-protos ydb-core-security + core-statistics-aggregator core-sys_view-processor core-sys_view-service core-testlib-actors diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt index 43e9c7e52ce..8c9742a39c8 100644 --- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt @@ -59,6 +59,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-persqueue ydb-core-protos ydb-core-security + core-statistics-aggregator core-sys_view-processor core-sys_view-service core-testlib-actors diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt index 374379e37b9..4fe8905c0e7 100644 --- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt @@ -58,6 +58,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-persqueue ydb-core-protos ydb-core-security + core-statistics-aggregator core-sys_view-processor core-sys_view-service core-testlib-actors diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp index c908ca7a52a..4b099e9f3a5 100644 --- a/ydb/core/testlib/tablet_helpers.cpp +++ b/ydb/core/testlib/tablet_helpers.cpp @@ -43,6 +43,7 @@ #include <ydb/core/keyvalue/keyvalue.h> #include <ydb/core/persqueue/pq.h> #include <ydb/core/sys_view/processor/processor.h> +#include <ydb/core/statistics/aggregator/aggregator.h> #include <ydb/core/testlib/basics/storage.h> #include <ydb/core/testlib/basics/appdata.h> @@ -1218,6 +1219,8 @@ namespace NKikimr { bootstrapperActorId = Boot(ctx, type, &NReplication::CreateController, DataGroupErasure); } else if (type == TTabletTypes::PersQueue) { bootstrapperActorId = Boot(ctx, type, &CreatePersQueue, DataGroupErasure); + } else if (type == TTabletTypes::StatisticsAggregator) { + bootstrapperActorId = Boot(ctx, type, &NStat::CreateStatisticsAggregator, DataGroupErasure); } else { status = NKikimrProto::ERROR; } diff --git a/ydb/core/testlib/tenant_runtime.cpp b/ydb/core/testlib/tenant_runtime.cpp index 6b93c818fa6..92f3f0f021d 100644 --- a/ydb/core/testlib/tenant_runtime.cpp +++ b/ydb/core/testlib/tenant_runtime.cpp @@ -24,6 +24,7 @@ #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/sys_view/processor/processor.h> #include <ydb/core/persqueue/pq.h> +#include <ydb/core/statistics/aggregator/aggregator.h> #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/interconnect/interconnect.h> @@ -451,6 +452,8 @@ class TFakeHive : public TActor<TFakeHive>, public TTabletExecutedFlat { bootstrapperActorId = Boot(ctx, type, &NReplication::CreateController, DataGroupErasure); } else if (type == TTabletTypes::PersQueue) { bootstrapperActorId = Boot(ctx, type, &NKikimr::CreatePersQueue, DataGroupErasure); + } else if (type == TTabletTypes::StatisticsAggregator) { + bootstrapperActorId = Boot(ctx, type, &NStat::CreateStatisticsAggregator, DataGroupErasure); } else { status = NKikimrProto::ERROR; } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index a7a923cfbb8..2f079f08a9f 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -95,6 +95,7 @@ #include <ydb/core/kesus/proxy/proxy.h> #include <ydb/core/kesus/tablet/tablet.h> #include <ydb/core/sys_view/processor/processor.h> +#include <ydb/core/statistics/aggregator/aggregator.h> #include <ydb/core/keyvalue/keyvalue.h> #include <ydb/core/persqueue/pq.h> #include <ydb/core/persqueue/cluster_tracker.h> @@ -723,6 +724,10 @@ namespace Tests { TLocalConfig::TTabletClassInfo(new TTabletSetupInfo( &NReplication::CreateController, TMailboxType::Revolving, appData.UserPoolId, TMailboxType::Revolving, appData.SystemPoolId)); + localConfig.TabletClassInfo[TTabletTypes::StatisticsAggregator] = + TLocalConfig::TTabletClassInfo(new TTabletSetupInfo( + &NStat::CreateStatisticsAggregator, TMailboxType::Revolving, appData.UserPoolId, + TMailboxType::Revolving, appData.SystemPoolId)); } void TServer::SetupLocalService(ui32 nodeIdx, const TString &domainName) { diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make index 1df7391d650..def2f83e848 100644 --- a/ydb/core/testlib/ya.make +++ b/ydb/core/testlib/ya.make @@ -62,6 +62,7 @@ PEERDIR( ydb/core/persqueue ydb/core/protos ydb/core/security + ydb/core/statistics/aggregator ydb/core/sys_view/processor ydb/core/sys_view/service ydb/core/testlib/actors diff --git a/ydb/core/tx/schemeshard/schemeshard.h b/ydb/core/tx/schemeshard/schemeshard.h index ef07d4412a7..4bb36375ef0 100644 --- a/ydb/core/tx/schemeshard/schemeshard.h +++ b/ydb/core/tx/schemeshard/schemeshard.h @@ -494,6 +494,7 @@ struct TEvSchemeShard { ui64 userAttrsVersion, ui64 tenantHive, ui64 tenantSysViewProcessor, + ui64 tenantStatisticsAggregator, const TString& rootACL) { Record.SetDomainSchemeShard(domainKey.OwnerId); @@ -508,6 +509,7 @@ struct TEvSchemeShard { Record.SetTenantHive(tenantHive); Record.SetTenantSysViewProcessor(tenantSysViewProcessor); + Record.SetTenantStatisticsAggregator(tenantStatisticsAggregator); Record.SetTenantRootACL(rootACL); } @@ -555,6 +557,10 @@ struct TEvSchemeShard { Record.SetTenantSysViewProcessor(svp); } + void SetTenantStatisticsAggregator(ui64 sa) { + Record.SetTenantStatisticsAggregator(sa); + } + void SetUpdateTenantRootACL(const TString& acl) { Record.SetUpdateTenantRootACL(acl); } diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp index d44b4f94809..83b6cd3cce5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp @@ -104,6 +104,9 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase { case ETabletType::BlobDepot: Self->TabletCounters->Simple()[COUNTER_BLOB_DEPOT_COUNT].Sub(1); break; + case ETabletType::StatisticsAggregator: + Self->TabletCounters->Simple()[COUNTER_STATISTICS_AGGREGATOR_COUNT].Sub(1); + break; default: Y_FAIL_S("Unknown TabletType" << ", ShardIdx " << ShardIdx diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 34f5debfb68..3a81e088d22 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3886,6 +3886,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { case ETabletType::BlobDepot: Self->TabletCounters->Simple()[COUNTER_BLOB_DEPOT_COUNT].Add(1); break; + case ETabletType::StatisticsAggregator: + Self->TabletCounters->Simple()[COUNTER_STATISTICS_AGGREGATOR_COUNT].Add(1); + break; default: Y_FAIL_S("dont know how to interpret tablet type" << ", type id: " << (ui32)si.second.TabletType diff --git a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp index 593abe230eb..e37b88c8893 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp @@ -405,6 +405,9 @@ struct TSchemeShard::TTxInitTenantSchemeShard : public TSchemeShard::TRwTxBase { if (processingParams.HasSysViewProcessor()) { RegisterShard(db, subdomain, TVector<ui64>{processingParams.GetSysViewProcessor()}, TTabletTypes::SysViewProcessor); } + if (processingParams.HasStatisticsAggregator()) { + RegisterShard(db, subdomain, TVector<ui64>{processingParams.GetStatisticsAggregator()}, TTabletTypes::StatisticsAggregator); + } subdomain->Initialize(Self->ShardInfos); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp index d81c2cd9ebf..dfcf35ae540 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp @@ -36,6 +36,7 @@ struct TParamsDelta { uint8_t AddExternalSchemeShard = 0; uint8_t AddExternalHive = 0; uint8_t AddExternalSysViewProcessor = 0; + uint8_t AddExternalStatisticsAggregator = 0; bool SharedTxSupportAdded = false; TVector<TStoragePool> StoragePoolsAdded; }; @@ -55,7 +56,7 @@ VerifyParams(TParamsDelta* delta, const TSubDomainInfo::TPtr& current, const NKi // // Currently this operation support very few workable result states: // 1. extsubdomain with full SharedTxSupport (ExternalSchemeShard, Coordinators, Mediators + required params), - // with or without ExternalHive and ExternalSysViewProcessor + // with or without ExternalHive, ExternalSysViewProcessor and ExternalStatisticsAggregator // // First params check: single values @@ -171,6 +172,21 @@ VerifyParams(TParamsDelta* delta, const TSubDomainInfo::TPtr& current, const NKi } } + // ExternalStatisticsAggregator checks + uint8_t addExternalStatisticsAggregator = 0; + if (input.HasExternalStatisticsAggregator()) { + const bool prev = bool(current->GetTenantStatisticsAggregatorID()); + const bool next = input.GetExternalStatisticsAggregator(); + const bool changed = (prev != next); + + if (changed) { + if (next == false) { + return paramError("ExternalStatisticsAggregator could only be added, not removed"); + } + addExternalStatisticsAggregator = 1; + } + } + // Second params check: combinations bool sharedTxSupportAdded = (coordinatorsAdded + mediatorsAdded) > 0; @@ -235,6 +251,7 @@ VerifyParams(TParamsDelta* delta, const TSubDomainInfo::TPtr& current, const NKi delta->AddExternalSchemeShard = addExternalSchemeShard; delta->AddExternalHive = addExternalHive; delta->AddExternalSysViewProcessor = addExternalSysViewProcessor; + delta->AddExternalStatisticsAggregator = addExternalStatisticsAggregator; delta->SharedTxSupportAdded = sharedTxSupportAdded; delta->StoragePoolsAdded = std::move(storagePoolsAdded); @@ -741,9 +758,9 @@ public: // Count tablets to create - //NOTE: ExternalHive and ExternalSysViewProcessor are _not_ counted against limits + //NOTE: ExternalHive, ExternalSysViewProcessor and ExternalStatisticsAggregator are _not_ counted against limits ui64 tabletsToCreateUnderLimit = delta.AddExternalSchemeShard + delta.CoordinatorsAdded + delta.MediatorsAdded; - ui64 tabletsToCreateOverLimit = delta.AddExternalSysViewProcessor; + ui64 tabletsToCreateOverLimit = delta.AddExternalSysViewProcessor + delta.AddExternalStatisticsAggregator; ui64 tabletsToCreateTotal = tabletsToCreateUnderLimit + tabletsToCreateOverLimit; // Check path limits @@ -812,7 +829,12 @@ public: // Create shards for the requested tablets (except hive) { TChannelsBindings channelsBinding; - if (delta.SharedTxSupportAdded || delta.AddExternalSchemeShard || delta.AddExternalSysViewProcessor || delta.AddExternalHive) { + if (delta.SharedTxSupportAdded || + delta.AddExternalSchemeShard || + delta.AddExternalSysViewProcessor || + delta.AddExternalHive || + delta.AddExternalStatisticsAggregator) + { if (!context.SS->ResolveSubdomainsChannels(alter->GetStoragePools(), channelsBinding)) { result->SetError(NKikimrScheme::StatusInvalidParameter, "failed to construct channels binding"); return result; @@ -838,6 +860,9 @@ public: if (delta.AddExternalSysViewProcessor) { AddShardsTo(txState, OperationId.GetTxId(), basenameId, 1, TTabletTypes::SysViewProcessor, channelsBinding, context.SS); } + if (delta.AddExternalStatisticsAggregator) { + AddShardsTo(txState, OperationId.GetTxId(), basenameId, 1, TTabletTypes::StatisticsAggregator, channelsBinding, context.SS); + } Y_ABORT_UNLESS(txState.Shards.size() == tabletsToCreateTotal); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index ed8bf9fc4f0..49a60ce4571 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -298,6 +298,9 @@ public: case ETabletType::SysViewProcessor: context.SS->TabletCounters->Simple()[COUNTER_SYS_VIEW_PROCESSOR_COUNT].Add(1); break; + case ETabletType::StatisticsAggregator: + context.SS->TabletCounters->Simple()[COUNTER_STATISTICS_AGGREGATOR_COUNT].Add(1); + break; default: break; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h b/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h index 4ab54021123..59a15811281 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h @@ -218,6 +218,16 @@ public: context.OnComplete.BindMsgToPipe(OperationId, tabletID, idx, event); break; } + case ETabletType::StatisticsAggregator: { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Send configure request to statistics aggregator: " << tabletID << + " opId: " << OperationId << + " schemeshard: " << ssId); + auto event = new NStat::TEvStatistics::TEvConfigureAggregator(path.PathString()); + shard.Operation = TTxState::ConfigureParts; + context.OnComplete.BindMsgToPipe(OperationId, tabletID, idx, event); + break; + } case ETabletType::SchemeShard: { auto event = new TEvSchemeShard::TEvInitTenantSchemeShard(ui64(ssId), pathId.LocalPathId, path.PathString(), diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index 99e24077c7c..b2225650df9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -503,6 +503,25 @@ void TSideEffects::DoUpdateTenant(TSchemeShard* ss, NTabletFlatExecutor::TTransa } } + if (!tenantLink.TenantStatisticsAggregator && subDomain->GetTenantStatisticsAggregatorID()) { + message->SetTenantStatisticsAggregator(ui64(subDomain->GetTenantStatisticsAggregatorID())); + hasChanges = true; + } + + if (tenantLink.TenantStatisticsAggregator) { + if (subDomain->GetAlter()) { + Y_VERIFY_S(tenantLink.TenantStatisticsAggregator == subDomain->GetAlter()->GetTenantStatisticsAggregatorID(), + "tenant SA is inconsistent" + << " on tss: " << tenantLink.TenantStatisticsAggregator + << " on gss: " << subDomain->GetAlter()->GetTenantStatisticsAggregatorID()); + } else { + Y_VERIFY_S(tenantLink.TenantStatisticsAggregator == subDomain->GetTenantStatisticsAggregatorID(), + "tenant SA is inconsistent" + << " on tss: " << tenantLink.TenantStatisticsAggregator + << " on gss: " << subDomain->GetTenantStatisticsAggregatorID()); + } + } + if (!hasChanges) { LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "DoUpdateTenant no hasChanges" diff --git a/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp b/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp index 3e0238c247c..81df25fee17 100644 --- a/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp @@ -179,6 +179,14 @@ struct TSchemeShard::TTxUpdateTenant : public TSchemeShard::TRwTxBase { Y_ABORT_UNLESS(tenantSVP == subdomain->GetTenantSysViewProcessorID()); } + if (record.HasTenantStatisticsAggregator()) { + TTabletId tenantSA = TTabletId(record.GetTenantStatisticsAggregator()); + if (!subdomain->GetTenantStatisticsAggregatorID()) { + addPrivateShard(tenantSA, ETabletType::StatisticsAggregator); + } + Y_ABORT_UNLESS(tenantSA == subdomain->GetTenantStatisticsAggregatorID()); + } + if (record.HasUpdateTenantRootACL()) { // KIKIMR-10699: transfer tenants root ACL from GSS to the TSS // here TSS sees the ACL from GSS diff --git a/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp b/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp index 12bbae04edf..299ab291859 100644 --- a/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp @@ -28,6 +28,7 @@ THolder<TEvSchemeShard::TEvSyncTenantSchemeShard> TParentDomainLink::MakeSyncMsg rootPath->UserAttrs->AlterVersion, ui64(rootSubdomain->GetTenantHiveID()), ui64(rootSubdomain->GetTenantSysViewProcessorID()), + ui64(rootSubdomain->GetTenantStatisticsAggregatorID()), rootPath->ACL); } @@ -107,6 +108,7 @@ void TSubDomainsLinks::TLink::Out(IOutputStream& stream) const { << ", UserAttributesVersion: " << UserAttributesVersion << ", TenantHive: " << TenantHive << ", TenantSysViewProcessor: " << TenantSysViewProcessor + << ", TenantStatisticsAggregator: " << TenantStatisticsAggregator << ", TenantRootACL: " << TenantRootACL << "}"; } @@ -121,6 +123,8 @@ TSubDomainsLinks::TLink::TLink(const NKikimrScheme::TEvSyncTenantSchemeShard &re , TenantHive(record.HasTenantHive() ? TTabletId(record.GetTenantHive()) : InvalidTabletId) , TenantSysViewProcessor(record.HasTenantSysViewProcessor() ? TTabletId(record.GetTenantSysViewProcessor()) : InvalidTabletId) + , TenantStatisticsAggregator(record.HasTenantStatisticsAggregator() ? + TTabletId(record.GetTenantStatisticsAggregator()) : InvalidTabletId) , TenantRootACL(record.GetTenantRootACL()) {} diff --git a/ydb/core/tx/schemeshard/schemeshard_domain_links.h b/ydb/core/tx/schemeshard/schemeshard_domain_links.h index d9cb6133c42..542d82e91e6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_domain_links.h +++ b/ydb/core/tx/schemeshard/schemeshard_domain_links.h @@ -41,6 +41,7 @@ public: ui64 UserAttributesVersion = 0; TTabletId TenantHive = InvalidTabletId; TTabletId TenantSysViewProcessor = InvalidTabletId; + TTabletId TenantStatisticsAggregator = InvalidTabletId; TString TenantRootACL; TLink() = default; @@ -76,4 +77,3 @@ template<> inline void Out<NKikimr::NSchemeShard::TSubDomainsLinks::TLink>(IOutputStream& o, const NKikimr::NSchemeShard::TSubDomainsLinks::TLink& x) { return x.Out(o); } - diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 2738973d664..44c59e4c14f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -1567,6 +1567,13 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> { return TTabletId(ProcessingParams.GetSysViewProcessor()); } + TTabletId GetTenantStatisticsAggregatorID() const { + if (!ProcessingParams.HasStatisticsAggregator()) { + return InvalidTabletId; + } + return TTabletId(ProcessingParams.GetStatisticsAggregator()); + } + ui64 GetPathsInside() const { return PathsInsideCount; } @@ -1937,6 +1944,13 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> { if (sysViewProcessors.size()) { ProcessingParams.SetSysViewProcessor(ui64(sysViewProcessors.front())); } + + ProcessingParams.ClearStatisticsAggregator(); + TVector<TTabletId> statisticsAggregators = FilterPrivateTablets(ETabletType::StatisticsAggregator, allShards); + Y_VERIFY_S(statisticsAggregators.size() <= 1, "size was: " << statisticsAggregators.size()); + if (statisticsAggregators.size()) { + ProcessingParams.SetStatisticsAggregator(ui64(statisticsAggregators.front())); + } } void InitializeAsGlobal(NKikimrSubDomains::TProcessingParams&& processingParams) { diff --git a/ydb/core/tx/schemeshard/ut_extsubdomain/ut_extsubdomain.cpp b/ydb/core/tx/schemeshard/ut_extsubdomain/ut_extsubdomain.cpp index bf39bfe9ccb..4737ccdddc6 100644 --- a/ydb/core/tx/schemeshard/ut_extsubdomain/ut_extsubdomain.cpp +++ b/ydb/core/tx/schemeshard/ut_extsubdomain/ut_extsubdomain.cpp @@ -680,6 +680,42 @@ Y_UNIT_TEST_SUITE(TSchemeShardExtSubDomainTest) { ); } + Y_UNIT_TEST_FLAG(AlterCantChangeExternalStatisticsAggregator, AlterDatabaseCreateHiveFirst) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableAlterDatabaseCreateHiveFirst(AlterDatabaseCreateHiveFirst)); + ui64 txId = 100; + + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"(Name: "USER_0")"); + env.TestWaitNotification(runtime, txId); + + // Minimally correct ExtSubDomain settings + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", + R"( + Name: "USER_0" + ExternalSchemeShard: true + PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + StoragePools { + Name: "pool-1" + Kind: "hdd" + } + + ExternalStatisticsAggregator: true + )" + ); + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", + R"( + Name: "USER_0" + ExternalStatisticsAggregator: false + )", + {{NKikimrScheme::StatusInvalidParameter, "ExternalStatisticsAggregator could only be added, not removed"}} + ); + } + Y_UNIT_TEST_FLAG(AlterCantChangeSetParams, AlterDatabaseCreateHiveFirst) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableAlterDatabaseCreateHiveFirst(AlterDatabaseCreateHiveFirst)); @@ -1329,6 +1365,108 @@ Y_UNIT_TEST_SUITE(TSchemeShardExtSubDomainTest) { UNIT_ASSERT_EQUAL(tenantSVP, tenantSVPOnTSS); } + Y_UNIT_TEST_FLAG(StatisticsAggregatorSync, AlterDatabaseCreateHiveFirst) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableAlterDatabaseCreateHiveFirst(AlterDatabaseCreateHiveFirst)); + ui64 txId = 100; + + NSchemeShard::TSchemeLimits lowLimits; + lowLimits.MaxShardsInPath = 3; + SetSchemeshardSchemaLimits(runtime, lowLimits); + + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", + R"(Name: "USER_0")" + ); + + // check that limits have a power, try create 4 shards + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", + R"( + Name: "USER_0" + PlanResolution: 50 + Coordinators: 2 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + ExternalSchemeShard: true + )", + {{NKikimrScheme::StatusResourceExhausted}} + ); + + // create 3 shards + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", + R"( + Name: "USER_0" + PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + ExternalSchemeShard: true + StoragePools { + Name: "/dc-1/users/tenant-1:hdd" + Kind: "hdd" + } + )" + ); + env.TestWaitNotification(runtime, {txId, txId - 1}); + + lowLimits.MaxShardsInPath = 2; + SetSchemeshardSchemaLimits(runtime, lowLimits); + + // one more, but for free + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", + R"( + Name: "USER_0" + ExternalStatisticsAggregator: true + )" + ); + + env.TestWaitNotification(runtime, txId); + + ui64 tenantSchemeShard = 0; + ui64 tenantSA = 0; + TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_0"), + {NLs::PathExist, + NLs::IsExternalSubDomain("USER_0"), + NLs::ExtractTenantSchemeshard(&tenantSchemeShard), + NLs::ExtractTenantStatisticsAggregator(&tenantSA)}); + + UNIT_ASSERT(tenantSchemeShard != 0 + && tenantSchemeShard != (ui64)-1 + && tenantSchemeShard != TTestTxConfig::SchemeShard); + + UNIT_ASSERT(tenantSA != 0 && tenantSA != (ui64)-1); + + ui64 tenantSAOnTSS = 0; + TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/USER_0"), + {NLs::PathExist, + NLs::ExtractTenantStatisticsAggregator(&tenantSAOnTSS)}); + + UNIT_ASSERT_EQUAL(tenantSA, tenantSAOnTSS); + + RebootTablet(runtime, tenantSchemeShard, runtime.AllocateEdgeActor()); + + TestCreateTable(runtime, tenantSchemeShard, ++txId, "/MyRoot/USER_0", + R"( + Name: "table" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + KeyColumnNames: ["RowId"] + )" + ); + + env.TestWaitNotification(runtime, txId, tenantSchemeShard); + + TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/USER_0/table"), + {NLs::PathExist}); + + RebootTablet(runtime, tenantSchemeShard, runtime.AllocateEdgeActor()); + + TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/USER_0"), + {NLs::PathExist, + NLs::ExtractTenantStatisticsAggregator(&tenantSAOnTSS)}); + + UNIT_ASSERT_EQUAL(tenantSA, tenantSAOnTSS); + } + Y_UNIT_TEST_FLAG(SchemeQuotas, AlterDatabaseCreateHiveFirst) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableAlterDatabaseCreateHiveFirst(AlterDatabaseCreateHiveFirst)); @@ -1489,4 +1627,4 @@ Y_UNIT_TEST_SUITE(TSchemeShardExtSubDomainTest) { KeyColumnNames: ["key"] )", {NKikimrScheme::StatusQuotaExceeded}); } -}
\ No newline at end of file +} diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 162d82f2066..d0de3aa5e55 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -126,6 +126,18 @@ TCheckFunc ExtractTenantSysViewProcessor(ui64* tenantSVPId) { }; } +TCheckFunc ExtractTenantStatisticsAggregator(ui64* tenantSAId) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess); + const auto& pathDescr = record.GetPathDescription(); + UNIT_ASSERT(pathDescr.HasDomainDescription()); + const auto& domainDesc = pathDescr.GetDomainDescription(); + UNIT_ASSERT(domainDesc.HasProcessingParams()); + const auto& procParams = domainDesc.GetProcessingParams(); + *tenantSAId = procParams.GetStatisticsAggregator(); + }; +} + TCheckFunc ExtractDomainHive(ui64* domainHiveId) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index edd085969c1..7c849a7c53e 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -65,6 +65,7 @@ namespace NLs { void InExternalSubdomain(const NKikimrScheme::TEvDescribeSchemeResult& record); TCheckFunc ExtractTenantSchemeshard(ui64* tenantSchemeShardId); TCheckFunc ExtractTenantSysViewProcessor(ui64* tenantSVPId); + TCheckFunc ExtractTenantStatisticsAggregator(ui64* tenantSAId); TCheckFunc ExtractDomainHive(ui64* domainHiveId); void NotFinished(const NKikimrScheme::TEvDescribeSchemeResult& record); diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 4c83a37ddc5..aa71df6f0c0 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1009,5 +1009,6 @@ message TActivity { KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 619; NODEWARDEN_DISTRIBUTED_CONFIG = 620; PQ_FETCH_REQUEST = 621; + STATISTICS_AGGREGATOR = 622; }; }; |