diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-18 18:18:15 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-18 18:18:15 +0300 |
commit | 023a13e200c615656818363b898c1362f63b7cda (patch) | |
tree | 6f31f0609f48bc53e8d1e6da083178c4003bf4f9 | |
parent | 5c93b36e1b397f278188aebb3b02276dcf172621 (diff) | |
download | ydb-023a13e200c615656818363b898c1362f63b7cda.tar.gz |
Dedup and simplifiaction
ref:e05914350d45b1d7aa5c13f981abc83d1f43ccf2
-rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.cpp | 6 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 4 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/pinger.cpp | 12 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/result_writer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/internal_service.cpp | 77 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/internal_service.h | 19 |
6 files changed, 36 insertions, 84 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index cc72e15dd5..df7f804d26 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -175,10 +175,10 @@ private: void HandleResponse(TEvInternalService::TEvHealthCheckResponse::TPtr& ev) { try { - const auto& status = ev->Get()->Status; + const auto& status = ev->Get()->Status.GetStatus(); THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo()); - if (!ev->Get()->Success) { - ythrow yexception() << status << '\n' << ev->Get()->Issues.ToString(); + if (!ev->Get()->Status.IsSuccess()) { + ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString(); } const auto& res = ev->Get()->Result; diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index d756cc9482..2c86c939e7 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -206,8 +206,8 @@ private: void Handle(TEvInternalService::TEvGetTaskResponse::TPtr& ev) { HasRunningRequest = false; LOG_D("Got GetTask response from PrivateApi"); - if (!ev->Get()->Success) { - LOG_E("Error with GetTask: "<< ev->Get()->Issues.ToString()); + if (!ev->Get()->Status.IsSuccess()) { + LOG_E("Error with GetTask: "<< ev->Get()->Status.GetIssues().ToString()); return; } diff --git a/ydb/core/yq/libs/actors/pinger.cpp b/ydb/core/yq/libs/actors/pinger.cpp index 9b44b86419..9cd8b06258 100644 --- a/ydb/core/yq/libs/actors/pinger.cpp +++ b/ydb/core/yq/libs/actors/pinger.cpp @@ -263,11 +263,11 @@ private: } static bool Retryable(TEvInternalService::TEvPingTaskResponse::TPtr& ev) { - if (ev->Get()->TransportError) { + if (ev->Get()->Status.IsTransportError()) { return true; } - const NYdb::EStatus status = ev->Get()->Status; + const NYdb::EStatus status = ev->Get()->Status.GetStatus(); if (status == NYdb::EStatus::INTERNAL_ERROR || status == NYdb::EStatus::UNAVAILABLE || status == NYdb::EStatus::OVERLOADED @@ -288,7 +288,7 @@ private: } const TInstant now = TActivationContext::Now(); - const bool success = ev->Get()->Success; + const bool success = ev->Get()->Status.IsSuccess(); const bool retryable = !success && Retryable(ev); const bool continueLeaseRequest = ev->Cookie == ContinueLeaseRequestCookie; TRetryState* retryState = nullptr; @@ -337,15 +337,15 @@ private: } } } else if (retryAfter) { - LOG_W("Ping response error: " << ev->Get()->Issues.ToOneLineString() << ". Retry after: " << *retryAfter); + LOG_W("Ping response error: " << ev->Get()->Status.GetIssues().ToOneLineString() << ". Retry after: " << *retryAfter); Schedule(*retryAfter, new NActors::TEvents::TEvWakeup(continueLeaseRequest ? RetryContinueLeaseWakeupTag : RetryForwardPingRequestWakeupTag)); } else { TRetryState* retryStateForLogging = retryState; if (!retryStateForLogging) { retryStateForLogging = continueLeaseRequest ? &RetryState : &ForwardRequests.front().RetryState; } - LOG_E("Ping response error: " << ev->Get()->Issues.ToOneLineString() << ". Retried " << retryStateForLogging->GetRetriesCount() << " times during " << retryStateForLogging->GetRetryTime(now)); - auto action = ev->Get()->Success ? ev->Get()->Result.action() : YandexQuery::QUERY_ACTION_UNSPECIFIED; + LOG_E("Ping response error: " << ev->Get()->Status.GetIssues().ToOneLineString() << ". Retried " << retryStateForLogging->GetRetriesCount() << " times during " << retryStateForLogging->GetRetryTime(now)); + auto action = ev->Get()->Status.IsSuccess() ? ev->Get()->Result.action() : YandexQuery::QUERY_ACTION_UNSPECIFIED; Send(Parent, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie); FatalError = true; ForwardRequests.clear(); diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp index 6b891f2b12..0888d112b6 100644 --- a/ydb/core/yq/libs/actors/result_writer.cpp +++ b/ydb/core/yq/libs/actors/result_writer.cpp @@ -113,7 +113,7 @@ private: void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { } void HandleResponse(TEvInternalService::TEvWriteResultResponse::TPtr& ev) { - const auto& issues = ev->Get()->Issues; + const auto& issues = ev->Get()->Status.GetIssues(); if (issues) { SendIssuesAndSetErrorFlag(issues); return; diff --git a/ydb/core/yq/libs/private_client/internal_service.cpp b/ydb/core/yq/libs/private_client/internal_service.cpp index ba9cd274c1..e80c40fe90 100644 --- a/ydb/core/yq/libs/private_client/internal_service.cpp +++ b/ydb/core/yq/libs/private_client/internal_service.cpp @@ -60,105 +60,52 @@ private: void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) { EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds()); - const auto actorSystem = NActors::TActivationContext::ActorSystem(); - const auto senderId = ev->Sender; PrivateClient .NodesHealthCheck(std::move(ev->Get()->Request)) - .Subscribe([actorSystem, senderId](const NThreading::TFuture<TNodesHealthCheckResult>& future) { + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const NThreading::TFuture<TNodesHealthCheckResult>& future) { try { - const auto& wrappedResult = future.GetValue(); - if (wrappedResult.IsResultSet()) { - actorSystem->Send(senderId, new TEvInternalService::TEvHealthCheckResponse( - wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult()) - ); - } else { - actorSystem->Send(senderId, new TEvInternalService::TEvHealthCheckResponse( - false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for health check call"}}}, Yq::Private::NodesHealthCheckResult{}) - ); - } + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvHealthCheckResponse(future.GetValue()), 0, cookie)); } catch (...) { - actorSystem->Send(senderId, new TEvInternalService::TEvHealthCheckResponse( - false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::NodesHealthCheckResult{}) - ); + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvHealthCheckResponse(CurrentExceptionMessage()), 0, cookie)); } }); } void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& ev) { EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds()); - const auto actorSystem = NActors::TActivationContext::ActorSystem(); - const auto senderId = ev->Sender; PrivateClient .GetTask(std::move(ev->Get()->Request)) - .Subscribe([actorSystem, senderId](const NThreading::TFuture<TGetTaskResult>& future) { + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const NThreading::TFuture<TGetTaskResult>& future) { try { - const auto& wrappedResult = future.GetValue(); - if (wrappedResult.IsResultSet()) { - actorSystem->Send(senderId, new TEvInternalService::TEvGetTaskResponse( - wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult()) - ); - } else { - actorSystem->Send(senderId, new TEvInternalService::TEvGetTaskResponse( - false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for get task call"}}}, Yq::Private::GetTaskResult{}) - ); - } + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvGetTaskResponse(future.GetValue()), 0, cookie)); } catch (...) { - actorSystem->Send(senderId, new TEvInternalService::TEvGetTaskResponse( - false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::GetTaskResult{}) - ); + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvGetTaskResponse(CurrentExceptionMessage()), 0, cookie)); } }); } void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& ev) { EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds()); - const auto actorSystem = NActors::TActivationContext::ActorSystem(); - const auto senderId = ev->Sender; - const auto selfId = SelfId(); PrivateClient .PingTask(std::move(ev->Get()->Request)) - .Subscribe([actorSystem, senderId, selfId, cookie=ev->Cookie](const NThreading::TFuture<TPingTaskResult>& future) { + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const NThreading::TFuture<TPingTaskResult>& future) { try { - const auto& wrappedResult = future.GetValue(); - if (wrappedResult.IsResultSet()) { - actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse( - wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult(), wrappedResult.IsTransportError()) - , 0, cookie)); - } else { - actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse( - false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for ping task call"}}}, Yq::Private::PingTaskResult{}) - , 0, cookie)); - } + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse(future.GetValue()), 0, cookie)); } catch (...) { - actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse( - false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::PingTaskResult{}) - , 0, cookie)); + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse(CurrentExceptionMessage()), 0, cookie)); } }); } void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& ev) { EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds()); - const auto actorSystem = NActors::TActivationContext::ActorSystem(); - const auto senderId = ev->Sender; PrivateClient .WriteTaskResult(std::move(ev->Get()->Request)) - .Subscribe([actorSystem, senderId](const auto& future) { + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const auto& future) { try { - const auto& wrappedResult = future.GetValue(); - if (wrappedResult.IsResultSet()) { - actorSystem->Send(senderId, new TEvInternalService::TEvWriteResultResponse( - wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult()) - ); - } else { - actorSystem->Send(senderId, new TEvInternalService::TEvWriteResultResponse( - false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for write result task call"}}}, Yq::Private::WriteTaskResultResult{}) - ); - } + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvWriteResultResponse(future.GetValue()), 0, cookie)); } catch (...) { - actorSystem->Send(senderId, new TEvInternalService::TEvWriteResultResponse( - false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::WriteTaskResultResult{}) - ); + actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvWriteResultResponse(CurrentExceptionMessage()), 0, cookie)); } }); } diff --git a/ydb/core/yq/libs/private_client/internal_service.h b/ydb/core/yq/libs/private_client/internal_service.h index 207a1b7fd3..5bad866dfe 100644 --- a/ydb/core/yq/libs/private_client/internal_service.h +++ b/ydb/core/yq/libs/private_client/internal_service.h @@ -1,5 +1,7 @@ #pragma once +#include "private_client.h" + #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/event_local.h> @@ -45,14 +47,17 @@ struct TEvInternalService { template <class TProtoResult, ui32 TEventType> struct TInternalServiceResponseEvent : public NActors::TEventLocal<TInternalServiceResponseEvent<TProtoResult, TEventType>, TEventType> { - bool Success = false; - NYdb::EStatus Status; - const NYql::TIssues Issues; + NYdb::TStatus Status; TProtoResult Result; - bool TransportError = false; - explicit TInternalServiceResponseEvent(bool success, NYdb::EStatus status, const NYql::TIssues& issues, const TProtoResult& result, bool transportError = false) - : Success(success), Status(status), Issues(issues), Result(result), TransportError(transportError) - { } + TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult) + { + if (wrappedResult.IsResultSet()) { + Result = wrappedResult.GetResult(); + } + } + TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)}) + { + } }; using TEvHealthCheckResponse = TInternalServiceResponseEvent<Yq::Private::NodesHealthCheckResult, EvHealthCheckResponse>; |