diff options
author | gvit <gvit@ydb.tech> | 2023-04-07 23:24:41 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-04-07 23:24:41 +0300 |
commit | d055c2573aa4521f926c24de1780f796b0dfe923 (patch) | |
tree | cde93bd6ce3d2b0627b710531d7234662ea762a6 | |
parent | 7b6afb1aca17d4eb10fa73d470db78b0317c1351 (diff) | |
download | ydb-d055c2573aa4521f926c24de1780f796b0dfe923.tar.gz |
fix performance issue
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_ut.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.h | 3 |
10 files changed, 39 insertions, 29 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index d4f99134dfb..5169d48cfee 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -127,9 +127,9 @@ public: const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::IHTTPGateway::TPtr httpGateway) + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::DataExecuter, "DataExecuter") - , HttpGateway(std::move(httpGateway)) + , AsyncIoFactory(std::move(asyncIoFactory)) , StreamResult(streamResult) { YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); @@ -1633,7 +1633,7 @@ private: return false; }; - auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters, HttpGateway), + auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), AsyncIoFactory, AppData()->FunctionRegistry, settings, limits); auto computeActorId = shareMailbox ? RegisterWithSameMailbox(computeActor) : Register(computeActor); @@ -2342,7 +2342,7 @@ private: } private: - NYql::IHTTPGateway::TPtr HttpGateway; + NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; bool StreamResult = false; bool HasStreamLookup = false; @@ -2381,9 +2381,9 @@ private: } // namespace IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway) + TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) { - return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(httpGateway)); + return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory)); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index ffc106c28f6..1c8f7548700 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -8,7 +8,7 @@ #include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/protos/config.pb.h> #include <ydb/core/protos/kqp.pb.h> -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> namespace NKikimr { namespace NKqp { @@ -85,7 +85,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::IHTTPGateway::TPtr httpGateway); + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 47e93cbf112..1b291aba7d0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -86,12 +86,12 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::IHTTPGateway::TPtr httpGateway) + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.EraseLocks); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway)); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory)); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -107,13 +107,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway)); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory)); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(httpGateway)); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory)); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 08bb56cf661..4e269ba263b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1091,7 +1091,7 @@ private: IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway); + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 871df1546d5..739efcc57d0 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -88,11 +88,11 @@ public: } TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters, - IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway) + IKqpNodeComputeActorFactory* caFactory, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) : Config(config.GetResourceManager()) , Counters(counters) , CaFactory(caFactory) - , HttpGateway(std::move(httpGateway)) + , AsyncIoFactory(std::move(asyncIoFactory)) {} void Bootstrap() { @@ -326,12 +326,12 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - CreateKqpAsyncIoFactory(Counters, HttpGateway), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy, + AsyncIoFactory, AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { - computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters, HttpGateway), + computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), AsyncIoFactory, AppData()->FunctionRegistry, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { @@ -514,7 +514,7 @@ private: TIntrusivePtr<TKqpCounters> Counters; IKqpNodeComputeActorFactory* CaFactory; NRm::IKqpResourceManager* ResourceManager_ = nullptr; - NYql::IHTTPGateway::TPtr HttpGateway; + NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; //state sharded by TxId std::array<NKqpNode::TState, BucketsCount> Buckets; @@ -524,9 +524,9 @@ private: } // anonymous namespace IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway) + TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) { - return new TKqpNodeService(tableServiceConfig, counters, caFactory, std::move(httpGateway)); + return new TKqpNodeService(tableServiceConfig, counters, caFactory, std::move(asyncIoFactory)); } } // namespace NKqp diff --git a/ydb/core/kqp/node_service/kqp_node_service.h b/ydb/core/kqp/node_service/kqp_node_service.h index 56ea1361d24..a53def3efbf 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.h +++ b/ydb/core/kqp/node_service/kqp_node_service.h @@ -5,7 +5,7 @@ #include <ydb/core/protos/config.pb.h> #include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/actors/core/actor.h> @@ -66,7 +66,7 @@ struct IKqpNodeComputeActorFactory { }; NActors::IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr, NYql::IHTTPGateway::TPtr httpGateway = nullptr); + TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory = nullptr); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index 64719646e0f..43f3f8388e4 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -2,6 +2,7 @@ #include <ydb/core/cms/console/console.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> +#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/tablet/resource_broker_impl.h> @@ -170,7 +171,9 @@ public: Runtime->EnableScheduleForActor(ResourceManagerActorId, true); WaitForBootstrap(); - auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), NYql::IHTTPGateway::Make()); + auto httpGateway = NYql::IHTTPGateway::Make(); + auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, httpGateway); + auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory); KqpNodeActorId = Runtime->Register(kqpNode); Runtime->EnableScheduleForActor(KqpNodeActorId, true); WaitForBootstrap(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 3a38836c398..67406873e4b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -19,6 +19,7 @@ #include <ydb/public/lib/operation_id/operation_id.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/ydb_convert/ydb_convert.h> +#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> #include <ydb/library/yql/utils/actor_log/log.h> #include <ydb/library/yql/core/services/mounts/yql_mounts.h> @@ -168,6 +169,7 @@ public: void Bootstrap() { NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER)); Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext()); + AsyncIoFactory = CreateKqpAsyncIoFactory(Counters, HttpGateway); ModuleResolverState = MakeIntrusive<TModuleResolverState>(); LocalSessions = std::make_unique<TLocalSessionsRegistry>(AppData()->RandomProvider); @@ -207,7 +209,7 @@ public: TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpCompileServiceID(SelfId().NodeId()), CompileService); - KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, HttpGateway)); + KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, AsyncIoFactory)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpNodeServiceID(SelfId().NodeId()), KqpNodeService); @@ -1233,7 +1235,7 @@ private: auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, ModuleResolverState, Counters); + IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, ModuleResolverState, Counters); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); TKqpSessionInfo* sessionInfo = LocalSessions->Create( sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration()); @@ -1355,6 +1357,7 @@ private: TActorId SpillingService; TActorId WhiteBoardService; NKikimrKqp::TKqpProxyNodeResources NodeResources; + NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; }; } // namespace diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 16bc1f3d6fb..4e48c11f992 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -116,12 +116,14 @@ public: TKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters) : Owner(owner) , SessionId(sessionId) , Counters(counters) , Settings(workerSettings) , HttpGateway(std::move(httpGateway)) + , AsyncIoFactory(std::move(asyncIoFactory)) , ModuleResolverState(std::move(moduleResolverState)) , KqpSettings(kqpSettings) , Config(CreateConfig(kqpSettings, workerSettings)) @@ -978,7 +980,7 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(), - HttpGateway); + AsyncIoFactory); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1902,6 +1904,7 @@ private: TIntrusivePtr<TKqpRequestCounters> RequestCounters; TKqpWorkerSettings Settings; NYql::IHTTPGateway::TPtr HttpGateway; + NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; TIntrusivePtr<TModuleResolverState> ModuleResolverState; TKqpSettings::TConstPtr KqpSettings; std::optional<TActorId> WorkerId; @@ -1923,10 +1926,10 @@ private: IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, - NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, + NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters) { - return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(moduleResolverState), counters); + return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(moduleResolverState), counters); } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 7ac6532f3fc..36b50632da5 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -4,6 +4,7 @@ #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/protos/config.pb.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> #include <library/cpp/actors/core/actorid.h> @@ -28,7 +29,7 @@ struct TKqpWorkerSettings { IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, - NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, + NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters); } // namespace NKikimr::NKqp |