diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-10-06 11:59:54 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-10-06 11:59:54 +0300 |
commit | f691e892568ed44ba61c85b96bfe665cb0e47a7a (patch) | |
tree | 004663abda2a4097d3ca7f50c31175ec44437738 | |
parent | a190eb72f986f7d864949720a2d6d26b723da8ad (diff) | |
download | ydb-f691e892568ed44ba61c85b96bfe665cb0e47a7a.tar.gz |
Remove TActorContext usage in YQ code
-rw-r--r-- | ydb/core/yq/libs/actors/database_resolver.cpp | 6 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/nodes_health_check.cpp | 18 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.cpp | 4 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 8 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/proxy_private.cpp | 30 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/result_writer.cpp | 20 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 34 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/task_get.cpp | 22 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/task_ping.cpp | 21 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/task_result_write.cpp | 20 | ||||
-rw-r--r-- | ydb/core/yq/libs/checkpoint_storage/gc.cpp | 12 | ||||
-rw-r--r-- | ydb/core/yq/libs/logs/log.cpp | 57 | ||||
-rw-r--r-- | ydb/core/yq/libs/mock/yql_mock.cpp | 8 | ||||
-rw-r--r-- | ydb/core/yq/libs/shared_resources/db_pool.cpp | 94 |
14 files changed, 178 insertions, 176 deletions
diff --git a/ydb/core/yq/libs/actors/database_resolver.cpp b/ydb/core/yq/libs/actors/database_resolver.cpp index e36f8318367..53a42199b6d 100644 --- a/ydb/core/yq/libs/actors/database_resolver.cpp +++ b/ydb/core/yq/libs/actors/database_resolver.cpp @@ -265,7 +265,7 @@ public: private: STRICT_STFUNC(State, { - HFunc(TEvents::TEvEndpointRequest, Handle); + hFunc(TEvents::TEvEndpointRequest, Handle); cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); }); @@ -283,7 +283,7 @@ private: new TEvents::TEvEndpointResponse(NYql::TDbResolverResponse{std::move(ready), success, issues})); } - void Handle(TEvents::TEvEndpointRequest::TPtr ev, const TActorContext& ctx) + void Handle(TEvents::TEvEndpointRequest::TPtr ev) { TraceId = ev->Get()->TraceId; LOG_D("Start databaseId resolver for " << ev->Get()->DatabaseIds.size() << " ids"); @@ -347,7 +347,7 @@ private: new TResponseProcessor(ev->Sender, Cache, ready, requests, TraceId, ev->Get()->MdbTransformHost, Parsers)); for (const auto& [request, _] : requests) { - ctx.Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request))); + TActivationContext::Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request))); } } else { SendResponse(ev->Sender, std::move(ready)); diff --git a/ydb/core/yq/libs/actors/nodes_health_check.cpp b/ydb/core/yq/libs/actors/nodes_health_check.cpp index 96e7c1c1cd9..5d9e65ba97a 100644 --- a/ydb/core/yq/libs/actors/nodes_health_check.cpp +++ b/ydb/core/yq/libs/actors/nodes_health_check.cpp @@ -43,13 +43,13 @@ public: static constexpr char ActorName[] = "YQ_PRIVATE_NODES_HEALTH_CHECK"; - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { LOG_E("TNodesHealthCheckActor::OnUndelivered"); auto res = MakeHolder<TEvents::TEvNodesHealthCheckResponse>(); res->Status = Ydb::StatusIds::GENERIC_ERROR; res->Issues.AddIssue("UNDELIVERED"); - ctx.Send(ev->Sender, res.Release()); - Die(ctx); + Send(ev->Sender, res.Release()); + PassAway(); } void PassAway() final { @@ -70,7 +70,7 @@ public: PassAway(); } - void Bootstrap(const TActorContext&) { + void Bootstrap() { Become(&TNodesHealthCheckActor::StateFunc); auto& req = Ev->Record; Tenant = req.tenant(); @@ -83,11 +83,11 @@ private: STRICT_STFUNC( StateFunc, CFunc(NActors::TEvents::TEvPoison::EventType, Die) - HFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, HandleResponse) - HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, HandleResponse) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) ) - void HandleResponse(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse::TPtr& ev, const TActorContext& ctx) { + void HandleResponse(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse::TPtr& ev) { auto res = MakeHolder<TEvents::TEvNodesHealthCheckResponse>(); try { const auto& issues = ev->Get()->Issues; @@ -97,8 +97,8 @@ private: res->Record.ConstructInPlace(); res->Status = Ydb::StatusIds::SUCCESS; res->Record = ev->Get()->Record; - ctx.Send(Sender, res.Release()); - Die(ctx); + Send(Sender, res.Release()); + PassAway(); } catch (...) { const auto msg = TStringBuilder() << "Can't do NodesHealthCheck: " << CurrentExceptionMessage(); Fail(msg); diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index 9ebdd7f6f22..435ab810db7 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -73,11 +73,11 @@ public: NActors::IActor::PassAway(); } - void Bootstrap(const TActorContext&) { + void Bootstrap() { Become(&TNodesManagerActor::StateFunc); ServiceCounters.Counters->GetCounter("EvBootstrap", true)->Inc(); LOG_I("Bootstrap, InstanceId: " << InstanceId); - ResolveSelfAddress(); + ResolveSelfAddress(); } private: diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index effba69ba5d..09ab19fa93d 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -172,10 +172,10 @@ public: void Bootstrap() { if (Monitoring) { - Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"), + Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"), "fetcher", "Pending Fetcher", false, TActivationContext::ActorSystem(), SelfId()); } - + Become(&TPendingFetcher::StateFunc); DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory)); Send(SelfId(), new NActors::TEvents::TEvWakeup()); @@ -184,7 +184,7 @@ public: } private: - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext&) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) { LOG_E("TYqlPendingFetcher::OnUndelivered"); HasRunningRequest = false; @@ -408,7 +408,7 @@ private: STRICT_STFUNC(StateFunc, hFunc(NActors::TEvents::TEvWakeup, HandleWakeup) - HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) hFunc(TEvInternalService::TEvGetTaskResponse, Handle) hFunc(NActors::TEvents::TEvPoisonTaken, HandlePoisonTaken) hFunc(TEvPrivate::TEvCleanupCounters, HandleCleanupCounters) diff --git a/ydb/core/yq/libs/actors/proxy_private.cpp b/ydb/core/yq/libs/actors/proxy_private.cpp index 5b232ff8d73..38173b8335e 100644 --- a/ydb/core/yq/libs/actors/proxy_private.cpp +++ b/ydb/core/yq/libs/actors/proxy_private.cpp @@ -49,7 +49,7 @@ public: NActors::IActor::PassAway(); } - void Bootstrap(const TActorContext&) { + void Bootstrap() { Become(&TYqlAnalyticsPrivateProxy::StateFunc); Counters->GetCounter("EvBootstrap", true)->Inc(); } @@ -70,7 +70,7 @@ private: return issues; } - void Handle(TEvents::TEvPingTaskRequest::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvents::TEvPingTaskRequest::TPtr& ev) { Counters->GetCounter("EvPingTaskRequest", true)->Inc(); NYql::TIssues issues = ValidatePermissions(ev); @@ -84,10 +84,10 @@ private: Register( CreatePingTaskRequestActor(ev->Sender, TimeProvider, ev->Release(), Counters), - NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID()); + NActors::TMailboxType::HTSwap, SelfId().PoolID()); } - void Handle(TEvents::TEvGetTaskRequest::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvents::TEvGetTaskRequest::TPtr& ev) { Counters->GetCounter("EvGetTaskRequest", true)->Inc(); NYql::TIssues issues = ValidatePermissions(ev); @@ -101,10 +101,10 @@ private: Register( CreateGetTaskRequestActor(ev->Sender, TokenAccessorConfig, TimeProvider, ev->Release(), Counters), - NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID()); + NActors::TMailboxType::HTSwap, SelfId().PoolID()); } - void Handle(TEvents::TEvWriteTaskResultRequest::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvents::TEvWriteTaskResultRequest::TPtr& ev) { Counters->GetCounter("EvWriteTaskResultRequest", true)->Inc(); NYql::TIssues issues = ValidatePermissions(ev); @@ -118,10 +118,10 @@ private: Register( CreateWriteTaskResultRequestActor(ev->Sender, TimeProvider, ev->Release(), Counters), - NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID()); + NActors::TMailboxType::HTSwap, SelfId().PoolID()); } - void Handle(TEvents::TEvNodesHealthCheckRequest::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvents::TEvNodesHealthCheckRequest::TPtr& ev) { Counters->GetCounter("EvNodesHealthCheckRequest", true)->Inc(); NYql::TIssues issues = ValidatePermissions(ev); @@ -135,7 +135,7 @@ private: Register( CreateNodesHealthCheckActor(ev->Sender, TimeProvider, ev->Release(), Counters), - NActors::TMailboxType::HTSwap, ctx.SelfID.PoolID()); + NActors::TMailboxType::HTSwap, SelfId().PoolID()); } void Handle(TEvents::TEvCreateRateLimiterResourceRequest::TPtr& ev) { @@ -176,16 +176,16 @@ private: STRICT_STFUNC( StateFunc, - HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) - HFunc(TEvents::TEvPingTaskRequest, Handle) - HFunc(TEvents::TEvGetTaskRequest, Handle) - HFunc(TEvents::TEvWriteTaskResultRequest, Handle) - HFunc(TEvents::TEvNodesHealthCheckRequest, Handle) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(TEvents::TEvPingTaskRequest, Handle) + hFunc(TEvents::TEvGetTaskRequest, Handle) + hFunc(TEvents::TEvWriteTaskResultRequest, Handle) + hFunc(TEvents::TEvNodesHealthCheckRequest, Handle) hFunc(TEvents::TEvCreateRateLimiterResourceRequest, Handle) hFunc(TEvents::TEvDeleteRateLimiterResourceRequest, Handle) ) - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext&) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) { LOG_E("TYqlAnalyticsPrivateProxy::OnUndelivered"); Counters->GetCounter("OnUndelivered", true)->Inc(); } diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp index 86c2522b5e9..d76f37c39af 100644 --- a/ydb/core/yq/libs/actors/result_writer.cpp +++ b/ydb/core/yq/libs/actors/result_writer.cpp @@ -61,7 +61,7 @@ public: static constexpr char ActorName[] = "YQ_RESULT_WRITER"; - void Bootstrap(const TActorContext&) { + void Bootstrap() { LOG_I("Bootstrap"); Become(&TResultWriter::StateFunc); } @@ -69,11 +69,11 @@ public: private: STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) - HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) - HFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData) - HFunc(TEvReadyState, OnReadyState); - HFunc(TEvQueryResponse, OnQueryResult); + hFunc(NDq::TEvDqCompute::TEvChannelData, OnChannelData) + hFunc(TEvReadyState, OnReadyState); + hFunc(TEvQueryResponse, OnQueryResult); hFunc(NFq::TEvInternalService::TEvWriteResultResponse, HandleResponse); ) @@ -91,7 +91,7 @@ private: NActors::IActor::PassAway(); } - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext& ) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) { auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, TIssue("Undelivered").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR)); Send(ExecuterId, req.Release()); HasError = true; @@ -111,7 +111,7 @@ private: } } - void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) { + void OnQueryResult(TEvQueryResponse::TPtr& ev) { Finished = true; NYql::NDqProto::TQueryResponse queryResult(ev->Get()->Record); @@ -125,7 +125,7 @@ private: Send(ExecuterId, new TEvQueryResponse(std::move(queryResult))); } - void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { } + void OnReadyState(TEvReadyState::TPtr&) { } void HandleResponse(NFq::TEvInternalService::TEvWriteResultResponse::TPtr& ev) { auto statusCode = ev->Get()->Result.status_code(); @@ -311,9 +311,7 @@ private: Cookie++; } - void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ctx); - + void OnChannelData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev) { if (HasError) { StopChannel(ev); return; diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 70385d299e0..fcf3b2a2704 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -149,7 +149,7 @@ public: { } - void Bootstrap(const TActorContext& ctx) { + void Bootstrap() { TProgramFactory progFactory(false, FunctionRegistry, NextUniqueId, DataProvidersInit, "yq"); progFactory.SetModules(ModuleResolver); progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FunctionRegistry, nullptr)); @@ -162,12 +162,12 @@ public: { if (!Program->ParseSql(SqlSettings)) { Issues.AddIssues(Program->Issues()); - SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to parse query"); + SendStatusAndDie(TProgram::TStatus::Error, "Failed to parse query"); return; } if (ExecuteMode == YandexQuery::ExecuteMode::PARSE) { - SendStatusAndDie(ctx, TProgram::TStatus::Ok); + SendStatusAndDie(TProgram::TStatus::Ok); return; } } @@ -176,12 +176,12 @@ public: { if (!Program->Compile("")) { Issues.AddIssues(Program->Issues()); - SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to compile query"); + SendStatusAndDie(TProgram::TStatus::Error, "Failed to compile query"); return; } if (ExecuteMode == YandexQuery::ExecuteMode::COMPILE) { - SendStatusAndDie(ctx, TProgram::TStatus::Ok); + SendStatusAndDie(TProgram::TStatus::Ok); return; } } @@ -201,7 +201,7 @@ public: futureStatus = Program->RunAsync(""); break; default: - SendStatusAndDie(ctx, TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode)); + SendStatusAndDie(TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode)); return; } @@ -212,7 +212,7 @@ public: Become(&TProgramRunnerActor::StateFunc); } - void SendStatusAndDie(const TActorContext& ctx, NYql::TProgram::TStatus status, const TString& message = "") { + void SendStatusAndDie(NYql::TProgram::TStatus status, const TString& message = "") { TString expr; TString plan; if (Compiled) { @@ -224,14 +224,14 @@ public: } Issues.AddIssues(Program->Issues()); Send(RunActorId, new TEvPrivate::TEvProgramFinished(Issues, plan, expr, status, message)); - Die(ctx); + PassAway(); } STRICT_STFUNC(StateFunc, - HFunc(TEvents::TEvAsyncContinue, Handle); + hFunc(TEvents::TEvAsyncContinue, Handle); ) - void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvents::TEvAsyncContinue::TPtr& ev) { NYql::TProgram::TStatus status = TProgram::TStatus::Error; const auto& f = ev->Get()->Future; @@ -239,9 +239,7 @@ public: status = f.GetValue(); if (status == TProgram::TStatus::Async) { auto futureStatus = Program->ContinueAsync(); - auto actorSystem = ctx.ActorSystem(); - auto selfId = ctx.SelfID; - futureStatus.Subscribe([actorSystem, selfId](const TProgram::TFutureStatus& f) { + futureStatus.Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); }); return; @@ -249,7 +247,7 @@ public: } catch (const std::exception& err) { Issues.AddIssue(ExceptionToIssue(err)); } - SendStatusAndDie(ctx, status); + SendStatusAndDie(status); } private: @@ -348,7 +346,7 @@ private: STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvProgramFinished, Handle); - HFunc(TEvents::TEvAsyncContinue, Handle); + hFunc(TEvents::TEvAsyncContinue, Handle); hFunc(NActors::TEvents::TEvUndelivered, Handle); hFunc(TEvents::TEvGraphParams, Handle); hFunc(TEvents::TEvDataStreamsReadRulesCreationResult, Handle); @@ -1714,7 +1712,7 @@ private: } } - void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvents::TEvAsyncContinue::TPtr& ev) { LOG_D("Compiling finished"); NYql::TProgram::TStatus status = TProgram::TStatus::Error; @@ -1723,9 +1721,7 @@ private: status = f.GetValue(); if (status == TProgram::TStatus::Async) { auto futureStatus = Program->ContinueAsync(); - auto actorSystem = ctx.ActorSystem(); - auto selfId = ctx.SelfID; - futureStatus.Subscribe([actorSystem, selfId](const TProgram::TFutureStatus& f) { + futureStatus.Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); }); return; diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp index ce788677bd5..8cb0170afe3 100644 --- a/ydb/core/yq/libs/actors/task_get.cpp +++ b/ydb/core/yq/libs/actors/task_get.cpp @@ -53,13 +53,13 @@ public: static constexpr char ActorName[] = "YQ_PRIVATE_GET_TASK"; - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { LOG_E("TGetTaskRequestActor::OnUndelivered"); auto response = MakeHolder<TEvents::TEvGetTaskResponse>(); response->Status = Ydb::StatusIds::GENERIC_ERROR; response->Issues.AddIssue("UNDELIVERED"); - ctx.Send(ev->Sender, response.Release()); - Die(ctx); + Send(ev->Sender, response.Release()); + PassAway(); } void PassAway() final { @@ -80,7 +80,7 @@ public: PassAway(); } - void Bootstrap(const TActorContext& ctx) { + void Bootstrap() { Become(&TGetTaskRequestActor::StateFunc); auto request = Ev->Record; LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes"); @@ -88,12 +88,12 @@ public: OwnerId = request.owner_id(); Host = request.host(); Tenant = request.tenant(); - ctx.Send(NYq::ControlPlaneStorageServiceActorId(), + Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request))); } private: - void HandleResponse(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev, const TActorContext& ctx) { // YQ + void HandleResponse(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) { // YQ LOG_D("Got CP::GetTask Response"); const auto& issues = ev->Get()->Issues; @@ -120,8 +120,8 @@ private: account.set_signature(signature); } } - ctx.Send(Sender, response.Release()); - Die(ctx); + Send(Sender, response.Release()); + PassAway(); } catch (...) { const auto msg = TStringBuilder() << "Can't do GetTask: " << CurrentExceptionMessage(); Fail(msg); @@ -131,9 +131,9 @@ private: private: STRICT_STFUNC( StateFunc, - CFunc(NActors::TEvents::TEvPoison::EventType, Die) - HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) - HFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, HandleResponse) + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, HandleResponse) ) const NConfig::TTokenAccessorConfig TokenAccessorConfig; diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp index 5647011cbbd..d7b9cf1a7ff 100644 --- a/ydb/core/yq/libs/actors/task_ping.cpp +++ b/ydb/core/yq/libs/actors/task_ping.cpp @@ -45,13 +45,13 @@ public: static constexpr char ActorName[] = "YQ_PRIVATE_PING_TASK"; - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { LOG_E("TTaskPingRequestActor::OnUndelivered"); auto res = MakeHolder<TEvents::TEvPingTaskResponse>(); res->Status = Ydb::StatusIds::GENERIC_ERROR; res->Issues.AddIssue("UNDELIVERED"); - ctx.Send(ev->Sender, res.Release()); - Die(ctx); + Send(ev->Sender, res.Release()); + PassAway(); } void PassAway() final { @@ -72,8 +72,7 @@ public: PassAway(); } - void Bootstrap(const TActorContext& ctx) { - Y_UNUSED(ctx); + void Bootstrap() { Become(&TTaskPingRequestActor::StateFunc); const auto& req = Ev->Record; OperationId = req.query_id().value(); @@ -95,9 +94,9 @@ public: private: STRICT_STFUNC( StateFunc, - CFunc(NActors::TEvents::TEvPoison::EventType, Die) - HFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, HandleResponse) - HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + hFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, HandleResponse) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) ) std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() { @@ -105,7 +104,7 @@ private: return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); } - void HandleResponse(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev, const TActorContext& ctx) { + void HandleResponse(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev) { LOG_D("Got CP::PingTaskResponse"); const auto& issues = ev->Get()->Issues; if (issues) { @@ -117,8 +116,8 @@ private: auto response = MakeHolder<TEvents::TEvPingTaskResponse>(); response->Status = Ydb::StatusIds::SUCCESS; response->Record.ConstructInPlace(ev->Get()->Record); - ctx.Send(Sender, response.Release()); - Die(ctx); + Send(Sender, response.Release()); + PassAway(); } private: diff --git a/ydb/core/yq/libs/actors/task_result_write.cpp b/ydb/core/yq/libs/actors/task_result_write.cpp index dafa7f70d8f..5433de62bde 100644 --- a/ydb/core/yq/libs/actors/task_result_write.cpp +++ b/ydb/core/yq/libs/actors/task_result_write.cpp @@ -46,12 +46,12 @@ public: static constexpr char ActorName[] = "YQ_PRIVATE_WRITE_RESULT_TASK"; - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) { LOG_E("TWriteTaskRequestActor::OnUndelivered"); Res->Status = Ydb::StatusIds::GENERIC_ERROR; Res->Issues.AddIssue("UNDELIVERED"); - ctx.Send(ev->Sender, Res.Release()); - Die(ctx); + Send(ev->Sender, Res.Release()); + PassAway(); } void PassAway() final { @@ -71,7 +71,7 @@ public: PassAway(); } - void Bootstrap(const TActorContext&) { + void Bootstrap() { Become(&TWriteTaskRequestActor::StateFunc); auto request = Ev->Record; ResultId = request.result_id().value(); @@ -85,12 +85,12 @@ public: private: STRICT_STFUNC( StateFunc, - CFunc(NActors::TEvents::TEvPoison::EventType, Die) - HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) - HFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, HandleResponse); + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, HandleResponse); ) - void HandleResponse(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse::TPtr& ev, const TActorContext& ctx) { + void HandleResponse(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse::TPtr& ev) { LOG_D("Got CP::WriteTaskResult Response"); const auto& issues = ev->Get()->Issues; if (issues) { @@ -102,8 +102,8 @@ private: Res->Record->set_request_id(RequestId); Res->Issues.AddIssues(Issues); Res->Status = Ydb::StatusIds::SUCCESS; - ctx.Send(Sender, Res.Release()); - Die(ctx); + Send(Sender, Res.Release()); + PassAway(); } const TActorId Sender; diff --git a/ydb/core/yq/libs/checkpoint_storage/gc.cpp b/ydb/core/yq/libs/checkpoint_storage/gc.cpp index 2a0e73ecb4f..7d136cf15c1 100644 --- a/ydb/core/yq/libs/checkpoint_storage/gc.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/gc.cpp @@ -58,16 +58,16 @@ class TActorGC : public TActorBootstrapped<TActorGC> { public: TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateStoragePtr& stateStorage); - void Bootstrap(const TActorContext& ctx); + void Bootstrap(); static constexpr char ActorName[] = "YQ_GC_ACTOR"; private: STRICT_STFUNC(StateFunc, - HFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle); + hFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle); ) - void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev); }; TActorGC::TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateStoragePtr& stateStorage) @@ -76,14 +76,14 @@ TActorGC::TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateS { } -void TActorGC::Bootstrap(const TActorContext&) +void TActorGC::Bootstrap() { Become(&TActorGC::StateFunc); LOG_STREAMS_STORAGE_SERVICE_INFO("Successfully bootstrapped storage GC " << SelfId()); } -void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx) +void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev) { const auto* event = ev->Get(); const auto& graphId = event->CoordinatorId.GraphId; @@ -98,7 +98,7 @@ void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, // 3. Delete marked checkpoints auto context = MakeIntrusive<TContext>( - ctx.ActorSystem(), + TActivationContext::ActorSystem(), CheckpointStorage, StateStorage, graphId, diff --git a/ydb/core/yq/libs/logs/log.cpp b/ydb/core/yq/libs/logs/log.cpp index 91c4af73ca9..546c895c1f4 100644 --- a/ydb/core/yq/libs/logs/log.cpp +++ b/ydb/core/yq/libs/logs/log.cpp @@ -7,6 +7,15 @@ #include <library/cpp/actors/core/log.h> +#define LOG_C(stream) \ + LOG_CRIT_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQL_PROXY, stream) + +#define LOG_E(stream) \ + LOG_ERROR_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQL_PROXY, stream) + +#define LOG_D(stream) \ + LOG_DEBUG_S(::NActors::TActivationContext::AsActorContext(), NKikimrServices::YQL_PROXY, stream) + namespace NKikimr { using namespace NActors; @@ -20,12 +29,12 @@ public: , LogConfig(logConfig) { } - void Bootstrap(const TActorContext& ctx) { - UpdateYqlLogLevels(ctx); + void Bootstrap() { + UpdateYqlLogLevels(); // Subscribe for Logger config changes ui32 logConfigKind = (ui32)NKikimrConsole::TConfigItem::LogConfigItem; - ctx.Send(NConsole::MakeConfigsDispatcherID(ctx.SelfID.NodeId()), + Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest( {logConfigKind}), IEventHandle::FlagTrackDelivery); @@ -34,68 +43,64 @@ public: } private: - STFUNC(MainState) { + STATEFN(MainState) { switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvUndelivered, Handle); - HFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle); - HFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle); + hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); default: Y_FAIL("TYqlLogsUpdater: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?"); } } - void Handle(TEvents::TEvUndelivered::TPtr &ev, const TActorContext &ctx) { + void Handle(TEvents::TEvUndelivered::TPtr& ev) { switch (ev->Get()->SourceType) { case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest: - LOG_CRIT_S(ctx, NKikimrServices::YQL_PROXY, - "Failed to deliver subscription request to config dispatcher."); + LOG_C("Failed to deliver subscription request to config dispatcher."); break; case NConsole::TEvConsole::EvConfigNotificationResponse: - LOG_ERROR_S(ctx, NKikimrServices::YQL_PROXY, "Failed to deliver config notification response."); + LOG_E("Failed to deliver config notification response."); break; default: - LOG_ERROR_S(ctx, NKikimrServices::YQL_PROXY, - "Undelivered event with unexpected source type: " << ev->Get()->SourceType); + LOG_E("Undelivered event with unexpected source type: " << ev->Get()->SourceType); break; } } - void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr &ev, const TActorContext &ctx) { + void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev) { Y_UNUSED(ev); - LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Subscribed for config changes."); + LOG_D("Subscribed for config changes."); } - void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr &ev, const TActorContext &ctx) { - auto &event = ev->Get()->Record; + void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { + auto& event = ev->Get()->Record; - LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Updated table service config."); + LOG_D("Updated table service config."); LogConfig.Swap(event.MutableConfig()->MutableLogConfig()); - UpdateYqlLogLevels(ctx); + UpdateYqlLogLevels(); } - void UpdateYqlLogLevels(const TActorContext& ctx) { + void UpdateYqlLogLevels() { const auto& kqpYqlName = NKikimrServices::EServiceKikimr_Name(NKikimrServices::KQP_YQL); - for (auto &entry : LogConfig.GetEntry()) { + for (auto& entry : LogConfig.GetEntry()) { if (entry.GetComponent() == kqpYqlName && entry.HasLevel()) { auto yqlPriority = static_cast<NActors::NLog::EPriority>(entry.GetLevel()); NYql::NDq::SetYqlLogLevels(yqlPriority); - LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Updated YQL logs priority: " - << (ui32)yqlPriority); + LOG_D("Updated YQL logs priority: " << (ui32)yqlPriority); return; } } // Set log level based on current logger settings - ui8 currentLevel = ctx.LoggerSettings()->GetComponentSettings(NKikimrServices::KQP_YQL).Raw.X.Level; + ui8 currentLevel = TActivationContext::AsActorContext().LoggerSettings()->GetComponentSettings(NKikimrServices::KQP_YQL).Raw.X.Level; auto yqlPriority = static_cast<NActors::NLog::EPriority>(currentLevel); - LOG_DEBUG_S(ctx, NKikimrServices::YQL_PROXY, "Updated YQL logs priority to current level: " - << (ui32)yqlPriority); + LOG_D("Updated YQL logs priority to current level: " << (ui32)yqlPriority); NYql::NDq::SetYqlLogLevels(yqlPriority); } diff --git a/ydb/core/yq/libs/mock/yql_mock.cpp b/ydb/core/yq/libs/mock/yql_mock.cpp index 8fe694f35b3..ba17d98101d 100644 --- a/ydb/core/yq/libs/mock/yql_mock.cpp +++ b/ydb/core/yq/libs/mock/yql_mock.cpp @@ -40,13 +40,11 @@ public: private: STRICT_STFUNC(Handler, { - HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); - }); + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + }); - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, - const NActors::TActorContext& ctx) + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event) { - Y_UNUSED(ctx); auto request = event->Get()->Request; auto parameters = NHttp::TUrlParameters(request->URL); TString databaseId = parameters["databaseId"]; diff --git a/ydb/core/yq/libs/shared_resources/db_pool.cpp b/ydb/core/yq/libs/shared_resources/db_pool.cpp index 0fa75f141e0..9af52e67e19 100644 --- a/ydb/core/yq/libs/shared_resources/db_pool.cpp +++ b/ydb/core/yq/libs/shared_resources/db_pool.cpp @@ -11,15 +11,25 @@ #include <util/stream/file.h> #include <util/string/strip.h> -#define LOG_F(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, EMERG, STREAMS, logRecordStream) -#define LOG_A(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ALERT, STREAMS, logRecordStream) -#define LOG_C(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, CRIT, STREAMS, logRecordStream) -#define LOG_E(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ERROR, STREAMS, logRecordStream) -#define LOG_W(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, WARN, STREAMS, logRecordStream) -#define LOG_N(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, NOTICE, STREAMS, logRecordStream) -#define LOG_I(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, INFO, STREAMS, logRecordStream) -#define LOG_D(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, DEBUG, STREAMS, logRecordStream) -#define LOG_T(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, TRACE, STREAMS, logRecordStream) +#define LOG_F_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, EMERG, STREAMS, logRecordStream) +#define LOG_A_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, ALERT, STREAMS, logRecordStream) +#define LOG_C_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, CRIT, STREAMS, logRecordStream) +#define LOG_E_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, ERROR, STREAMS, logRecordStream) +#define LOG_W_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, WARN, STREAMS, logRecordStream) +#define LOG_N_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, NOTICE, STREAMS, logRecordStream) +#define LOG_I_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, INFO, STREAMS, logRecordStream) +#define LOG_D_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, DEBUG, STREAMS, logRecordStream) +#define LOG_T_AS(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(*actorSystem, TRACE, STREAMS, logRecordStream) + +#define LOG_F(logRecordStream) LOG_F_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_A(logRecordStream) LOG_A_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_C(logRecordStream) LOG_C_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_E(logRecordStream) LOG_E_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_W(logRecordStream) LOG_W_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_N(logRecordStream) LOG_N_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_I(logRecordStream) LOG_I_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_D(logRecordStream) LOG_D_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) +#define LOG_T(logRecordStream) LOG_T_AS(::NActors::TActivationContext::ActorSystem(), logRecordStream) namespace NYq { @@ -41,30 +51,30 @@ public: static constexpr char ActorName[] = "YQ_DB_POOL"; STRICT_STFUNC(WorkingState, - CFunc(NActors::TEvents::TEvPoison::EventType, Die) - HFunc(TEvents::TEvDbRequest, HandleRequest) - HFunc(TEvents::TEvDbResponse, HandleResponse) - HFunc(TEvents::TEvDbFunctionRequest, HandleRequest) - HFunc(TEvents::TEvDbFunctionResponse, HandleResponse) + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + hFunc(TEvents::TEvDbRequest, HandleRequest) + hFunc(TEvents::TEvDbResponse, HandleResponse) + hFunc(TEvents::TEvDbFunctionRequest, HandleRequest) + hFunc(TEvents::TEvDbFunctionResponse, HandleResponse) ) - void Die(const TActorContext& ctx) override { + void PassAway() override { NYql::TIssues issues; issues.AddIssue("DB connection closed"); auto cancelled = NYdb::TStatus(NYdb::EStatus::CANCELLED, std::move(issues)); for (const auto& x : Requests) { if (auto pRequest = std::get_if<TRequest>(&x)) { - ctx.Send(pRequest->Sender, new TEvents::TEvDbResponse(cancelled, {})); + Send(pRequest->Sender, new TEvents::TEvDbResponse(cancelled, {})); } else if (auto pRequest = std::get_if<TFunctionRequest>(&x)) { - ctx.Send(pRequest->Sender, new TEvents::TEvDbFunctionResponse(cancelled)); + Send(pRequest->Sender, new TEvents::TEvDbFunctionResponse(cancelled)); } } State.reset(); - IActor::Die(ctx); + IActor::PassAway(); } - void ProcessQueue(const TActorContext& ctx) { + void ProcessQueue() { QueueSize->Collect(Requests.size()); if (Requests.empty() || RequestInProgress) { return; @@ -75,12 +85,10 @@ public: RequestInProgressTimestamp = TInstant::Now(); const auto& requestVariant = Requests.front(); - LOG_T(ctx, "TDbPoolActor: ProcessQueue " << SelfId() << " Queue size = " << Requests.size()); + LOG_T("TDbPoolActor: ProcessQueue " << SelfId() << " Queue size = " << Requests.size()); if (auto pRequest = std::get_if<TRequest>(&requestVariant)) { auto& request = *pRequest; - auto actorSystem = ctx.ActorSystem(); - auto selfId = ctx.SelfID; auto cookie = request.Cookie; auto sharedResult = std::make_shared<TVector<NYdb::TResultSet>>(); NYdb::NTable::TRetryOperationSettings settings; @@ -94,65 +102,63 @@ public: return future; }); }, settings) - .Subscribe([state = std::weak_ptr<int>(State), sharedResult, actorSystem, cookie, selfId](const NThreading::TFuture<NYdb::TStatus>& statusFuture) { + .Subscribe([state = std::weak_ptr<int>(State), sharedResult, actorSystem = TActivationContext::ActorSystem(), cookie, selfId = SelfId()](const NThreading::TFuture<NYdb::TStatus>& statusFuture) { if (state.lock()) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbResponse(statusFuture.GetValue(), *sharedResult), 0, cookie)); } else { - LOG_T(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed"); + LOG_T_AS(actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed"); } }); } else if (auto pRequest = std::get_if<TFunctionRequest>(&requestVariant)) { auto& request = *pRequest; - auto selfId = ctx.SelfID; auto cookie = request.Cookie; - auto actorSystem = ctx.ActorSystem(); TableClient.RetryOperation([request](NYdb::NTable::TSession session) { return request.Handler(session); }) - .Subscribe([state = std::weak_ptr<int>(State), actorSystem, selfId, cookie](const NThreading::TFuture<NYdb::TStatus>& statusFuture) { + .Subscribe([state = std::weak_ptr<int>(State), actorSystem = TActivationContext::ActorSystem(), selfId = SelfId(), cookie](const NThreading::TFuture<NYdb::TStatus>& statusFuture) { if (state.lock()) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbFunctionResponse(statusFuture.GetValue()), 0, cookie)); } else { - LOG_T(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed"); + LOG_T_AS(actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed"); } }); } } - void HandleRequest(TEvents::TEvDbRequest::TPtr& ev, const TActorContext& ctx) { - LOG_D(ctx, "TDbPoolActor: TEvDbRequest " << SelfId() << " Queue size = " << Requests.size()); + void HandleRequest(TEvents::TEvDbRequest::TPtr& ev) { + LOG_D("TDbPoolActor: TEvDbRequest " << SelfId() << " Queue size = " << Requests.size()); auto request = ev->Get(); Requests.emplace_back(TRequest{ev->Sender, ev->Cookie, request->Sql, std::move(request->Params), request->Idempotent}); - ProcessQueue(ctx); + ProcessQueue(); } - void PopFromQueueAndProcess(const TActorContext& ctx) { + void PopFromQueueAndProcess() { RequestInProgress = false; RequestsTime->Collect(TInstant::Now().MilliSeconds() - RequestInProgressTimestamp.MilliSeconds()); Requests.pop_front(); TotalInFlight->Dec(); - ProcessQueue(ctx); + ProcessQueue(); } - void HandleResponse(TEvents::TEvDbResponse::TPtr& ev, const TActorContext& ctx) { - LOG_T(ctx, "TDbPoolActor: TEvDbResponse " << SelfId() << " Queue size = " << Requests.size()); + void HandleResponse(TEvents::TEvDbResponse::TPtr& ev) { + LOG_T("TDbPoolActor: TEvDbResponse " << SelfId() << " Queue size = " << Requests.size()); const auto& request = Requests.front(); - ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request))); - PopFromQueueAndProcess(ctx); + TActivationContext::Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request))); + PopFromQueueAndProcess(); } - void HandleRequest(TEvents::TEvDbFunctionRequest::TPtr& ev, const TActorContext& ctx) { - LOG_T(ctx, "TDbPoolActor: TEvDbFunctionRequest " << SelfId() << " Queue size = " << Requests.size()); + void HandleRequest(TEvents::TEvDbFunctionRequest::TPtr& ev) { + LOG_T("TDbPoolActor: TEvDbFunctionRequest " << SelfId() << " Queue size = " << Requests.size()); auto request = ev->Get(); Requests.emplace_back(TFunctionRequest{ev->Sender, ev->Cookie, std::move(request->Handler)}); - ProcessQueue(ctx); + ProcessQueue(); } - void HandleResponse(TEvents::TEvDbFunctionResponse::TPtr& ev, const TActorContext& ctx) { - LOG_T(ctx, "TDbPoolActor: TEvDbFunctionResponse " << SelfId() << " Queue size = " << Requests.size()); + void HandleResponse(TEvents::TEvDbFunctionResponse::TPtr& ev) { + LOG_T("TDbPoolActor: TEvDbFunctionResponse " << SelfId() << " Queue size = " << Requests.size()); const auto& request = Requests.front(); - ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request))); - PopFromQueueAndProcess(ctx); + TActivationContext::Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request))); + PopFromQueueAndProcess(); } private: |