aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-10-10 19:49:19 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-10-10 19:49:19 +0300
commit3f1ac4455ebfdaf68d0dac1bca26110f014457cf (patch)
tree7960919cf3a8d0315df3e855b553cc72027db787
parent9482c913e2ec8adcdb94f0bf34a45b9017fd7add (diff)
downloadydb-3f1ac4455ebfdaf68d0dac1bca26110f014457cf.tar.gz
TRequestActor for control plane storage requests
Рефакторинг состоит в том, чтобы сделать возможным/удобным создание специальных акторов для обработки запроса в control plane storage. Сейчас практически все запросы в control plane storage обрабатываются как один вызов метода для доступа к базе и в колбеке на фьючу отправка ответа клиенту. Это не позволяет нам вставить какие-то акторные действия перед запросом в базу. Сделал рефакторинг, чтобы такие действия были возможны. Далее в планах в качестве основного кода для данного тикета вставить создание ресурса квотировщика перед запросом в базу на создание квери.
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp168
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/request_actor.h94
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp22
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp10
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp10
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h95
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp20
11 files changed, 239 insertions, 188 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp
index 0c589ceb7e..b8ac7a40e6 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp
@@ -107,7 +107,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
const auto readQuery = readQueryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- TAsyncStatus status = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo);
+ TAsyncStatus status = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo);
auto prepare = [response] { return *response; };
auto success = SendResponse<TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Fq::Private::NodesHealthCheckResult>(
"NodesHealthCheckRequest - NodesHealthCheckResult",
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp
index 00814c6a03..2a7d4491ac 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp
@@ -1,6 +1,7 @@
#include "utils.h"
#include <ydb/public/lib/yq/scope.h>
+#include <ydb/core/yq/libs/control_plane_storage/request_actor.h>
#include <ydb/core/yq/libs/db_schema/db_schema.h>
#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
#include <ydb/core/yq/libs/quota_manager/events/events.h>
@@ -41,21 +42,62 @@ struct TEvPrivate {
};
template <class TRequest, class TResponse, class TDerived>
-class TRateLimiterRequestActor : public NActors::TActor<TDerived> {
+class TRateLimiterRequestActor : public TControlPlaneRequestActor<TRequest, TResponse, TDerived> {
+ using TRequestActorBase = TControlPlaneRequestActor<TRequest, TResponse, TDerived>;
+
+protected:
+ using TRequestActorBase::ReplyWithError;
+
public:
- using TResponseEvent = TResponse;
-
- TRateLimiterRequestActor(TInstant startTime, typename TRequest::TPtr&& ev, TRequestCounters requestCounters, TDebugInfoPtr debugInfo)
- : NActors::TActor<TDerived>(&TDerived::StateFunc)
- , Request(std::move(ev))
- , RequestCounters(std::move(requestCounters))
- , DebugInfo(std::move(debugInfo))
- , QueryId(Request->Get()->Request.query_id().value())
- , OwnerId(Request->Get()->Request.owner_id())
- , StartTime(startTime)
+ TRateLimiterRequestActor(typename TRequest::TPtr&& ev, TRequestCounters requestCounters, TDebugInfoPtr debugInfo, TDbPool::TPtr dbPool, TYdbConnectionPtr ydbConnection)
+ : TRequestActorBase(std::move(ev), std::move(requestCounters), std::move(debugInfo), std::move(dbPool), std::move(ydbConnection))
+ , QueryId(this->Request->Get()->Request.query_id().value())
+ , OwnerId(this->Request->Get()->Request.owner_id())
{
}
+ void Start() {
+ auto& request = this->Request->Get()->Request;
+ const TString& scope = request.scope();
+ const TString& tenant = request.tenant();
+
+ if (NYql::TIssues issues = ValidateCreateOrDeleteRateLimiterResource(QueryId, scope, tenant, OwnerId)) {
+ CPS_LOG_W(TDerived::RequestTypeName << "Request: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString());
+ ReplyWithError(issues);
+ return;
+ }
+
+ this->Become(&TDerived::StateFunc);
+
+ TSqlQueryBuilder readQueryBuilder(this->YdbConnection->TablePathPrefix, TDerived::RequestTypeName);
+ readQueryBuilder.AddString("query_id", QueryId);
+ readQueryBuilder.AddString("scope", scope);
+ readQueryBuilder.AddString("tenant", tenant);
+ TStringBuilder text;
+ text <<
+ "SELECT `" SCOPE_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "` FROM " PENDING_SMALL_TABLE_NAME
+ " WHERE `" TENANT_COLUMN_NAME "` = $tenant"
+ " AND `" SCOPE_COLUMN_NAME "` = $scope"
+ " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+
+ "SELECT `" INTERNAL_COLUMN_NAME "`";
+ if constexpr (TDerived::IsCreateRequest) {
+ text << ", `" QUERY_COLUMN_NAME "`";
+ }
+ text << " FROM " QUERIES_TABLE_NAME
+ " WHERE `" SCOPE_COLUMN_NAME "` = $scope"
+ " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n";
+ readQueryBuilder.AddText(text);
+
+ const auto query = readQueryBuilder.Build();
+ auto [readStatus, resultSets] = this->Read(query.Sql, query.Params, this->RequestCounters, this->DebugInfo);
+ readStatus.Subscribe(
+ [resultSets = resultSets, actorSystem = NActors::TActivationContext::ActorSystem(), selfId = this->SelfId()] (const TAsyncStatus& status) {
+ actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvDbRequestResult(status, std::move(resultSets))));
+ }
+ );
+ }
+
void Handle(TEvPrivate::TEvDbRequestResult::TPtr& ev) {
const auto& status = ev->Get()->Status.GetValueSync();
CPS_LOG_D(TDerived::RequestTypeName << "Request. Got response from database: " << status.GetStatus());
@@ -121,47 +163,9 @@ public:
}
}
- void ReplyWithError(const TString& msg) {
- NYql::TIssues issues;
- issues.AddIssue(msg);
- ReplyWithError(issues);
- }
-
- void ReplyWithError(const NYql::TIssues& issues) {
- SendResponseEventAndPassAway(std::make_unique<TResponse>(issues), false);
- }
-
- void Reply(const typename TResponse::TProto& proto) {
- SendResponseEventAndPassAway(std::make_unique<TResponse>(proto), true);
- }
-
- void SendResponseEventAndPassAway(std::unique_ptr<TResponse> event, bool success) {
- event->DebugInfo = std::move(DebugInfo);
-
- RequestCounters.Common->ResponseBytes->Add(event->GetByteSize());
- RequestCounters.IncInFly();
- if (success) {
- RequestCounters.IncOk();
- } else {
- RequestCounters.IncError();
- }
- const TDuration duration = TInstant::Now() - StartTime;
- RequestCounters.Common->LatencyMs->Collect(duration.MilliSeconds());
-
- TDerived::LwProbe(QueryId, duration, success);
-
- this->Send(Request->Sender, event.release(), 0, Request->Cookie);
-
- this->PassAway();
- }
-
protected:
- const typename TRequest::TPtr Request;
- TRequestCounters RequestCounters;
- TDebugInfoPtr DebugInfo;
const TString QueryId;
const TString OwnerId;
- const TInstant StartTime;
TString CloudId;
TString FolderId;
TMaybe<double> QueryLimit;
@@ -208,12 +212,8 @@ public:
}
}
- static void LwProbe(const TString& queryId, TInstant startTime, bool success) {
- LwProbe(queryId, TInstant::Now() - startTime, success);
- }
-
- static void LwProbe(const TString& queryId, TDuration duration, bool success) {
- LWPROBE(CreateRateLimiterResourceRequest, queryId, duration, success);
+ void LwProbe(bool success) {
+ LWPROBE(CreateRateLimiterResourceRequest, QueryId, GetRequestDuration(), success);
}
static const TString RequestTypeName;
@@ -253,12 +253,8 @@ public:
}
}
- static void LwProbe(const TString& queryId, TInstant startTime, bool success) {
- LwProbe(queryId, TInstant::Now() - startTime, success);
- }
-
- static void LwProbe(const TString& queryId, TDuration duration, bool success) {
- LWPROBE(DeleteRateLimiterResourceRequest, queryId, duration, success);
+ void LwProbe(bool success) {
+ LWPROBE(DeleteRateLimiterResourceRequest, QueryId, GetRequestDuration(), success);
}
static const TString RequestTypeName;
@@ -271,59 +267,11 @@ const TString TRateLimiterDeleteRequest::RequestTypeName = "DeleteRateLimiterRes
template <class TEventPtr, class TRequestActor, TYdbControlPlaneStorageActor::ERequestTypeCommon requestType>
void TYdbControlPlaneStorageActor::HandleRateLimiterImpl(TEventPtr& ev) {
- const TInstant startTime = TInstant::Now();
TRequestCounters requestCounters{nullptr, Counters.GetCommonCounters(requestType)};
- requestCounters.IncInFly();
- requestCounters.Common->RequestBytes->Add(ev->Get()->GetByteSize());
-
- auto& request = ev->Get()->Request;
- const TString& queryId = request.query_id().value();
- const TString& scope = request.scope();
- const TString& tenant = request.tenant();
- const TString& owner = request.owner_id();
-
- CPS_LOG_T(TRequestActor::RequestTypeName << "Request: {" << request.DebugString() << "}");
-
- NYql::TIssues issues = ValidateCreateOrDeleteRateLimiterResource(queryId, scope, tenant, owner);
- if (issues) {
- CPS_LOG_W(TRequestActor::RequestTypeName << "Request: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString());
- const TDuration delta = TInstant::Now() - startTime;
- SendResponseIssues<typename TRequestActor::TResponseEvent>(ev->Sender, issues, ev->Cookie, delta, requestCounters);
- TRequestActor::LwProbe(queryId, startTime, false);
- return;
- }
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- const NActors::TActorId requestActor = Register(new TRequestActor(startTime, std::move(ev), requestCounters, debugInfo));
-
- TSqlQueryBuilder readQueryBuilder(YdbConnection->TablePathPrefix, TRequestActor::RequestTypeName);
- readQueryBuilder.AddString("query_id", queryId);
- readQueryBuilder.AddString("scope", scope);
- readQueryBuilder.AddString("tenant", tenant);
- TStringBuilder text;
- text <<
- "SELECT `" SCOPE_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "` FROM " PENDING_SMALL_TABLE_NAME
- " WHERE `" TENANT_COLUMN_NAME "` = $tenant"
- " AND `" SCOPE_COLUMN_NAME "` = $scope"
- " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
-
- "SELECT `" INTERNAL_COLUMN_NAME "`";
- if constexpr (TRequestActor::IsCreateRequest) {
- text << ", `" QUERY_COLUMN_NAME "`";
- }
- text << " FROM " QUERIES_TABLE_NAME
- " WHERE `" SCOPE_COLUMN_NAME "` = $scope"
- " AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n";
- readQueryBuilder.AddText(text);
-
- const auto query = readQueryBuilder.Build();
- auto [readStatus, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
- readStatus.Subscribe(
- [resultSets = resultSets, actorSystem = NActors::TActivationContext::ActorSystem(), requestActor] (const TAsyncStatus& status) {
- actorSystem->Send(new IEventHandle(requestActor, requestActor, new TEvPrivate::TEvDbRequestResult(status, std::move(resultSets))));
- }
- );
+ const NActors::TActorId requestActor = Register(new TRequestActor(std::move(ev), std::move(requestCounters), std::move(debugInfo), DbPool, YdbConnection));
}
void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateRateLimiterResourceRequest::TPtr& ev) {
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
index f33261b49a..bde301450a 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
@@ -300,7 +300,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
};
const auto query = queryBuilder.Build();
- auto [readStatus, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, TTxSettings::StaleRO());
+ auto [readStatus, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo, TTxSettings::StaleRO());
auto result = readStatus.Apply(
[=,
resultSets=resultSets,
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index 4cb71db428..06cf2c3983 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -475,7 +475,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
auto prepareParams = std::get<2>(pingTaskParams);
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery, readParams, prepareParams, requestCounters, debugInfo);
+ auto result = ReadModifyWrite(readQuery, readParams, prepareParams, requestCounters, debugInfo);
auto prepare = [response] { return *response; };
auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Fq::Private::PingTaskResult>(
"PingTaskRequest - PingTaskResult",
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp
index c7cdbc3738..fe33852013 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp
@@ -66,7 +66,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [response] { return *response; };
auto success = SendResponse<TEvControlPlaneStorage::TEvWriteResultDataResponse, Fq::Private::WriteTaskResultResult>(
"WriteResultDataRequest - WriteResultDataResult",
diff --git a/ydb/core/yq/libs/control_plane_storage/request_actor.h b/ydb/core/yq/libs/control_plane_storage/request_actor.h
new file mode 100644
index 0000000000..1197032fb1
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_storage/request_actor.h
@@ -0,0 +1,94 @@
+#pragma once
+#include "util.h"
+
+#include <ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h>
+#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/yq/libs/control_plane_storage/schema.h>
+#include <ydb/core/yq/libs/db_schema/db_schema.h>
+#include <ydb/core/yq/libs/shared_resources/db_exec.h>
+#include <ydb/public/api/protos/yq.pb.h>
+#include <ydb/public/lib/yq/scope.h>
+#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
+
+#include <util/datetime/base.h>
+
+namespace NYq {
+
+template <class TRequest, class TResponse, class TDerived>
+class TControlPlaneRequestActor : public NActors::TActorBootstrapped<TDerived>,
+ public TDbRequester
+{
+public:
+ using TResponseEvent = TResponse;
+ using TRequestEvent = TRequest;
+ using TBaseActor = NActors::TActorBootstrapped<TDerived>;
+
+ using TBaseActor::Send;
+
+protected:
+ TControlPlaneRequestActor(typename TRequestEvent::TPtr&& ev, TRequestCounters requestCounters, TDebugInfoPtr debugInfo, TDbPool::TPtr dbPool, TYdbConnectionPtr ydbConnection)
+ : TDbRequester(std::move(dbPool), std::move(ydbConnection))
+ , Request(std::move(ev))
+ , RequestCounters(std::move(requestCounters))
+ , DebugInfo(std::move(debugInfo))
+ {
+ RequestCounters.IncInFly();
+ RequestCounters.Common->RequestBytes->Add(Request->Get()->GetByteSize());
+ }
+
+public:
+ void Bootstrap() {
+ CPS_LOG_T(TDerived::RequestTypeName << "Request: {" << Request->Get()->Request.DebugString() << "}");
+
+ AsDerived()->Start();
+ }
+
+protected:
+ void ReplyWithError(const TString& msg) {
+ NYql::TIssues issues;
+ issues.AddIssue(msg);
+ ReplyWithError(issues);
+ }
+
+ void ReplyWithError(const NYql::TIssues& issues) {
+ SendResponseEventAndPassAway(std::make_unique<TResponse>(issues), false);
+ }
+
+ void Reply(const typename TResponse::TProto& proto) {
+ SendResponseEventAndPassAway(std::make_unique<TResponse>(proto), true);
+ }
+
+ void SendResponseEventAndPassAway(std::unique_ptr<TResponse> event, bool success) {
+ event->DebugInfo = std::move(DebugInfo);
+
+ RequestCounters.Common->ResponseBytes->Add(event->GetByteSize());
+ RequestCounters.IncInFly();
+ if (success) {
+ RequestCounters.IncOk();
+ } else {
+ RequestCounters.IncError();
+ }
+ RequestCounters.Common->LatencyMs->Collect(GetRequestDuration().MilliSeconds());
+
+ AsDerived()->LwProbe(success);
+
+ Send(Request->Sender, event.release(), 0, Request->Cookie);
+ this->PassAway();
+ }
+
+ TDuration GetRequestDuration() const {
+ return TInstant::Now() - StartTime;
+ }
+
+ TDerived* AsDerived() {
+ return static_cast<TDerived*>(this);
+ }
+
+protected:
+ const TInstant StartTime = TInstant::Now();
+ const typename TRequestEvent::TPtr Request;
+ TRequestCounters RequestCounters;
+ TDebugInfoPtr DebugInfo;
+};
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index cab6faa288..3d099b0a6f 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -325,8 +325,7 @@ TAsyncStatus ExecDbRequest(TDbPool::TPtr dbPool, std::function<NYdb::TAsyncStatu
return promise.GetFuture();
}
-std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlPlaneStorageActor::Read(
- NActors::TActorSystem* actorSystem,
+std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TDbRequester::Read(
const TString& query,
const NYdb::TParams& params,
const TRequestCounters& requestCounters,
@@ -334,6 +333,7 @@ std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlP
TTxSettings transactionMode,
bool retryOnTli)
{
+ NActors::TActorSystem* const actorSystem = TActivationContext::ActorSystem();
auto resultSet = std::make_shared<TVector<NYdb::TResultSet>>();
std::shared_ptr<int> retryCount = std::make_shared<int>();
@@ -362,11 +362,11 @@ std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlP
};
TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>();
- Register(new TDbRequest(DbPool, promise, handler));
+ TActivationContext::AsActorContext().Register(new TDbRequest(DbPool, promise, handler));
return {promise.GetFuture(), resultSet};
}
-TAsyncStatus TYdbControlPlaneStorageActor::Validate(
+TAsyncStatus TDbRequester::Validate(
NActors::TActorSystem* actorSystem,
std::shared_ptr<TMaybe<TTransaction>> transaction,
size_t item,
@@ -402,8 +402,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::Validate(
});
}
-TAsyncStatus TYdbControlPlaneStorageActor::Write(
- NActors::TActorSystem* actorSystem,
+TAsyncStatus TDbRequester::Write(
const TString& query,
const NYdb::TParams& params,
const TRequestCounters& requestCounters,
@@ -412,6 +411,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::Write(
TTxSettings transactionMode,
bool retryOnTli)
{
+ NActors::TActorSystem* const actorSystem = TActivationContext::ActorSystem();
std::shared_ptr<int> retryCount = std::make_shared<int>();
auto transaction = std::make_shared<TMaybe<TTransaction>>();
auto writeHandler = [=, retryOnTli=retryOnTli] (TSession session) {
@@ -462,7 +462,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::Write(
});
};
TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>();
- Register(new TDbRequest(DbPool, promise, handler));
+ TActivationContext::AsActorContext().Register(new TDbRequest(DbPool, promise, handler));
return promise.GetFuture();
}
@@ -474,7 +474,7 @@ NThreading::TFuture<void> TYdbControlPlaneStorageActor::PickTask(
const TVector<TValidationQuery>& validators,
TTxSettings transactionMode)
{
- return ReadModifyWrite(NActors::TActivationContext::ActorSystem(), taskParams.ReadQuery, taskParams.ReadParams,
+ return ReadModifyWrite(taskParams.ReadQuery, taskParams.ReadParams,
taskParams.PrepareParams, requestCounters, debugInfo, validators, transactionMode, taskParams.RetryOnTli)
.Apply([=, responseTasks=responseTasks, queryId = taskParams.QueryId](const auto& future) {
const auto status = future.GetValue();
@@ -484,8 +484,7 @@ NThreading::TFuture<void> TYdbControlPlaneStorageActor::PickTask(
});
}
-TAsyncStatus TYdbControlPlaneStorageActor::ReadModifyWrite(
- NActors::TActorSystem* actorSystem,
+TAsyncStatus TDbRequester::ReadModifyWrite(
const TString& readQuery,
const NYdb::TParams& readParams,
const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>& prepare,
@@ -495,6 +494,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::ReadModifyWrite(
TTxSettings transactionMode,
bool retryOnTli)
{
+ NActors::TActorSystem* const actorSystem = TActivationContext::ActorSystem();
std::shared_ptr<int> retryCount = std::make_shared<int>();
auto resultSets = std::make_shared<TVector<NYdb::TResultSet>>();
auto transaction = std::make_shared<TMaybe<TTransaction>>();
@@ -599,7 +599,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::ReadModifyWrite(
});
};
TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>();
- Register(new TDbRequest(DbPool, promise, handler));
+ TActivationContext::AsActorContext().Register(new TDbRequest(DbPool, promise, handler));
return promise.GetFuture();
}
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
index 8f32943d7a..44a04ab846 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
@@ -126,7 +126,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateBindi
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators);
+ TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvCreateBindingResponse, YandexQuery::CreateBindingResult, TAuditDetails<YandexQuery::Binding>>(
MakeLogPrefix(scope, user, bindingId) + "CreateBindingRequest",
@@ -233,7 +233,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListBinding
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [resultSets=resultSets, limit] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -337,7 +337,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeBin
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [=, resultSets=resultSets] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -555,7 +555,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyBindi
const auto readQuery = readQueryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators);
+ auto result = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvModifyBindingResponse, YandexQuery::ModifyBindingResult, TAuditDetails<YandexQuery::Binding>>(
MakeLogPrefix(scope, user, bindingId) + "ModifyBindingRequest",
@@ -666,7 +666,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteBindi
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators);
+ auto result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvDeleteBindingResponse, YandexQuery::DeleteBindingResult, TAuditDetails<YandexQuery::Binding>>(
MakeLogPrefix(scope, user, bindingId) + "DeleteBindingRequest",
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
index e6915b8f4a..913b1cf513 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
@@ -120,7 +120,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateConne
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators);
+ TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvCreateConnectionResponse, YandexQuery::CreateConnectionResult, TAuditDetails<YandexQuery::Connection>>(
MakeLogPrefix(scope, user, connectionId) + "CreateConnectionRequest",
@@ -228,7 +228,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListConnect
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [resultSets=resultSets, limit] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -322,7 +322,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeCon
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [=, resultSets=resultSets] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -526,7 +526,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyConne
const auto readQuery = readQueryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators);
+ auto result = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvModifyConnectionResponse, YandexQuery::ModifyConnectionResult, TAuditDetails<YandexQuery::Connection>>(
MakeLogPrefix(scope, user, connectionId) + "ModifyConnectionRequest",
@@ -645,7 +645,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteConne
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators);
+ auto result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvDeleteConnectionResponse, YandexQuery::DeleteConnectionResult, TAuditDetails<YandexQuery::Connection>>(
MakeLogPrefix(scope, user, connectionId) + "DeleteConnectionRequest",
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index 01aac43de5..9a71373447 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -211,7 +211,58 @@ void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const
void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey);
-class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbControlPlaneStorageActor> {
+class TDbRequester {
+protected:
+ explicit TDbRequester(TDbPool::TPtr pool = nullptr, TYdbConnectionPtr ydbConnection = nullptr)
+ : DbPool(std::move(pool))
+ , YdbConnection(std::move(ydbConnection))
+ {
+ }
+
+ std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> Read(
+ const TString& query,
+ const NYdb::TParams& params,
+ const TRequestCounters& requestCounters,
+ TDebugInfoPtr debugInfo,
+ TTxSettings transactionMode = TTxSettings::SerializableRW(),
+ bool retryOnTli = true);
+
+ TAsyncStatus Validate(
+ NActors::TActorSystem* actorSystem,
+ std::shared_ptr<TMaybe<TTransaction>> transaction,
+ size_t item, const TVector<TValidationQuery>& validators,
+ TSession session,
+ std::shared_ptr<bool> successFinish,
+ TDebugInfoPtr debugInfo,
+ TTxSettings transactionMode = TTxSettings::SerializableRW());
+
+ TAsyncStatus Write(
+ const TString& query,
+ const NYdb::TParams& params,
+ const TRequestCounters& requestCounters,
+ TDebugInfoPtr debugInfo,
+ const TVector<TValidationQuery>& validators = {},
+ TTxSettings transactionMode = TTxSettings::SerializableRW(),
+ bool retryTli = true);
+
+ TAsyncStatus ReadModifyWrite(
+ const TString& readQuery,
+ const NYdb::TParams& readParams,
+ const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>& prepare,
+ const TRequestCounters& requestCounters,
+ TDebugInfoPtr debugInfo = {},
+ const TVector<TValidationQuery>& validators = {},
+ TTxSettings transactionMode = TTxSettings::SerializableRW(),
+ bool retryOnTli = true);
+
+protected:
+ TDbPool::TPtr DbPool;
+ TYdbConnectionPtr YdbConnection;
+};
+
+class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbControlPlaneStorageActor>,
+ public TDbRequester
+{
enum ERequestTypeScope {
RTS_CREATE_QUERY,
RTS_LIST_QUERIES,
@@ -408,10 +459,7 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
TCounters Counters;
- TYdbConnectionPtr YdbConnection;
-
::NYq::TYqSharedResources::TPtr YqSharedResources;
- TDbPool::TPtr DbPool;
static constexpr int64_t InitialRevision = 1;
@@ -574,45 +622,6 @@ private:
*/
bool IsSuperUser(const TString& user);
- std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> Read(
- NActors::TActorSystem* actorSystem,
- const TString& query,
- const NYdb::TParams& params,
- const TRequestCounters& requestCounters,
- TDebugInfoPtr debugInfo,
- TTxSettings transactionMode = TTxSettings::SerializableRW(),
- bool retryOnTli = true);
-
- TAsyncStatus Validate(
- NActors::TActorSystem* actorSystem,
- std::shared_ptr<TMaybe<TTransaction>> transaction,
- size_t item, const TVector<TValidationQuery>& validators,
- TSession session,
- std::shared_ptr<bool> successFinish,
- TDebugInfoPtr debugInfo,
- TTxSettings transactionMode = TTxSettings::SerializableRW());
-
- TAsyncStatus Write(
- NActors::TActorSystem* actorSystem,
- const TString& query,
- const NYdb::TParams& params,
- const TRequestCounters& requestCounters,
- TDebugInfoPtr debugInfo,
- const TVector<TValidationQuery>& validators = {},
- TTxSettings transactionMode = TTxSettings::SerializableRW(),
- bool retryTli = true);
-
- TAsyncStatus ReadModifyWrite(
- NActors::TActorSystem* actorSystem,
- const TString& readQuery,
- const NYdb::TParams& readParams,
- const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>& prepare,
- const TRequestCounters& requestCounters,
- TDebugInfoPtr debugInfo = {},
- const TVector<TValidationQuery>& validators = {},
- TTxSettings transactionMode = TTxSettings::SerializableRW(),
- bool retryOnTli = true);
-
template<class ResponseEvent, class Result, class RequestEventPtr>
TFuture<bool> SendResponse(const TString& name,
NActors::TActorSystem* actorSystem,
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 4d3e5b5e78..25c921e44d 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -348,7 +348,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
const auto read = readQueryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- TAsyncStatus status = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, prepareParams, requestCounters, debugInfo);
+ TAsyncStatus status = ReadModifyWrite(read.Sql, read.Params, prepareParams, requestCounters, debugInfo);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvCreateQueryResponse, YandexQuery::CreateQueryResult, TAuditDetails<YandexQuery::Query>>(
"CreateQueryRequest - CreateQueryResult",
@@ -484,7 +484,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListQueries
const auto read = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(read.Sql, read.Params, requestCounters, debugInfo);
auto prepare = [resultSets=resultSets, limit] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -574,7 +574,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue
);
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [resultSets=resultSets, user,permissions] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -698,7 +698,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetQuerySta
const auto read = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(read.Sql, read.Params, requestCounters, debugInfo);
auto prepare = [resultSets=resultSets, user,permissions] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -1108,7 +1108,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
const auto read = readQueryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), read.Sql, read.Params, prepareParams, requestCounters, debugInfo, validators);
+ auto result = ReadModifyWrite(read.Sql, read.Params, prepareParams, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvModifyQueryResponse, YandexQuery::ModifyQueryResult, TAuditDetails<YandexQuery::Query>>(
"ModifyQueryRequest - ModifyQueryResult",
@@ -1223,7 +1223,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteQuery
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo, validators);
+ auto result = Write(query.Sql, query.Params, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvDeleteQueryResponse, YandexQuery::DeleteQueryResult, TAuditDetails<YandexQuery::Query>>(
"DeleteQueryRequest - DeleteQueryResult",
@@ -1451,7 +1451,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer
const auto readQuery = readQueryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators);
+ auto result = ReadModifyWrite(readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo, validators);
auto prepare = [response] { return *response; };
auto success = SendAuditResponse<TEvControlPlaneStorage::TEvControlQueryResponse, YandexQuery::ControlQueryResult, TAuditDetails<YandexQuery::Query>>(
"ControlQueryRequest - ControlQueryRequest",
@@ -1527,7 +1527,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetResultDa
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [resultSets=resultSets, resultSetIndex, user, permissions] {
if (resultSets->size() != 2) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 2 but equal " << resultSets->size() << ". Please contact internal support";
@@ -1685,7 +1685,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListJobsReq
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [resultSets=resultSets, limit] {
if (resultSets->size() != 1) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
@@ -1785,7 +1785,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeJob
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
auto prepare = [=, id=request.job_id(), resultSets=resultSets] {
if (resultSets->size() != 1) {