aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPisarenko Grigoriy <grigoriypisar@ydb.tech>2024-12-13 17:14:50 +0300
committerGitHub <noreply@github.com>2024-12-13 17:14:50 +0300
commit31afdb84df31cabbc5a0e9101d2568f0e71e282d (patch)
tree61d0a38492364d81c7fac006c72eacc425ed5e6c
parent932db4f699b1716ef2c525bcb88759878c1b2ac2 (diff)
downloadydb-31afdb84df31cabbc5a0e9101d2568f0e71e282d.tar.gz
YQ-3970 RD added parser mkql counters (#12607)
-rw-r--r--ydb/core/fq/libs/init/init.cpp3
-rw-r--r--ydb/core/fq/libs/row_dispatcher/actors_factory.cpp2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/actors_factory.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp8
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h10
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp29
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp8
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp8
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h4
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp8
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp4
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp7
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp4
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h3
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp14
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp1
24 files changed, 89 insertions, 39 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index e365ec04cf..23c139aea2 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -205,7 +205,8 @@ void Init(
tenant,
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
CreatePqNativeGateway(pqServices),
- appData->Mon);
+ appData->Mon,
+ appData->Counters);
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
}
diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
index 524f9f7632..e77be57b91 100644
--- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
@@ -20,6 +20,7 @@ struct TActorFactory : public IActorFactory {
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize) const override {
@@ -35,6 +36,7 @@ struct TActorFactory : public IActorFactory {
std::move(driver),
credentialsProviderFactory,
counters,
+ countersRoot,
pqGateway,
maxBufferSize
);
diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h
index b6df32f759..9c7dc0fac1 100644
--- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h
+++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h
@@ -23,6 +23,7 @@ struct IActorFactory : public TThrRefBase {
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize) const = 0;
};
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp
index 8340bbf62f..cfb7526bdc 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp
@@ -10,4 +10,12 @@ TString TSchemaColumn::ToString() const {
return TStringBuilder() << "'" << Name << "' : " << TypeYson;
}
+//// TCountersDesc
+
+TCountersDesc TCountersDesc::CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const {
+ TCountersDesc result(*this);
+ result.MkqlCountersName = mkqlCountersName;
+ return result;
+}
+
} // namespace NFq::NRowDispatcher
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
index 10c5701b5d..dbd5e10bc2 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
@@ -1,5 +1,7 @@
#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
#include <ydb/library/conclusion/generic/result.h>
#include <ydb/library/conclusion/status.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
@@ -22,4 +24,12 @@ struct TSchemaColumn {
TString ToString() const;
};
+struct TCountersDesc {
+ NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
+ NMonitoring::TDynamicCounterPtr CountersSubgroup = MakeIntrusive<NMonitoring::TDynamicCounters>();
+ TString MkqlCountersName; // Used for TAlignedPagePoolCounters created from CountersRoot
+
+ [[nodiscard]] TCountersDesc CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const;
+};
+
} // namespace NFq::NRowDispatcher
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make b/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make
index e4d689c0a1..1ac30eda68 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make
@@ -5,6 +5,8 @@ SRCS(
)
PEERDIR(
+ library/cpp/monlib/dynamic_counters
+
ydb/library/conclusion
ydb/library/yql/dq/actors/protos
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
index 1a8bdedf4b..df46b12bef 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
@@ -18,24 +18,24 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
using TBase = NActors::TActor<TTopicFormatHandler>;
struct TCounters {
- const NMonitoring::TDynamicCounterPtr CountersRoot;
- const NMonitoring::TDynamicCounterPtr CountersSubgroup;
+ TCountersDesc Desc;
NMonitoring::TDynamicCounters::TCounterPtr ActiveFormatHandlers;
NMonitoring::TDynamicCounters::TCounterPtr ActiveClients;
- TCounters(NMonitoring::TDynamicCounterPtr counters, const TSettings& settings)
- : CountersRoot(counters)
- , CountersSubgroup(counters->GetSubgroup("format", settings.ParsingFormat))
+ TCounters(const TCountersDesc& counters, const TSettings& settings)
+ : Desc(counters)
{
+ Desc.CountersSubgroup = Desc.CountersSubgroup->GetSubgroup("format", settings.ParsingFormat);
+
Register();
}
private:
void Register() {
- ActiveFormatHandlers = CountersRoot->GetCounter("ActiveFormatHandlers", false);
+ ActiveFormatHandlers = Desc.CountersRoot->GetCounter("ActiveFormatHandlers", false);
- ActiveClients = CountersSubgroup->GetCounter("ActiveClients", false);
+ ActiveClients = Desc.CountersSubgroup->GetCounter("ActiveClients", false);
}
};
@@ -282,9 +282,9 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
};
public:
- TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, NMonitoring::TDynamicCounterPtr counters)
+ TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, const TCountersDesc& counters)
: TBase(&TTopicFormatHandler::StateFunc)
- , TTypeParser(__LOCATION__)
+ , TTypeParser(__LOCATION__, counters.CopyWithNewMkqlCountersName("row_dispatcher"))
, Config(config)
, Settings(settings)
, LogPrefix(TStringBuilder() << "TTopicFormatHandler [" << Settings.ParsingFormat << "]: ")
@@ -487,18 +487,19 @@ private:
}
TValueStatus<ITopicParser::TPtr> CreateParserForFormat() const {
+ const auto& counters = Counters.Desc.CopyWithNewMkqlCountersName("row_dispatcher_parser");
if (Settings.ParsingFormat == "raw") {
- return CreateRawParser(ParserHandler);
+ return CreateRawParser(ParserHandler, counters);
}
if (Settings.ParsingFormat == "json_each_row") {
- return CreateJsonParser(ParserHandler, Config.JsonParserConfig);
+ return CreateJsonParser(ParserHandler, Config.JsonParserConfig, counters);
}
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat);
}
void CreateFilters() {
if (!Filters) {
- Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.CountersSubgroup);
+ Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.Desc.CountersSubgroup);
}
}
@@ -567,7 +568,7 @@ void ITopicFormatHandler::TDestroy::Destroy(ITopicFormatHandler* handler) {
}
}
-ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters) {
+ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters) {
const auto handler = new TTopicFormatHandler(config, settings, counters);
owner.RegisterWithSameMailbox(handler);
return ITopicFormatHandler::TPtr(handler);
@@ -585,7 +586,7 @@ TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConf
namespace NTests {
ITopicFormatHandler::TPtr CreateTestFormatHandler(const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings) {
- const auto handler = new TTopicFormatHandler(config, settings, MakeIntrusive<NMonitoring::TDynamicCounters>());
+ const auto handler = new TTopicFormatHandler(config, settings, {});
NActors::TActivationContext::ActorSystem()->Register(handler);
return ITopicFormatHandler::TPtr(handler);
}
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h
index 809d59b42e..ca028a6888 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h
@@ -66,7 +66,7 @@ struct TFormatHandlerConfig {
TTopicFiltersConfig FiltersConfig;
};
-ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters);
+ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters);
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, NActors::TActorId compileServiceId);
namespace NTests {
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
index 9b241c9635..7c4fb95102 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
@@ -316,8 +316,8 @@ public:
using TPtr = TIntrusivePtr<TJsonParser>;
public:
- TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config)
- : TBase(std::move(consumer), __LOCATION__)
+ TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters)
+ : TBase(std::move(consumer), __LOCATION__, counters)
, Config(config)
, NumberColumns(Consumer->GetColumns().size())
, MaxNumberRows((config.BufferCellCount - 1) / NumberColumns + 1)
@@ -483,8 +483,8 @@ private:
} // anonymous namespace
-TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config) {
- TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config);
+TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters) {
+ TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config, counters);
if (auto status = parser->InitColumnsParsers(); status.IsFail()) {
return status;
}
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h
index fe34abb3cd..c8afd38464 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h
@@ -14,7 +14,7 @@ struct TJsonParserConfig {
ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit
};
-TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config);
+TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters);
TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig);
} // namespace NFq::NRowDispatcher
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp
index 663ece63a4..916795ea4f 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp
@@ -10,8 +10,8 @@ namespace NFq::NRowDispatcher {
//// TTypeParser
-TTypeParser::TTypeParser(const TSourceLocation& location)
- : Alloc(location, NKikimr::TAlignedPagePoolCounters(), true, false)
+TTypeParser::TTypeParser(const TSourceLocation& location, const TCountersDesc& counters)
+ : Alloc(location, NKikimr::TAlignedPagePoolCounters(counters.CountersRoot, counters.MkqlCountersName), true, false)
, FunctionRegistry(NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, {}))
, TypeEnv(std::make_unique<NKikimr::NMiniKQL::TTypeEnvironment>(Alloc))
, ProgramBuilder(std::make_unique<NKikimr::NMiniKQL::TProgramBuilder>(*TypeEnv, *FunctionRegistry))
@@ -57,8 +57,8 @@ void TTopicParserBase::TStats::Clear() {
//// TTopicParserBase
-TTopicParserBase::TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location)
- : TTypeParser(location)
+TTopicParserBase::TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location, const TCountersDesc& counters)
+ : TTypeParser(location, counters)
, Consumer(std::move(consumer))
{}
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h
index f6d47268e5..b28fadbd4c 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h
@@ -9,7 +9,7 @@ namespace NFq::NRowDispatcher {
class TTypeParser {
public:
- explicit TTypeParser(const TSourceLocation& location);
+ explicit TTypeParser(const TSourceLocation& location, const TCountersDesc& counters);
virtual ~TTypeParser();
TValueStatus<NKikimr::NMiniKQL::TType*> ParseTypeYson(const TString& typeYson) const;
@@ -36,7 +36,7 @@ public:
using TPtr = TIntrusivePtr<TTopicParserBase>;
public:
- TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location);
+ TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location, const TCountersDesc& counters);
virtual ~TTopicParserBase() = default;
public:
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp
index b230b8dd0a..32d4f4c4c1 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp
@@ -17,8 +17,8 @@ public:
using TPtr = TIntrusivePtr<TRawParser>;
public:
- TRawParser(IParsedDataConsumer::TPtr consumer, const TSchemaColumn& schema)
- : TBase(std::move(consumer), __LOCATION__)
+ TRawParser(IParsedDataConsumer::TPtr consumer, const TSchemaColumn& schema, const TCountersDesc& counters)
+ : TBase(std::move(consumer), __LOCATION__, counters)
, Schema(schema)
, LogPrefix("TRawParser: ")
{}
@@ -108,13 +108,13 @@ private:
} // anonymous namespace
-TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer) {
+TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer, const TCountersDesc& counters) {
const auto& columns = consumer->GetColumns();
if (columns.size() != 1) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Expected only one column for raw format, but got " << columns.size());
}
- TRawParser::TPtr parser = MakeIntrusive<TRawParser>(consumer, columns[0]);
+ TRawParser::TPtr parser = MakeIntrusive<TRawParser>(consumer, columns[0], counters);
if (auto status = parser->InitColumnParser(); status.IsFail()) {
return status.AddParentIssue(TStringBuilder() << "Failed to create raw parser for column " << columns[0].ToString());
}
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h
index f920543a13..6588844884 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h
@@ -4,6 +4,6 @@
namespace NFq::NRowDispatcher {
-TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer);
+TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer, const TCountersDesc& counters);
} // namespace NFq::NRowDispatcher
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp
index d7b3812f87..b65911e793 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp
@@ -150,7 +150,7 @@ TBaseFixture::TBatch& TBaseFixture::TBatch::AddRow(TRow row) {
//// TBaseFixture
TBaseFixture::TBaseFixture()
- : TTypeParser(__LOCATION__)
+ : TTypeParser(__LOCATION__, {})
, MemoryInfo("TBaseFixture alloc")
, HolderFactory(std::make_unique<NKikimr::NMiniKQL::THolderFactory>(Alloc.Ref(), MemoryInfo))
, Runtime(1)
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp
index e2d46d5367..0f2d59d810 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp
@@ -162,7 +162,7 @@ public:
protected:
TValueStatus<ITopicParser::TPtr> CreateParser() override {
- return CreateJsonParser(ParserHandler, Config);
+ return CreateJsonParser(ParserHandler, Config, {});
}
public:
@@ -172,7 +172,7 @@ public:
class TRawParserFixture : public TBaseParserFixture {
protected:
TValueStatus<ITopicParser::TPtr> CreateParser() override {
- return CreateRawParser(ParserHandler);
+ return CreateRawParser(ParserHandler, {});
}
};
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 64126ef800..1809bc6bcc 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -280,6 +280,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
TString Tenant;
NFq::NRowDispatcher::IActorFactory::TPtr ActorFactory;
const ::NMonitoring::TDynamicCounterPtr Counters;
+ const ::NMonitoring::TDynamicCounterPtr CountersRoot;
TRowDispatcherMetrics Metrics;
NYql::IPqGateway::TPtr PqGateway;
NActors::TMon* Monitoring;
@@ -358,6 +359,7 @@ public:
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);
@@ -435,6 +437,7 @@ TRowDispatcher::TRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
: Config(config)
@@ -445,6 +448,7 @@ TRowDispatcher::TRowDispatcher(
, Tenant(tenant)
, ActorFactory(actorFactory)
, Counters(counters)
+ , CountersRoot(countersRoot)
, Metrics(counters)
, PqGateway(pqGateway)
, Monitoring(monitoring)
@@ -788,6 +792,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
ev->Get()->Record.GetToken(),
source.GetAddBearerToToken()),
Counters,
+ CountersRoot,
PqGateway,
MaxSessionBufferSizeBytes
);
@@ -1055,6 +1060,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
{
@@ -1066,6 +1072,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
tenant,
actorFactory,
counters,
+ countersRoot,
pqGateway,
monitoring));
}
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
index f956ac3d8b..947afcbb79 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
@@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
index 65a63bdea2..bc406e6f62 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
@@ -17,7 +17,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway,
- NActors::TMon* monitoring)
+ NActors::TMon* monitoring,
+ ::NMonitoring::TDynamicCounterPtr countersRoot)
{
return NewRowDispatcher(
config,
@@ -27,6 +28,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
tenant,
NFq::NRowDispatcher::CreateActorFactory(),
counters,
+ countersRoot,
pqGateway,
monitoring);
}
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
index 055240eb2e..ebcce4b1e8 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
@@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway,
- NActors::TMon* monitoring = nullptr);
+ NActors::TMon* monitoring = nullptr,
+ ::NMonitoring::TDynamicCounterPtr countersRoot = MakeIntrusive<::NMonitoring::TDynamicCounters>());
} // namespace NFq
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 0baf82b93b..d44dd18710 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -252,6 +252,7 @@ private:
TStats ClientsStats;
TTopicSessionMetrics Metrics;
const ::NMonitoring::TDynamicCounterPtr Counters;
+ const ::NMonitoring::TDynamicCounterPtr CountersRoot;
public:
explicit TTopicSession(
@@ -266,6 +267,7 @@ public:
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize);
@@ -345,6 +347,7 @@ TTopicSession::TTopicSession(
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize)
: ReadGroup(readGroup)
@@ -362,6 +365,7 @@ TTopicSession::TTopicSession(
, BufferSize(maxBufferSize)
, LogPrefix("TopicSession")
, Counters(counters)
+ , CountersRoot(countersRoot)
{}
void TTopicSession::Bootstrap() {
@@ -718,7 +722,12 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup)}).first->second;
auto formatIt = FormatHandlers.find(handlerSettings);
if (formatIt == FormatHandlers.end()) {
- formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(ActorContext(), FormatHandlerConfig, handlerSettings, Metrics.PartitionGroup)}).first;
+ formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(
+ ActorContext(),
+ FormatHandlerConfig,
+ handlerSettings,
+ {.CountersRoot = CountersRoot, .CountersSubgroup = Metrics.PartitionGroup}
+ )}).first;
}
if (auto status = formatIt->second->AddClient(clientInfo); status.IsFail()) {
@@ -881,9 +890,10 @@ std::unique_ptr<IActor> NewTopicSession(
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize) {
- return std::unique_ptr<IActor>(new TTopicSession(readGroup, topicPath, endpoint, database, config, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway, maxBufferSize));
+ return std::unique_ptr<IActor>(new TTopicSession(readGroup, topicPath, endpoint, database, config, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize));
}
} // namespace NFq
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h
index 470dbbef29..d24201d0d6 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.h
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h
@@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
+ const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize);
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
index e767cf0573..e2807adbee 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
@@ -38,6 +38,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
NYdb::TDriver /*driver*/,
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
+ const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
const NYql::IPqGateway::TPtr& /*pqGateway*/,
ui64 /*maxBufferSize*/) const override {
auto actorId = Runtime.AllocateEdgeActor();
@@ -94,6 +95,7 @@ public:
"Tenant",
TestActorFactory,
MakeIntrusive<NMonitoring::TDynamicCounters>(),
+ MakeIntrusive<NMonitoring::TDynamicCounters>(),
CreatePqNativeGateway(pqServices)
).release());
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
index 7016ca9c2b..82eb073241 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
@@ -71,6 +71,7 @@ public:
Driver,
CredentialsProviderFactory,
MakeIntrusive<NMonitoring::TDynamicCounters>(),
+ MakeIntrusive<NMonitoring::TDynamicCounters>(),
CreatePqNativeGateway(pqServices),
16000000
).release());