aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-10-06 11:59:54 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-10-06 11:59:54 +0300
commitf691e892568ed44ba61c85b96bfe665cb0e47a7a (patch)
tree004663abda2a4097d3ca7f50c31175ec44437738
parenta190eb72f986f7d864949720a2d6d26b723da8ad (diff)
downloadydb-f691e892568ed44ba61c85b96bfe665cb0e47a7a.tar.gz
Remove TActorContext usage in YQ code
-rw-r--r--ydb/core/yq/libs/actors/database_resolver.cpp6
-rw-r--r--ydb/core/yq/libs/actors/nodes_health_check.cpp18
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.cpp4
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp8
-rw-r--r--ydb/core/yq/libs/actors/proxy_private.cpp30
-rw-r--r--ydb/core/yq/libs/actors/result_writer.cpp20
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp34
-rw-r--r--ydb/core/yq/libs/actors/task_get.cpp22
-rw-r--r--ydb/core/yq/libs/actors/task_ping.cpp21
-rw-r--r--ydb/core/yq/libs/actors/task_result_write.cpp20
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/gc.cpp12
-rw-r--r--ydb/core/yq/libs/logs/log.cpp57
-rw-r--r--ydb/core/yq/libs/mock/yql_mock.cpp8
-rw-r--r--ydb/core/yq/libs/shared_resources/db_pool.cpp94
14 files changed, 178 insertions, 176 deletions
diff --git a/ydb/core/yq/libs/actors/database_resolver.cpp b/ydb/core/yq/libs/actors/database_resolver.cpp
index e36f8318367..53a42199b6d 100644
--- a/ydb/core/yq/libs/actors/database_resolver.cpp
+++ b/ydb/core/yq/libs/actors/database_resolver.cpp
@@ -265,7 +265,7 @@ public:
private:
STRICT_STFUNC(State, {
- HFunc(TEvents::TEvEndpointRequest, Handle);
+ hFunc(TEvents::TEvEndpointRequest, Handle);
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
});
@@ -283,7 +283,7 @@ private:
new TEvents::TEvEndpointResponse(NYql::TDbResolverResponse{std::move(ready), success, issues}));
}
- void Handle(TEvents::TEvEndpointRequest::TPtr ev, const TActorContext& ctx)
+ void Handle(TEvents::TEvEndpointRequest::TPtr ev)
{
TraceId = ev->Get()->TraceId;
LOG_D("Start databaseId resolver for " << ev->Get()->DatabaseIds.size() << " ids");
@@ -347,7 +347,7 @@ private:
new TResponseProcessor(ev->Sender, Cache, ready, requests, TraceId, ev->Get()->MdbTransformHost, Parsers));
for (const auto& [request, _] : requests) {
- ctx.Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request)));
+ TActivationContext::Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request)));
}
} else {
SendResponse(ev->Sender, std::move(ready));
diff --git a/ydb/core/yq/libs/actors/nodes_health_check.cpp b/ydb/core/yq/libs/actors/nodes_health_check.cpp
index 96e7c1c1cd9..5d9e65ba97a 100644
--- a/ydb/core/yq/libs/actors/nodes_health_check.cpp
+++ b/ydb/core/yq/libs/actors/nodes_health_check.cpp
@@ -43,13 +43,13 @@ public:
static constexpr char ActorName[] = "YQ_PRIVATE_NODES_HEALTH_CHECK";
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) {
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_E("TNodesHealthCheckActor::OnUndelivered");
auto res = MakeHolder<TEvents::TEvNodesHealthCheckResponse>();
res->Status = Ydb::StatusIds::GENERIC_ERROR;
res->Issues.AddIssue("UNDELIVERED");
- ctx.Send(ev->Sender, res.Release());
- Die(ctx);
+ Send(ev->Sender, res.Release());
+ PassAway();
}
void PassAway() final {
@@ -70,7 +70,7 @@ public:
PassAway();
}
- void Bootstrap(const TActorContext&) {
+ void Bootstrap() {
Become(&TNodesHealthCheckActor::StateFunc);
auto& req = Ev->Record;
Tenant = req.tenant();
@@ -83,11 +83,11 @@ private:
STRICT_STFUNC(
StateFunc,
CFunc(NActors::TEvents::TEvPoison::EventType, Die)
- HFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, HandleResponse)
- HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, HandleResponse)
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
)
- void HandleResponse(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse::TPtr& ev, const TActorContext& ctx) {
+ void HandleResponse(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse::TPtr& ev) {
auto res = MakeHolder<TEvents::TEvNodesHealthCheckResponse>();
try {
const auto& issues = ev->Get()->Issues;
@@ -97,8 +97,8 @@ private:
res->Record.ConstructInPlace();
res->Status = Ydb::StatusIds::SUCCESS;
res->Record = ev->Get()->Record;
- ctx.Send(Sender, res.Release());
- Die(ctx);
+ Send(Sender, res.Release());
+ PassAway();
} catch (...) {
const auto msg = TStringBuilder() << "Can't do NodesHealthCheck: " << CurrentExceptionMessage();
Fail(msg);
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index 9ebdd7f6f22..435ab810db7 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -73,11 +73,11 @@ public:
NActors::IActor::PassAway();
}
- void Bootstrap(const TActorContext&) {
+ void Bootstrap() {
Become(&TNodesManagerActor::StateFunc);
ServiceCounters.Counters->GetCounter("EvBootstrap", true)->Inc();
LOG_I("Bootstrap, InstanceId: " << InstanceId);
- ResolveSelfAddress();
+ ResolveSelfAddress();
}
private:
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index effba69ba5d..09ab19fa93d 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -172,10 +172,10 @@ public:
void Bootstrap() {
if (Monitoring) {
- Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"),
+ Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"),
"fetcher", "Pending Fetcher", false, TActivationContext::ActorSystem(), SelfId());
}
-
+
Become(&TPendingFetcher::StateFunc);
DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory));
Send(SelfId(), new NActors::TEvents::TEvWakeup());
@@ -184,7 +184,7 @@ public:
}
private:
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext&) {
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) {
LOG_E("TYqlPendingFetcher::OnUndelivered");
HasRunningRequest = false;
@@ -408,7 +408,7 @@ private:
STRICT_STFUNC(StateFunc,
hFunc(NActors::TEvents::TEvWakeup, HandleWakeup)
- HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
hFunc(TEvInternalService::TEvGetTaskResponse, Handle)
hFunc(NActors::TEvents::TEvPoisonTaken, HandlePoisonTaken)
hFunc(TEvPrivate::TEvCleanupCounters, HandleCleanupCounters)
diff --git a/ydb/core/yq/libs/actors/proxy_private.cpp b/ydb/core/yq/libs/actors/proxy_private.cpp
index 5b232ff8d73..38173b8335e 100644
--- a/ydb/core/yq/libs/actors/proxy_private.cpp
+++ b/ydb/core/yq/libs/actors/proxy_private.cpp
@@ -49,7 +49,7 @@ public:
NActors::IActor::PassAway();
}
- void Bootstrap(const TActorContext&) {
+ void Bootstrap() {
Become(&TYqlAnalyticsPrivateProxy::StateFunc);
Counters->GetCounter("EvBootstrap", true)->Inc();
}
@@ -70,7 +70,7 @@ private:
return issues;
}
- void Handle(TEvents::TEvPingTaskRequest::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvents::TEvPingTaskRequest::TPtr& ev) {
Counters->GetCounter("EvPingTaskRequest", true)->Inc();
NYql::TIssues issues = ValidatePermissions(ev);
@@ -84,10 +84,10 @@ private:
Register(
CreatePingTaskRequestActor(ev->Sender, TimeProvider, ev->Release(), Counters),
- NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID());
+ NActors::TMailboxType::HTSwap, SelfId().PoolID());
}
- void Handle(TEvents::TEvGetTaskRequest::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvents::TEvGetTaskRequest::TPtr& ev) {
Counters->GetCounter("EvGetTaskRequest", true)->Inc();
NYql::TIssues issues = ValidatePermissions(ev);
@@ -101,10 +101,10 @@ private:
Register(
CreateGetTaskRequestActor(ev->Sender, TokenAccessorConfig, TimeProvider, ev->Release(), Counters),
- NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID());
+ NActors::TMailboxType::HTSwap, SelfId().PoolID());
}
- void Handle(TEvents::TEvWriteTaskResultRequest::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvents::TEvWriteTaskResultRequest::TPtr& ev) {
Counters->GetCounter("EvWriteTaskResultRequest", true)->Inc();
NYql::TIssues issues = ValidatePermissions(ev);
@@ -118,10 +118,10 @@ private:
Register(
CreateWriteTaskResultRequestActor(ev->Sender, TimeProvider, ev->Release(), Counters),
- NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID());
+ NActors::TMailboxType::HTSwap, SelfId().PoolID());
}
- void Handle(TEvents::TEvNodesHealthCheckRequest::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvents::TEvNodesHealthCheckRequest::TPtr& ev) {
Counters->GetCounter("EvNodesHealthCheckRequest", true)->Inc();
NYql::TIssues issues = ValidatePermissions(ev);
@@ -135,7 +135,7 @@ private:
Register(
CreateNodesHealthCheckActor(ev->Sender, TimeProvider, ev->Release(), Counters),
- NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID());
+ NActors::TMailboxType::HTSwap, SelfId().PoolID());
}
void Handle(TEvents::TEvCreateRateLimiterResourceRequest::TPtr& ev) {
@@ -176,16 +176,16 @@ private:
STRICT_STFUNC(
StateFunc,
- HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
- HFunc(TEvents::TEvPingTaskRequest, Handle)
- HFunc(TEvents::TEvGetTaskRequest, Handle)
- HFunc(TEvents::TEvWriteTaskResultRequest, Handle)
- HFunc(TEvents::TEvNodesHealthCheckRequest, Handle)
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ hFunc(TEvents::TEvPingTaskRequest, Handle)
+ hFunc(TEvents::TEvGetTaskRequest, Handle)
+ hFunc(TEvents::TEvWriteTaskResultRequest, Handle)
+ hFunc(TEvents::TEvNodesHealthCheckRequest, Handle)
hFunc(TEvents::TEvCreateRateLimiterResourceRequest, Handle)
hFunc(TEvents::TEvDeleteRateLimiterResourceRequest, Handle)
)
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext&) {
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) {
LOG_E("TYqlAnalyticsPrivateProxy::OnUndelivered");
Counters->GetCounter("OnUndelivered", true)->Inc();
}
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp
index 86c2522b5e9..d76f37c39af 100644
--- a/ydb/core/yq/libs/actors/result_writer.cpp
+++ b/ydb/core/yq/libs/actors/result_writer.cpp
@@ -61,7 +61,7 @@ public:
static constexpr char ActorName[] = "YQ_RESULT_WRITER";
- void Bootstrap(const TActorContext&) {
+ void Bootstrap() {
LOG_I("Bootstrap");
Become(&TResultWriter::StateFunc);
}
@@ -69,11 +69,11 @@ public:
private:
STRICT_STFUNC(StateFunc,
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
- HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
- HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData)
- HFunc(TEvReadyState, OnReadyState);
- HFunc(TEvQueryResponse, OnQueryResult);
+ hFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData)
+ hFunc(TEvReadyState, OnReadyState);
+ hFunc(TEvQueryResponse, OnQueryResult);
hFunc(NFq::TEvInternalService::TEvWriteResultResponse, HandleResponse);
)
@@ -91,7 +91,7 @@ private:
NActors::IActor::PassAway();
}
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext& ) {
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) {
auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, TIssue("Undelivered").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR));
Send(ExecuterId, req.Release());
HasError = true;
@@ -111,7 +111,7 @@ private:
}
}
- void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) {
+ void OnQueryResult(TEvQueryResponse::TPtr& ev) {
Finished = true;
NYql::NDqProto::TQueryResponse queryResult(ev->Get()->Record);
@@ -125,7 +125,7 @@ private:
Send(ExecuterId, new TEvQueryResponse(std::move(queryResult)));
}
- void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { }
+ void OnReadyState(TEvReadyState::TPtr&) { }
void HandleResponse(NFq::TEvInternalService::TEvWriteResultResponse::TPtr& ev) {
auto statusCode = ev->Get()->Result.status_code();
@@ -311,9 +311,7 @@ private:
Cookie++;
}
- void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext& ctx) {
- Y_UNUSED(ctx);
-
+ void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
if (HasError) {
StopChannel(ev);
return;
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 70385d299e0..fcf3b2a2704 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -149,7 +149,7 @@ public:
{
}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap() {
TProgramFactory progFactory(false, FunctionRegistry, NextUniqueId, DataProvidersInit, "yq");
progFactory.SetModules(ModuleResolver);
progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FunctionRegistry, nullptr));
@@ -162,12 +162,12 @@ public:
{
if (!Program->ParseSql(SqlSettings)) {
Issues.AddIssues(Program->Issues());
- SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to parse query");
+ SendStatusAndDie(TProgram::TStatus::Error, "Failed to parse query");
return;
}
if (ExecuteMode == YandexQuery::ExecuteMode::PARSE) {
- SendStatusAndDie(ctx, TProgram::TStatus::Ok);
+ SendStatusAndDie(TProgram::TStatus::Ok);
return;
}
}
@@ -176,12 +176,12 @@ public:
{
if (!Program->Compile("")) {
Issues.AddIssues(Program->Issues());
- SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to compile query");
+ SendStatusAndDie(TProgram::TStatus::Error, "Failed to compile query");
return;
}
if (ExecuteMode == YandexQuery::ExecuteMode::COMPILE) {
- SendStatusAndDie(ctx, TProgram::TStatus::Ok);
+ SendStatusAndDie(TProgram::TStatus::Ok);
return;
}
}
@@ -201,7 +201,7 @@ public:
futureStatus = Program->RunAsync("");
break;
default:
- SendStatusAndDie(ctx, TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode));
+ SendStatusAndDie(TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode));
return;
}
@@ -212,7 +212,7 @@ public:
Become(&TProgramRunnerActor::StateFunc);
}
- void SendStatusAndDie(const TActorContext& ctx, NYql::TProgram::TStatus status, const TString& message = "") {
+ void SendStatusAndDie(NYql::TProgram::TStatus status, const TString& message = "") {
TString expr;
TString plan;
if (Compiled) {
@@ -224,14 +224,14 @@ public:
}
Issues.AddIssues(Program->Issues());
Send(RunActorId, new TEvPrivate::TEvProgramFinished(Issues, plan, expr, status, message));
- Die(ctx);
+ PassAway();
}
STRICT_STFUNC(StateFunc,
- HFunc(TEvents::TEvAsyncContinue, Handle);
+ hFunc(TEvents::TEvAsyncContinue, Handle);
)
- void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvents::TEvAsyncContinue::TPtr& ev) {
NYql::TProgram::TStatus status = TProgram::TStatus::Error;
const auto& f = ev->Get()->Future;
@@ -239,9 +239,7 @@ public:
status = f.GetValue();
if (status == TProgram::TStatus::Async) {
auto futureStatus = Program->ContinueAsync();
- auto actorSystem = ctx.ActorSystem();
- auto selfId = ctx.SelfID;
- futureStatus.Subscribe([actorSystem, selfId](const TProgram::TFutureStatus& f) {
+ futureStatus.Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) {
actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
});
return;
@@ -249,7 +247,7 @@ public:
} catch (const std::exception& err) {
Issues.AddIssue(ExceptionToIssue(err));
}
- SendStatusAndDie(ctx, status);
+ SendStatusAndDie(status);
}
private:
@@ -348,7 +346,7 @@ private:
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvProgramFinished, Handle);
- HFunc(TEvents::TEvAsyncContinue, Handle);
+ hFunc(TEvents::TEvAsyncContinue, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
hFunc(TEvents::TEvGraphParams, Handle);
hFunc(TEvents::TEvDataStreamsReadRulesCreationResult, Handle);
@@ -1714,7 +1712,7 @@ private:
}
}
- void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvents::TEvAsyncContinue::TPtr& ev) {
LOG_D("Compiling finished");
NYql::TProgram::TStatus status = TProgram::TStatus::Error;
@@ -1723,9 +1721,7 @@ private:
status = f.GetValue();
if (status == TProgram::TStatus::Async) {
auto futureStatus = Program->ContinueAsync();
- auto actorSystem = ctx.ActorSystem();
- auto selfId = ctx.SelfID;
- futureStatus.Subscribe([actorSystem, selfId](const TProgram::TFutureStatus& f) {
+ futureStatus.Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) {
actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
});
return;
diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp
index ce788677bd5..8cb0170afe3 100644
--- a/ydb/core/yq/libs/actors/task_get.cpp
+++ b/ydb/core/yq/libs/actors/task_get.cpp
@@ -53,13 +53,13 @@ public:
static constexpr char ActorName[] = "YQ_PRIVATE_GET_TASK";
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) {
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_E("TGetTaskRequestActor::OnUndelivered");
auto response = MakeHolder<TEvents::TEvGetTaskResponse>();
response->Status = Ydb::StatusIds::GENERIC_ERROR;
response->Issues.AddIssue("UNDELIVERED");
- ctx.Send(ev->Sender, response.Release());
- Die(ctx);
+ Send(ev->Sender, response.Release());
+ PassAway();
}
void PassAway() final {
@@ -80,7 +80,7 @@ public:
PassAway();
}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap() {
Become(&TGetTaskRequestActor::StateFunc);
auto request = Ev->Record;
LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes");
@@ -88,12 +88,12 @@ public:
OwnerId = request.owner_id();
Host = request.host();
Tenant = request.tenant();
- ctx.Send(NYq::ControlPlaneStorageServiceActorId(),
+ Send(NYq::ControlPlaneStorageServiceActorId(),
new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)));
}
private:
- void HandleResponse(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev, const TActorContext& ctx) { // YQ
+ void HandleResponse(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) { // YQ
LOG_D("Got CP::GetTask Response");
const auto& issues = ev->Get()->Issues;
@@ -120,8 +120,8 @@ private:
account.set_signature(signature);
}
}
- ctx.Send(Sender, response.Release());
- Die(ctx);
+ Send(Sender, response.Release());
+ PassAway();
} catch (...) {
const auto msg = TStringBuilder() << "Can't do GetTask: " << CurrentExceptionMessage();
Fail(msg);
@@ -131,9 +131,9 @@ private:
private:
STRICT_STFUNC(
StateFunc,
- CFunc(NActors::TEvents::TEvPoison::EventType, Die)
- HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
- HFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, HandleResponse)
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, HandleResponse)
)
const NConfig::TTokenAccessorConfig TokenAccessorConfig;
diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp
index 5647011cbbd..d7b9cf1a7ff 100644
--- a/ydb/core/yq/libs/actors/task_ping.cpp
+++ b/ydb/core/yq/libs/actors/task_ping.cpp
@@ -45,13 +45,13 @@ public:
static constexpr char ActorName[] = "YQ_PRIVATE_PING_TASK";
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) {
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_E("TTaskPingRequestActor::OnUndelivered");
auto res = MakeHolder<TEvents::TEvPingTaskResponse>();
res->Status = Ydb::StatusIds::GENERIC_ERROR;
res->Issues.AddIssue("UNDELIVERED");
- ctx.Send(ev->Sender, res.Release());
- Die(ctx);
+ Send(ev->Sender, res.Release());
+ PassAway();
}
void PassAway() final {
@@ -72,8 +72,7 @@ public:
PassAway();
}
- void Bootstrap(const TActorContext& ctx) {
- Y_UNUSED(ctx);
+ void Bootstrap() {
Become(&TTaskPingRequestActor::StateFunc);
const auto& req = Ev->Record;
OperationId = req.query_id().value();
@@ -95,9 +94,9 @@ public:
private:
STRICT_STFUNC(
StateFunc,
- CFunc(NActors::TEvents::TEvPoison::EventType, Die)
- HFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, HandleResponse)
- HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ hFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, HandleResponse)
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
)
std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() {
@@ -105,7 +104,7 @@ private:
return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
}
- void HandleResponse(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev, const TActorContext& ctx) {
+ void HandleResponse(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev) {
LOG_D("Got CP::PingTaskResponse");
const auto& issues = ev->Get()->Issues;
if (issues) {
@@ -117,8 +116,8 @@ private:
auto response = MakeHolder<TEvents::TEvPingTaskResponse>();
response->Status = Ydb::StatusIds::SUCCESS;
response->Record.ConstructInPlace(ev->Get()->Record);
- ctx.Send(Sender, response.Release());
- Die(ctx);
+ Send(Sender, response.Release());
+ PassAway();
}
private:
diff --git a/ydb/core/yq/libs/actors/task_result_write.cpp b/ydb/core/yq/libs/actors/task_result_write.cpp
index dafa7f70d8f..5433de62bde 100644
--- a/ydb/core/yq/libs/actors/task_result_write.cpp
+++ b/ydb/core/yq/libs/actors/task_result_write.cpp
@@ -46,12 +46,12 @@ public:
static constexpr char ActorName[] = "YQ_PRIVATE_WRITE_RESULT_TASK";
- void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) {
+ void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_E("TWriteTaskRequestActor::OnUndelivered");
Res->Status = Ydb::StatusIds::GENERIC_ERROR;
Res->Issues.AddIssue("UNDELIVERED");
- ctx.Send(ev->Sender, Res.Release());
- Die(ctx);
+ Send(ev->Sender, Res.Release());
+ PassAway();
}
void PassAway() final {
@@ -71,7 +71,7 @@ public:
PassAway();
}
- void Bootstrap(const TActorContext&) {
+ void Bootstrap() {
Become(&TWriteTaskRequestActor::StateFunc);
auto request = Ev->Record;
ResultId = request.result_id().value();
@@ -85,12 +85,12 @@ public:
private:
STRICT_STFUNC(
StateFunc,
- CFunc(NActors::TEvents::TEvPoison::EventType, Die)
- HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
- HFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, HandleResponse);
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, HandleResponse);
)
- void HandleResponse(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse::TPtr& ev, const TActorContext& ctx) {
+ void HandleResponse(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse::TPtr& ev) {
LOG_D("Got CP::WriteTaskResult Response");
const auto& issues = ev->Get()->Issues;
if (issues) {
@@ -102,8 +102,8 @@ private:
Res->Record->set_request_id(RequestId);
Res->Issues.AddIssues(Issues);
Res->Status = Ydb::StatusIds::SUCCESS;
- ctx.Send(Sender, Res.Release());
- Die(ctx);
+ Send(Sender, Res.Release());
+ PassAway();
}
const TActorId Sender;
diff --git a/ydb/core/yq/libs/checkpoint_storage/gc.cpp b/ydb/core/yq/libs/checkpoint_storage/gc.cpp
index 2a0e73ecb4f..7d136cf15c1 100644
--- a/ydb/core/yq/libs/checkpoint_storage/gc.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/gc.cpp
@@ -58,16 +58,16 @@ class TActorGC : public TActorBootstrapped<TActorGC> {
public:
TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateStoragePtr& stateStorage);
- void Bootstrap(const TActorContext& ctx);
+ void Bootstrap();
static constexpr char ActorName[] = "YQ_GC_ACTOR";
private:
STRICT_STFUNC(StateFunc,
- HFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle);
+ hFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle);
)
- void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev);
};
TActorGC::TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateStoragePtr& stateStorage)
@@ -76,14 +76,14 @@ TActorGC::TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateS
{
}
-void TActorGC::Bootstrap(const TActorContext&)
+void TActorGC::Bootstrap()
{
Become(&TActorGC::StateFunc);
LOG_STREAMS_STORAGE_SERVICE_INFO("Successfully bootstrapped storage GC " << SelfId());
}
-void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx)
+void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev)
{
const auto* event = ev->Get();
const auto& graphId = event->CoordinatorId.GraphId;
@@ -98,7 +98,7 @@ void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev,
// 3. Delete marked checkpoints
auto context = MakeIntrusive<TContext>(
- ctx.ActorSystem(),
+ TActivationContext::ActorSystem(),
CheckpointStorage,
StateStorage,
graphId,
diff --git a/ydb/core/yq/libs/logs/log.cpp b/ydb/core/yq/libs/logs/log.cpp
index 91c4af73ca9..546c895c1f4 100644
--- a/ydb/core/yq/libs/logs/log.cpp
+++ b/ydb/core/yq/libs/logs/log.cpp
@@ -7,6 +7,15 @@
#include <library/cpp/actors/core/log.h>
+#define LOG_C(stream) \
+ LOG_CRIT_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQL_PROXY, stream)
+
+#define LOG_E(stream) \
+ LOG_ERROR_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQL_PROXY, stream)
+
+#define LOG_D(stream) \
+ LOG_DEBUG_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQL_PROXY, stream)
+
namespace NKikimr {
using namespace NActors;
@@ -20,12 +29,12 @@ public:
, LogConfig(logConfig)
{ }
- void Bootstrap(const TActorContext& ctx) {
- UpdateYqlLogLevels(ctx);
+ void Bootstrap() {
+ UpdateYqlLogLevels();
// Subscribe for Logger config changes
ui32 logConfigKind = (ui32)NKikimrConsole::TConfigItem::LogConfigItem;
- ctx.Send(NConsole::MakeConfigsDispatcherID(ctx.SelfID.NodeId()),
+ Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()),
new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest(
{logConfigKind}),
IEventHandle::FlagTrackDelivery);
@@ -34,68 +43,64 @@ public:
}
private:
- STFUNC(MainState) {
+ STATEFN(MainState) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvents::TEvUndelivered, Handle);
- HFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
- HFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
+ hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
default:
Y_FAIL("TYqlLogsUpdater: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?");
}
}
- void Handle(TEvents::TEvUndelivered::TPtr &ev, const TActorContext &ctx) {
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
switch (ev->Get()->SourceType) {
case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest:
- LOG_CRIT_S(ctx, NKikimrServices::YQL_PROXY,
- "Failed to deliver subscription request to config dispatcher.");
+ LOG_C("Failed to deliver subscription request to config dispatcher.");
break;
case NConsole::TEvConsole::EvConfigNotificationResponse:
- LOG_ERROR_S(ctx, NKikimrServices::YQL_PROXY, "Failed to deliver config notification response.");
+ LOG_E("Failed to deliver config notification response.");
break;
default:
- LOG_ERROR_S(ctx, NKikimrServices::YQL_PROXY,
- "Undelivered event with unexpected source type: " << ev->Get()->SourceType);
+ LOG_E("Undelivered event with unexpected source type: " << ev->Get()->SourceType);
break;
}
}
- void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr &ev, const TActorContext &ctx) {
+ void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev) {
Y_UNUSED(ev);
- LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Subscribed for config changes.");
+ LOG_D("Subscribed for config changes.");
}
- void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr &ev, const TActorContext &ctx) {
- auto &event = ev->Get()->Record;
+ void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
+ auto& event = ev->Get()->Record;
- LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Updated table service config.");
+ LOG_D("Updated table service config.");
LogConfig.Swap(event.MutableConfig()->MutableLogConfig());
- UpdateYqlLogLevels(ctx);
+ UpdateYqlLogLevels();
}
- void UpdateYqlLogLevels(const TActorContext& ctx) {
+ void UpdateYqlLogLevels() {
const auto& kqpYqlName = NKikimrServices::EServiceKikimr_Name(NKikimrServices::KQP_YQL);
- for (auto &entry : LogConfig.GetEntry()) {
+ for (auto& entry : LogConfig.GetEntry()) {
if (entry.GetComponent() == kqpYqlName && entry.HasLevel()) {
auto yqlPriority = static_cast<NActors::NLog::EPriority>(entry.GetLevel());
NYql::NDq::SetYqlLogLevels(yqlPriority);
- LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Updated YQL logs priority: "
- << (ui32)yqlPriority);
+ LOG_D("Updated YQL logs priority: " << (ui32)yqlPriority);
return;
}
}
// Set log level based on current logger settings
- ui8 currentLevel = ctx.LoggerSettings()->GetComponentSettings(NKikimrServices::KQP_YQL).Raw.X.Level;
+ ui8 currentLevel = TActivationContext::AsActorContext().LoggerSettings()->GetComponentSettings(NKikimrServices::KQP_YQL).Raw.X.Level;
auto yqlPriority = static_cast<NActors::NLog::EPriority>(currentLevel);
- LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Updated YQL logs priority to current level: "
- << (ui32)yqlPriority);
+ LOG_D("Updated YQL logs priority to current level: " << (ui32)yqlPriority);
NYql::NDq::SetYqlLogLevels(yqlPriority);
}
diff --git a/ydb/core/yq/libs/mock/yql_mock.cpp b/ydb/core/yq/libs/mock/yql_mock.cpp
index 8fe694f35b3..ba17d98101d 100644
--- a/ydb/core/yq/libs/mock/yql_mock.cpp
+++ b/ydb/core/yq/libs/mock/yql_mock.cpp
@@ -40,13 +40,11 @@ public:
private:
STRICT_STFUNC(Handler, {
- HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- });
+ hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ });
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event,
- const NActors::TActorContext& ctx)
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event)
{
- Y_UNUSED(ctx);
auto request = event->Get()->Request;
auto parameters = NHttp::TUrlParameters(request->URL);
TString databaseId = parameters["databaseId"];
diff --git a/ydb/core/yq/libs/shared_resources/db_pool.cpp b/ydb/core/yq/libs/shared_resources/db_pool.cpp
index 0fa75f141e0..9af52e67e19 100644
--- a/ydb/core/yq/libs/shared_resources/db_pool.cpp
+++ b/ydb/core/yq/libs/shared_resources/db_pool.cpp
@@ -11,15 +11,25 @@
#include <util/stream/file.h>
#include <util/string/strip.h>
-#define LOG_F(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, EMERG, STREAMS, logRecordStream)
-#define LOG_A(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ALERT, STREAMS, logRecordStream)
-#define LOG_C(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, CRIT, STREAMS, logRecordStream)
-#define LOG_E(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ERROR, STREAMS, logRecordStream)
-#define LOG_W(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, WARN, STREAMS, logRecordStream)
-#define LOG_N(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, NOTICE, STREAMS, logRecordStream)
-#define LOG_I(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, INFO, STREAMS, logRecordStream)
-#define LOG_D(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, DEBUG, STREAMS, logRecordStream)
-#define LOG_T(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, TRACE, STREAMS, logRecordStream)
+#define LOG_F_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, EMERG, STREAMS, logRecordStream)
+#define LOG_A_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, ALERT, STREAMS, logRecordStream)
+#define LOG_C_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, CRIT, STREAMS, logRecordStream)
+#define LOG_E_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, ERROR, STREAMS, logRecordStream)
+#define LOG_W_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, WARN, STREAMS, logRecordStream)
+#define LOG_N_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, NOTICE, STREAMS, logRecordStream)
+#define LOG_I_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, INFO, STREAMS, logRecordStream)
+#define LOG_D_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, DEBUG, STREAMS, logRecordStream)
+#define LOG_T_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, TRACE, STREAMS, logRecordStream)
+
+#define LOG_F(logRecordStream) LOG_F_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_A(logRecordStream) LOG_A_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_C(logRecordStream) LOG_C_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_E(logRecordStream) LOG_E_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_W(logRecordStream) LOG_W_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_N(logRecordStream) LOG_N_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_I(logRecordStream) LOG_I_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_D(logRecordStream) LOG_D_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
+#define LOG_T(logRecordStream) LOG_T_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream)
namespace NYq {
@@ -41,30 +51,30 @@ public:
static constexpr char ActorName[] = "YQ_DB_POOL";
STRICT_STFUNC(WorkingState,
- CFunc(NActors::TEvents::TEvPoison::EventType, Die)
- HFunc(TEvents::TEvDbRequest, HandleRequest)
- HFunc(TEvents::TEvDbResponse, HandleResponse)
- HFunc(TEvents::TEvDbFunctionRequest, HandleRequest)
- HFunc(TEvents::TEvDbFunctionResponse, HandleResponse)
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ hFunc(TEvents::TEvDbRequest, HandleRequest)
+ hFunc(TEvents::TEvDbResponse, HandleResponse)
+ hFunc(TEvents::TEvDbFunctionRequest, HandleRequest)
+ hFunc(TEvents::TEvDbFunctionResponse, HandleResponse)
)
- void Die(const TActorContext& ctx) override {
+ void PassAway() override {
NYql::TIssues issues;
issues.AddIssue("DB connection closed");
auto cancelled = NYdb::TStatus(NYdb::EStatus::CANCELLED, std::move(issues));
for (const auto& x : Requests) {
if (auto pRequest = std::get_if<TRequest>(&x)) {
- ctx.Send(pRequest->Sender, new TEvents::TEvDbResponse(cancelled, {}));
+ Send(pRequest->Sender, new TEvents::TEvDbResponse(cancelled, {}));
} else if (auto pRequest = std::get_if<TFunctionRequest>(&x)) {
- ctx.Send(pRequest->Sender, new TEvents::TEvDbFunctionResponse(cancelled));
+ Send(pRequest->Sender, new TEvents::TEvDbFunctionResponse(cancelled));
}
}
State.reset();
- IActor::Die(ctx);
+ IActor::PassAway();
}
- void ProcessQueue(const TActorContext& ctx) {
+ void ProcessQueue() {
QueueSize->Collect(Requests.size());
if (Requests.empty() || RequestInProgress) {
return;
@@ -75,12 +85,10 @@ public:
RequestInProgressTimestamp = TInstant::Now();
const auto& requestVariant = Requests.front();
- LOG_T(ctx, "TDbPoolActor: ProcessQueue " << SelfId() << " Queue size = " << Requests.size());
+ LOG_T("TDbPoolActor: ProcessQueue " << SelfId() << " Queue size = " << Requests.size());
if (auto pRequest = std::get_if<TRequest>(&requestVariant)) {
auto& request = *pRequest;
- auto actorSystem = ctx.ActorSystem();
- auto selfId = ctx.SelfID;
auto cookie = request.Cookie;
auto sharedResult = std::make_shared<TVector<NYdb::TResultSet>>();
NYdb::NTable::TRetryOperationSettings settings;
@@ -94,65 +102,63 @@ public:
return future;
});
}, settings)
- .Subscribe([state = std::weak_ptr<int>(State), sharedResult, actorSystem, cookie, selfId](const NThreading::TFuture<NYdb::TStatus>& statusFuture) {
+ .Subscribe([state = std::weak_ptr<int>(State), sharedResult, actorSystem = TActivationContext::ActorSystem(), cookie, selfId = SelfId()](const NThreading::TFuture<NYdb::TStatus>& statusFuture) {
if (state.lock()) {
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbResponse(statusFuture.GetValue(), *sharedResult), 0, cookie));
} else {
- LOG_T(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed");
+ LOG_T_AS(actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed");
}
});
} else if (auto pRequest = std::get_if<TFunctionRequest>(&requestVariant)) {
auto& request = *pRequest;
- auto selfId = ctx.SelfID;
auto cookie = request.Cookie;
- auto actorSystem = ctx.ActorSystem();
TableClient.RetryOperation([request](NYdb::NTable::TSession session) {
return request.Handler(session);
})
- .Subscribe([state = std::weak_ptr<int>(State), actorSystem, selfId, cookie](const NThreading::TFuture<NYdb::TStatus>& statusFuture) {
+ .Subscribe([state = std::weak_ptr<int>(State), actorSystem = TActivationContext::ActorSystem(), selfId = SelfId(), cookie](const NThreading::TFuture<NYdb::TStatus>& statusFuture) {
if (state.lock()) {
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbFunctionResponse(statusFuture.GetValue()), 0, cookie));
} else {
- LOG_T(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed");
+ LOG_T_AS(actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed");
}
});
}
}
- void HandleRequest(TEvents::TEvDbRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_D(ctx, "TDbPoolActor: TEvDbRequest " << SelfId() << " Queue size = " << Requests.size());
+ void HandleRequest(TEvents::TEvDbRequest::TPtr& ev) {
+ LOG_D("TDbPoolActor: TEvDbRequest " << SelfId() << " Queue size = " << Requests.size());
auto request = ev->Get();
Requests.emplace_back(TRequest{ev->Sender, ev->Cookie, request->Sql, std::move(request->Params), request->Idempotent});
- ProcessQueue(ctx);
+ ProcessQueue();
}
- void PopFromQueueAndProcess(const TActorContext& ctx) {
+ void PopFromQueueAndProcess() {
RequestInProgress = false;
RequestsTime->Collect(TInstant::Now().MilliSeconds() - RequestInProgressTimestamp.MilliSeconds());
Requests.pop_front();
TotalInFlight->Dec();
- ProcessQueue(ctx);
+ ProcessQueue();
}
- void HandleResponse(TEvents::TEvDbResponse::TPtr& ev, const TActorContext& ctx) {
- LOG_T(ctx, "TDbPoolActor: TEvDbResponse " << SelfId() << " Queue size = " << Requests.size());
+ void HandleResponse(TEvents::TEvDbResponse::TPtr& ev) {
+ LOG_T("TDbPoolActor: TEvDbResponse " << SelfId() << " Queue size = " << Requests.size());
const auto& request = Requests.front();
- ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request)));
- PopFromQueueAndProcess(ctx);
+ TActivationContext::Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request)));
+ PopFromQueueAndProcess();
}
- void HandleRequest(TEvents::TEvDbFunctionRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_T(ctx, "TDbPoolActor: TEvDbFunctionRequest " << SelfId() << " Queue size = " << Requests.size());
+ void HandleRequest(TEvents::TEvDbFunctionRequest::TPtr& ev) {
+ LOG_T("TDbPoolActor: TEvDbFunctionRequest " << SelfId() << " Queue size = " << Requests.size());
auto request = ev->Get();
Requests.emplace_back(TFunctionRequest{ev->Sender, ev->Cookie, std::move(request->Handler)});
- ProcessQueue(ctx);
+ ProcessQueue();
}
- void HandleResponse(TEvents::TEvDbFunctionResponse::TPtr& ev, const TActorContext& ctx) {
- LOG_T(ctx, "TDbPoolActor: TEvDbFunctionResponse " << SelfId() << " Queue size = " << Requests.size());
+ void HandleResponse(TEvents::TEvDbFunctionResponse::TPtr& ev) {
+ LOG_T("TDbPoolActor: TEvDbFunctionResponse " << SelfId() << " Queue size = " << Requests.size());
const auto& request = Requests.front();
- ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request)));
- PopFromQueueAndProcess(ctx);
+ TActivationContext::Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request)));
+ PopFromQueueAndProcess();
}
private: