diff options
author | Pisarenko Grigoriy <grigoriypisar@ydb.tech> | 2024-12-13 17:14:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-13 17:14:50 +0300 |
commit | 31afdb84df31cabbc5a0e9101d2568f0e71e282d (patch) | |
tree | 61d0a38492364d81c7fac006c72eacc425ed5e6c | |
parent | 932db4f699b1716ef2c525bcb88759878c1b2ac2 (diff) | |
download | ydb-31afdb84df31cabbc5a0e9101d2568f0e71e282d.tar.gz |
YQ-3970 RD added parser mkql counters (#12607)
24 files changed, 89 insertions, 39 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index e365ec04cf..23c139aea2 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -205,7 +205,8 @@ void Init( tenant, yqCounters->GetSubgroup("subsystem", "row_dispatcher"), CreatePqNativeGateway(pqServices), - appData->Mon); + appData->Mon, + appData->Counters); actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release()); } diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp index 524f9f7632..e77be57b91 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp @@ -20,6 +20,7 @@ struct TActorFactory : public IActorFactory { NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, ui64 maxBufferSize) const override { @@ -35,6 +36,7 @@ struct TActorFactory : public IActorFactory { std::move(driver), credentialsProviderFactory, counters, + countersRoot, pqGateway, maxBufferSize ); diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index b6df32f759..9c7dc0fac1 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -23,6 +23,7 @@ struct IActorFactory : public TThrRefBase { NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, ui64 maxBufferSize) const = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp index 8340bbf62f..cfb7526bdc 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp @@ -10,4 +10,12 @@ TString TSchemaColumn::ToString() const { return TStringBuilder() << "'" << Name << "' : " << TypeYson; } +//// TCountersDesc + +TCountersDesc TCountersDesc::CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const { + TCountersDesc result(*this); + result.MkqlCountersName = mkqlCountersName; + return result; +} + } // namespace NFq::NRowDispatcher diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h index 10c5701b5d..dbd5e10bc2 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h @@ -1,5 +1,7 @@ #pragma once +#include <library/cpp/monlib/dynamic_counters/counters.h> + #include <ydb/library/conclusion/generic/result.h> #include <ydb/library/conclusion/status.h> #include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> @@ -22,4 +24,12 @@ struct TSchemaColumn { TString ToString() const; }; +struct TCountersDesc { + NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>(); + NMonitoring::TDynamicCounterPtr CountersSubgroup = MakeIntrusive<NMonitoring::TDynamicCounters>(); + TString MkqlCountersName; // Used for TAlignedPagePoolCounters created from CountersRoot + + [[nodiscard]] TCountersDesc CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const; +}; + } // namespace NFq::NRowDispatcher diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make b/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make index e4d689c0a1..1ac30eda68 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make @@ -5,6 +5,8 @@ SRCS( ) PEERDIR( + library/cpp/monlib/dynamic_counters + ydb/library/conclusion ydb/library/yql/dq/actors/protos diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 1a8bdedf4b..df46b12bef 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -18,24 +18,24 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public using TBase = NActors::TActor<TTopicFormatHandler>; struct TCounters { - const NMonitoring::TDynamicCounterPtr CountersRoot; - const NMonitoring::TDynamicCounterPtr CountersSubgroup; + TCountersDesc Desc; NMonitoring::TDynamicCounters::TCounterPtr ActiveFormatHandlers; NMonitoring::TDynamicCounters::TCounterPtr ActiveClients; - TCounters(NMonitoring::TDynamicCounterPtr counters, const TSettings& settings) - : CountersRoot(counters) - , CountersSubgroup(counters->GetSubgroup("format", settings.ParsingFormat)) + TCounters(const TCountersDesc& counters, const TSettings& settings) + : Desc(counters) { + Desc.CountersSubgroup = Desc.CountersSubgroup->GetSubgroup("format", settings.ParsingFormat); + Register(); } private: void Register() { - ActiveFormatHandlers = CountersRoot->GetCounter("ActiveFormatHandlers", false); + ActiveFormatHandlers = Desc.CountersRoot->GetCounter("ActiveFormatHandlers", false); - ActiveClients = CountersSubgroup->GetCounter("ActiveClients", false); + ActiveClients = Desc.CountersSubgroup->GetCounter("ActiveClients", false); } }; @@ -282,9 +282,9 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public }; public: - TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, NMonitoring::TDynamicCounterPtr counters) + TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, const TCountersDesc& counters) : TBase(&TTopicFormatHandler::StateFunc) - , TTypeParser(__LOCATION__) + , TTypeParser(__LOCATION__, counters.CopyWithNewMkqlCountersName("row_dispatcher")) , Config(config) , Settings(settings) , LogPrefix(TStringBuilder() << "TTopicFormatHandler [" << Settings.ParsingFormat << "]: ") @@ -487,18 +487,19 @@ private: } TValueStatus<ITopicParser::TPtr> CreateParserForFormat() const { + const auto& counters = Counters.Desc.CopyWithNewMkqlCountersName("row_dispatcher_parser"); if (Settings.ParsingFormat == "raw") { - return CreateRawParser(ParserHandler); + return CreateRawParser(ParserHandler, counters); } if (Settings.ParsingFormat == "json_each_row") { - return CreateJsonParser(ParserHandler, Config.JsonParserConfig); + return CreateJsonParser(ParserHandler, Config.JsonParserConfig, counters); } return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat); } void CreateFilters() { if (!Filters) { - Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.CountersSubgroup); + Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.Desc.CountersSubgroup); } } @@ -567,7 +568,7 @@ void ITopicFormatHandler::TDestroy::Destroy(ITopicFormatHandler* handler) { } } -ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters) { +ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters) { const auto handler = new TTopicFormatHandler(config, settings, counters); owner.RegisterWithSameMailbox(handler); return ITopicFormatHandler::TPtr(handler); @@ -585,7 +586,7 @@ TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConf namespace NTests { ITopicFormatHandler::TPtr CreateTestFormatHandler(const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings) { - const auto handler = new TTopicFormatHandler(config, settings, MakeIntrusive<NMonitoring::TDynamicCounters>()); + const auto handler = new TTopicFormatHandler(config, settings, {}); NActors::TActivationContext::ActorSystem()->Register(handler); return ITopicFormatHandler::TPtr(handler); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index 809d59b42e..ca028a6888 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -66,7 +66,7 @@ struct TFormatHandlerConfig { TTopicFiltersConfig FiltersConfig; }; -ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters); +ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters); TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, NActors::TActorId compileServiceId); namespace NTests { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp index 9b241c9635..7c4fb95102 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp @@ -316,8 +316,8 @@ public: using TPtr = TIntrusivePtr<TJsonParser>; public: - TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config) - : TBase(std::move(consumer), __LOCATION__) + TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters) + : TBase(std::move(consumer), __LOCATION__, counters) , Config(config) , NumberColumns(Consumer->GetColumns().size()) , MaxNumberRows((config.BufferCellCount - 1) / NumberColumns + 1) @@ -483,8 +483,8 @@ private: } // anonymous namespace -TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config) { - TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config); +TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters) { + TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config, counters); if (auto status = parser->InitColumnsParsers(); status.IsFail()) { return status; } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h index fe34abb3cd..c8afd38464 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h @@ -14,7 +14,7 @@ struct TJsonParserConfig { ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit }; -TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config); +TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters); TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig); } // namespace NFq::NRowDispatcher diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp index 663ece63a4..916795ea4f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp @@ -10,8 +10,8 @@ namespace NFq::NRowDispatcher { //// TTypeParser -TTypeParser::TTypeParser(const TSourceLocation& location) - : Alloc(location, NKikimr::TAlignedPagePoolCounters(), true, false) +TTypeParser::TTypeParser(const TSourceLocation& location, const TCountersDesc& counters) + : Alloc(location, NKikimr::TAlignedPagePoolCounters(counters.CountersRoot, counters.MkqlCountersName), true, false) , FunctionRegistry(NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, {})) , TypeEnv(std::make_unique<NKikimr::NMiniKQL::TTypeEnvironment>(Alloc)) , ProgramBuilder(std::make_unique<NKikimr::NMiniKQL::TProgramBuilder>(*TypeEnv, *FunctionRegistry)) @@ -57,8 +57,8 @@ void TTopicParserBase::TStats::Clear() { //// TTopicParserBase -TTopicParserBase::TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location) - : TTypeParser(location) +TTopicParserBase::TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location, const TCountersDesc& counters) + : TTypeParser(location, counters) , Consumer(std::move(consumer)) {} diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h index f6d47268e5..b28fadbd4c 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h @@ -9,7 +9,7 @@ namespace NFq::NRowDispatcher { class TTypeParser { public: - explicit TTypeParser(const TSourceLocation& location); + explicit TTypeParser(const TSourceLocation& location, const TCountersDesc& counters); virtual ~TTypeParser(); TValueStatus<NKikimr::NMiniKQL::TType*> ParseTypeYson(const TString& typeYson) const; @@ -36,7 +36,7 @@ public: using TPtr = TIntrusivePtr<TTopicParserBase>; public: - TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location); + TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location, const TCountersDesc& counters); virtual ~TTopicParserBase() = default; public: diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp index b230b8dd0a..32d4f4c4c1 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp @@ -17,8 +17,8 @@ public: using TPtr = TIntrusivePtr<TRawParser>; public: - TRawParser(IParsedDataConsumer::TPtr consumer, const TSchemaColumn& schema) - : TBase(std::move(consumer), __LOCATION__) + TRawParser(IParsedDataConsumer::TPtr consumer, const TSchemaColumn& schema, const TCountersDesc& counters) + : TBase(std::move(consumer), __LOCATION__, counters) , Schema(schema) , LogPrefix("TRawParser: ") {} @@ -108,13 +108,13 @@ private: } // anonymous namespace -TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer) { +TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer, const TCountersDesc& counters) { const auto& columns = consumer->GetColumns(); if (columns.size() != 1) { return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Expected only one column for raw format, but got " << columns.size()); } - TRawParser::TPtr parser = MakeIntrusive<TRawParser>(consumer, columns[0]); + TRawParser::TPtr parser = MakeIntrusive<TRawParser>(consumer, columns[0], counters); if (auto status = parser->InitColumnParser(); status.IsFail()) { return status.AddParentIssue(TStringBuilder() << "Failed to create raw parser for column " << columns[0].ToString()); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h index f920543a13..6588844884 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h @@ -4,6 +4,6 @@ namespace NFq::NRowDispatcher { -TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer); +TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer, const TCountersDesc& counters); } // namespace NFq::NRowDispatcher diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp index d7b3812f87..b65911e793 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp @@ -150,7 +150,7 @@ TBaseFixture::TBatch& TBaseFixture::TBatch::AddRow(TRow row) { //// TBaseFixture TBaseFixture::TBaseFixture() - : TTypeParser(__LOCATION__) + : TTypeParser(__LOCATION__, {}) , MemoryInfo("TBaseFixture alloc") , HolderFactory(std::make_unique<NKikimr::NMiniKQL::THolderFactory>(Alloc.Ref(), MemoryInfo)) , Runtime(1) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index e2d46d5367..0f2d59d810 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -162,7 +162,7 @@ public: protected: TValueStatus<ITopicParser::TPtr> CreateParser() override { - return CreateJsonParser(ParserHandler, Config); + return CreateJsonParser(ParserHandler, Config, {}); } public: @@ -172,7 +172,7 @@ public: class TRawParserFixture : public TBaseParserFixture { protected: TValueStatus<ITopicParser::TPtr> CreateParser() override { - return CreateRawParser(ParserHandler); + return CreateRawParser(ParserHandler, {}); } }; diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 64126ef800..1809bc6bcc 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -280,6 +280,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { TString Tenant; NFq::NRowDispatcher::IActorFactory::TPtr ActorFactory; const ::NMonitoring::TDynamicCounterPtr Counters; + const ::NMonitoring::TDynamicCounterPtr CountersRoot; TRowDispatcherMetrics Metrics; NYql::IPqGateway::TPtr PqGateway; NActors::TMon* Monitoring; @@ -358,6 +359,7 @@ public: const TString& tenant, const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, NActors::TMon* monitoring = nullptr); @@ -435,6 +437,7 @@ TRowDispatcher::TRowDispatcher( const TString& tenant, const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, NActors::TMon* monitoring) : Config(config) @@ -445,6 +448,7 @@ TRowDispatcher::TRowDispatcher( , Tenant(tenant) , ActorFactory(actorFactory) , Counters(counters) + , CountersRoot(countersRoot) , Metrics(counters) , PqGateway(pqGateway) , Monitoring(monitoring) @@ -788,6 +792,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { ev->Get()->Record.GetToken(), source.GetAddBearerToToken()), Counters, + CountersRoot, PqGateway, MaxSessionBufferSizeBytes ); @@ -1055,6 +1060,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher( const TString& tenant, const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, NActors::TMon* monitoring) { @@ -1066,6 +1072,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher( tenant, actorFactory, counters, + countersRoot, pqGateway, monitoring)); } diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h index f956ac3d8b..947afcbb79 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h @@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher( const TString& tenant, const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, NActors::TMon* monitoring = nullptr); diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp index 65a63bdea2..bc406e6f62 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp @@ -17,7 +17,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService( const TString& tenant, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway, - NActors::TMon* monitoring) + NActors::TMon* monitoring, + ::NMonitoring::TDynamicCounterPtr countersRoot) { return NewRowDispatcher( config, @@ -27,6 +28,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService( tenant, NFq::NRowDispatcher::CreateActorFactory(), counters, + countersRoot, pqGateway, monitoring); } diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h index 055240eb2e..ebcce4b1e8 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h @@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService( const TString& tenant, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway, - NActors::TMon* monitoring = nullptr); + NActors::TMon* monitoring = nullptr, + ::NMonitoring::TDynamicCounterPtr countersRoot = MakeIntrusive<::NMonitoring::TDynamicCounters>()); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 0baf82b93b..d44dd18710 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -252,6 +252,7 @@ private: TStats ClientsStats; TTopicSessionMetrics Metrics; const ::NMonitoring::TDynamicCounterPtr Counters; + const ::NMonitoring::TDynamicCounterPtr CountersRoot; public: explicit TTopicSession( @@ -266,6 +267,7 @@ public: NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, ui64 maxBufferSize); @@ -345,6 +347,7 @@ TTopicSession::TTopicSession( NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, ui64 maxBufferSize) : ReadGroup(readGroup) @@ -362,6 +365,7 @@ TTopicSession::TTopicSession( , BufferSize(maxBufferSize) , LogPrefix("TopicSession") , Counters(counters) + , CountersRoot(countersRoot) {} void TTopicSession::Bootstrap() { @@ -718,7 +722,12 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup)}).first->second; auto formatIt = FormatHandlers.find(handlerSettings); if (formatIt == FormatHandlers.end()) { - formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(ActorContext(), FormatHandlerConfig, handlerSettings, Metrics.PartitionGroup)}).first; + formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler( + ActorContext(), + FormatHandlerConfig, + handlerSettings, + {.CountersRoot = CountersRoot, .CountersSubgroup = Metrics.PartitionGroup} + )}).first; } if (auto status = formatIt->second->AddClient(clientInfo); status.IsFail()) { @@ -881,9 +890,10 @@ std::unique_ptr<IActor> NewTopicSession( NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, ui64 maxBufferSize) { - return std::unique_ptr<IActor>(new TTopicSession(readGroup, topicPath, endpoint, database, config, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway, maxBufferSize)); + return std::unique_ptr<IActor>(new TTopicSession(readGroup, topicPath, endpoint, database, config, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index 470dbbef29..d24201d0d6 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession( NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, + const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, ui64 maxBufferSize); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index e767cf0573..e2807adbee 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -38,6 +38,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory { NYdb::TDriver /*driver*/, std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/, const ::NMonitoring::TDynamicCounterPtr& /*counters*/, + const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const NYql::IPqGateway::TPtr& /*pqGateway*/, ui64 /*maxBufferSize*/) const override { auto actorId = Runtime.AllocateEdgeActor(); @@ -94,6 +95,7 @@ public: "Tenant", TestActorFactory, MakeIntrusive<NMonitoring::TDynamicCounters>(), + MakeIntrusive<NMonitoring::TDynamicCounters>(), CreatePqNativeGateway(pqServices) ).release()); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 7016ca9c2b..82eb073241 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -71,6 +71,7 @@ public: Driver, CredentialsProviderFactory, MakeIntrusive<NMonitoring::TDynamicCounters>(), + MakeIntrusive<NMonitoring::TDynamicCounters>(), CreatePqNativeGateway(pqServices), 16000000 ).release()); |