aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-04-07 23:24:41 +0300
committergvit <gvit@ydb.tech>2023-04-07 23:24:41 +0300
commitd055c2573aa4521f926c24de1780f796b0dfe923 (patch)
treecde93bd6ce3d2b0627b710531d7234662ea762a6
parent7b6afb1aca17d4eb10fa73d470db78b0317c1351 (diff)
downloadydb-d055c2573aa4521f926c24de1780f796b0dfe923.tar.gz
fix performance issue
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp12
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h2
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp14
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.h4
-rw-r--r--ydb/core/kqp/node_service/kqp_node_ut.cpp5
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp7
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp9
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.h3
10 files changed, 39 insertions, 29 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index d4f99134dfb..5169d48cfee 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -127,9 +127,9 @@ public:
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::IHTTPGateway::TPtr httpGateway)
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::DataExecuter, "DataExecuter")
- , HttpGateway(std::move(httpGateway))
+ , AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
{
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
@@ -1633,7 +1633,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters, HttpGateway),
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), AsyncIoFactory,
AppData()->FunctionRegistry, settings, limits);
auto computeActorId = shareMailbox ? RegisterWithSameMailbox(computeActor) : Register(computeActor);
@@ -2342,7 +2342,7 @@ private:
}
private:
- NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool StreamResult = false;
bool HasStreamLookup = false;
@@ -2381,9 +2381,9 @@ private:
} // namespace
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway)
+ TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
{
- return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(httpGateway));
+ return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index ffc106c28f6..1c8f7548700 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -8,7 +8,7 @@
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/kqp.pb.h>
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
namespace NKikimr {
namespace NKqp {
@@ -85,7 +85,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::IHTTPGateway::TPtr httpGateway);
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 47e93cbf112..1b291aba7d0 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -86,12 +86,12 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::IHTTPGateway::TPtr httpGateway)
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.EraseLocks);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory));
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -107,13 +107,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory));
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(httpGateway));
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory));
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 08bb56cf661..4e269ba263b 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -1091,7 +1091,7 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway);
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 871df1546d5..739efcc57d0 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -88,11 +88,11 @@ public:
}
TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters,
- IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway)
+ IKqpNodeComputeActorFactory* caFactory, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
: Config(config.GetResourceManager())
, Counters(counters)
, CaFactory(caFactory)
- , HttpGateway(std::move(httpGateway))
+ , AsyncIoFactory(std::move(asyncIoFactory))
{}
void Bootstrap() {
@@ -326,12 +326,12 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateKqpAsyncIoFactory(Counters, HttpGateway), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy,
+ AsyncIoFactory, AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy,
Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
- computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters, HttpGateway),
+ computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), AsyncIoFactory,
AppData()->FunctionRegistry, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
@@ -514,7 +514,7 @@ private:
TIntrusivePtr<TKqpCounters> Counters;
IKqpNodeComputeActorFactory* CaFactory;
NRm::IKqpResourceManager* ResourceManager_ = nullptr;
- NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
//state sharded by TxId
std::array<NKqpNode::TState, BucketsCount> Buckets;
@@ -524,9 +524,9 @@ private:
} // anonymous namespace
IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway)
+ TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
{
- return new TKqpNodeService(tableServiceConfig, counters, caFactory, std::move(httpGateway));
+ return new TKqpNodeService(tableServiceConfig, counters, caFactory, std::move(asyncIoFactory));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/node_service/kqp_node_service.h b/ydb/core/kqp/node_service/kqp_node_service.h
index 56ea1361d24..a53def3efbf 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.h
+++ b/ydb/core/kqp/node_service/kqp_node_service.h
@@ -5,7 +5,7 @@
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/actors/core/actor.h>
@@ -66,7 +66,7 @@ struct IKqpNodeComputeActorFactory {
};
NActors::IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr, NYql::IHTTPGateway::TPtr httpGateway = nullptr);
+ TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory = nullptr);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp
index 64719646e0f..43f3f8388e4 100644
--- a/ydb/core/kqp/node_service/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/cms/console/console.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
+#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/tablet/resource_broker_impl.h>
@@ -170,7 +171,9 @@ public:
Runtime->EnableScheduleForActor(ResourceManagerActorId, true);
WaitForBootstrap();
- auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), NYql::IHTTPGateway::Make());
+ auto httpGateway = NYql::IHTTPGateway::Make();
+ auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, httpGateway);
+ auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory);
KqpNodeActorId = Runtime->Register(kqpNode);
Runtime->EnableScheduleForActor(KqpNodeActorId, true);
WaitForBootstrap();
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 3a38836c398..67406873e4b 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -19,6 +19,7 @@
#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
+#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
@@ -168,6 +169,7 @@ public:
void Bootstrap() {
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER));
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext());
+ AsyncIoFactory = CreateKqpAsyncIoFactory(Counters, HttpGateway);
ModuleResolverState = MakeIntrusive<TModuleResolverState>();
LocalSessions = std::make_unique<TLocalSessionsRegistry>(AppData()->RandomProvider);
@@ -207,7 +209,7 @@ public:
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpCompileServiceID(SelfId().NodeId()), CompileService);
- KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, HttpGateway));
+ KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, AsyncIoFactory));
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpNodeServiceID(SelfId().NodeId()), KqpNodeService);
@@ -1233,7 +1235,7 @@ private:
auto config = CreateConfig(KqpSettings, workerSettings);
- IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, ModuleResolverState, Counters);
+ IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, ModuleResolverState, Counters);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
TKqpSessionInfo* sessionInfo = LocalSessions->Create(
sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration());
@@ -1355,6 +1357,7 @@ private:
TActorId SpillingService;
TActorId WhiteBoardService;
NKikimrKqp::TKqpProxyNodeResources NodeResources;
+ NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
};
} // namespace
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 16bc1f3d6fb..4e48c11f992 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -116,12 +116,14 @@ public:
TKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings,
const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway,
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters)
: Owner(owner)
, SessionId(sessionId)
, Counters(counters)
, Settings(workerSettings)
, HttpGateway(std::move(httpGateway))
+ , AsyncIoFactory(std::move(asyncIoFactory))
, ModuleResolverState(std::move(moduleResolverState))
, KqpSettings(kqpSettings)
, Config(CreateConfig(kqpSettings, workerSettings))
@@ -978,7 +980,7 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(),
- HttpGateway);
+ AsyncIoFactory);
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
@@ -1902,6 +1904,7 @@ private:
TIntrusivePtr<TKqpRequestCounters> RequestCounters;
TKqpWorkerSettings Settings;
NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TKqpSettings::TConstPtr KqpSettings;
std::optional<TActorId> WorkerId;
@@ -1923,10 +1926,10 @@ private:
IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
- NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState,
+ NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters)
{
- return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(moduleResolverState), counters);
+ return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(moduleResolverState), counters);
}
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h
index 7ac6532f3fc..36b50632da5 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.h
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.h
@@ -4,6 +4,7 @@
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <library/cpp/actors/core/actorid.h>
@@ -28,7 +29,7 @@ struct TKqpWorkerSettings {
IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
- NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState,
+ NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters);
} // namespace NKikimr::NKqp