aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-04-18 18:18:15 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-04-18 18:18:15 +0300
commit023a13e200c615656818363b898c1362f63b7cda (patch)
tree6f31f0609f48bc53e8d1e6da083178c4003bf4f9
parent5c93b36e1b397f278188aebb3b02276dcf172621 (diff)
downloadydb-023a13e200c615656818363b898c1362f63b7cda.tar.gz
Dedup and simplifiaction
ref:e05914350d45b1d7aa5c13f981abc83d1f43ccf2
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.cpp6
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp4
-rw-r--r--ydb/core/yq/libs/actors/pinger.cpp12
-rw-r--r--ydb/core/yq/libs/actors/result_writer.cpp2
-rw-r--r--ydb/core/yq/libs/private_client/internal_service.cpp77
-rw-r--r--ydb/core/yq/libs/private_client/internal_service.h19
6 files changed, 36 insertions, 84 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index cc72e15dd5..df7f804d26 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -175,10 +175,10 @@ private:
void HandleResponse(TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
try {
- const auto& status = ev->Get()->Status;
+ const auto& status = ev->Get()->Status.GetStatus();
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo());
- if (!ev->Get()->Success) {
- ythrow yexception() << status << '\n' << ev->Get()->Issues.ToString();
+ if (!ev->Get()->Status.IsSuccess()) {
+ ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString();
}
const auto& res = ev->Get()->Result;
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index d756cc9482..2c86c939e7 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -206,8 +206,8 @@ private:
void Handle(TEvInternalService::TEvGetTaskResponse::TPtr& ev) {
HasRunningRequest = false;
LOG_D("Got GetTask response from PrivateApi");
- if (!ev->Get()->Success) {
- LOG_E("Error with GetTask: "<< ev->Get()->Issues.ToString());
+ if (!ev->Get()->Status.IsSuccess()) {
+ LOG_E("Error with GetTask: "<< ev->Get()->Status.GetIssues().ToString());
return;
}
diff --git a/ydb/core/yq/libs/actors/pinger.cpp b/ydb/core/yq/libs/actors/pinger.cpp
index 9b44b86419..9cd8b06258 100644
--- a/ydb/core/yq/libs/actors/pinger.cpp
+++ b/ydb/core/yq/libs/actors/pinger.cpp
@@ -263,11 +263,11 @@ private:
}
static bool Retryable(TEvInternalService::TEvPingTaskResponse::TPtr& ev) {
- if (ev->Get()->TransportError) {
+ if (ev->Get()->Status.IsTransportError()) {
return true;
}
- const NYdb::EStatus status = ev->Get()->Status;
+ const NYdb::EStatus status = ev->Get()->Status.GetStatus();
if (status == NYdb::EStatus::INTERNAL_ERROR
|| status == NYdb::EStatus::UNAVAILABLE
|| status == NYdb::EStatus::OVERLOADED
@@ -288,7 +288,7 @@ private:
}
const TInstant now = TActivationContext::Now();
- const bool success = ev->Get()->Success;
+ const bool success = ev->Get()->Status.IsSuccess();
const bool retryable = !success && Retryable(ev);
const bool continueLeaseRequest = ev->Cookie == ContinueLeaseRequestCookie;
TRetryState* retryState = nullptr;
@@ -337,15 +337,15 @@ private:
}
}
} else if (retryAfter) {
- LOG_W("Ping response error: " << ev->Get()->Issues.ToOneLineString() << ". Retry after: " << *retryAfter);
+ LOG_W("Ping response error: " << ev->Get()->Status.GetIssues().ToOneLineString() << ". Retry after: " << *retryAfter);
Schedule(*retryAfter, new NActors::TEvents::TEvWakeup(continueLeaseRequest ? RetryContinueLeaseWakeupTag : RetryForwardPingRequestWakeupTag));
} else {
TRetryState* retryStateForLogging = retryState;
if (!retryStateForLogging) {
retryStateForLogging = continueLeaseRequest ? &RetryState : &ForwardRequests.front().RetryState;
}
- LOG_E("Ping response error: " << ev->Get()->Issues.ToOneLineString() << ". Retried " << retryStateForLogging->GetRetriesCount() << " times during " << retryStateForLogging->GetRetryTime(now));
- auto action = ev->Get()->Success ? ev->Get()->Result.action() : YandexQuery::QUERY_ACTION_UNSPECIFIED;
+ LOG_E("Ping response error: " << ev->Get()->Status.GetIssues().ToOneLineString() << ". Retried " << retryStateForLogging->GetRetriesCount() << " times during " << retryStateForLogging->GetRetryTime(now));
+ auto action = ev->Get()->Status.IsSuccess() ? ev->Get()->Result.action() : YandexQuery::QUERY_ACTION_UNSPECIFIED;
Send(Parent, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie);
FatalError = true;
ForwardRequests.clear();
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp
index 6b891f2b12..0888d112b6 100644
--- a/ydb/core/yq/libs/actors/result_writer.cpp
+++ b/ydb/core/yq/libs/actors/result_writer.cpp
@@ -113,7 +113,7 @@ private:
void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { }
void HandleResponse(TEvInternalService::TEvWriteResultResponse::TPtr& ev) {
- const auto& issues = ev->Get()->Issues;
+ const auto& issues = ev->Get()->Status.GetIssues();
if (issues) {
SendIssuesAndSetErrorFlag(issues);
return;
diff --git a/ydb/core/yq/libs/private_client/internal_service.cpp b/ydb/core/yq/libs/private_client/internal_service.cpp
index ba9cd274c1..e80c40fe90 100644
--- a/ydb/core/yq/libs/private_client/internal_service.cpp
+++ b/ydb/core/yq/libs/private_client/internal_service.cpp
@@ -60,105 +60,52 @@ private:
void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) {
EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds());
- const auto actorSystem = NActors::TActivationContext::ActorSystem();
- const auto senderId = ev->Sender;
PrivateClient
.NodesHealthCheck(std::move(ev->Get()->Request))
- .Subscribe([actorSystem, senderId](const NThreading::TFuture<TNodesHealthCheckResult>& future) {
+ .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const NThreading::TFuture<TNodesHealthCheckResult>& future) {
try {
- const auto& wrappedResult = future.GetValue();
- if (wrappedResult.IsResultSet()) {
- actorSystem->Send(senderId, new TEvInternalService::TEvHealthCheckResponse(
- wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult())
- );
- } else {
- actorSystem->Send(senderId, new TEvInternalService::TEvHealthCheckResponse(
- false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for health check call"}}}, Yq::Private::NodesHealthCheckResult{})
- );
- }
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvHealthCheckResponse(future.GetValue()), 0, cookie));
} catch (...) {
- actorSystem->Send(senderId, new TEvInternalService::TEvHealthCheckResponse(
- false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::NodesHealthCheckResult{})
- );
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvHealthCheckResponse(CurrentExceptionMessage()), 0, cookie));
}
});
}
void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& ev) {
EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds());
- const auto actorSystem = NActors::TActivationContext::ActorSystem();
- const auto senderId = ev->Sender;
PrivateClient
.GetTask(std::move(ev->Get()->Request))
- .Subscribe([actorSystem, senderId](const NThreading::TFuture<TGetTaskResult>& future) {
+ .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const NThreading::TFuture<TGetTaskResult>& future) {
try {
- const auto& wrappedResult = future.GetValue();
- if (wrappedResult.IsResultSet()) {
- actorSystem->Send(senderId, new TEvInternalService::TEvGetTaskResponse(
- wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult())
- );
- } else {
- actorSystem->Send(senderId, new TEvInternalService::TEvGetTaskResponse(
- false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for get task call"}}}, Yq::Private::GetTaskResult{})
- );
- }
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvGetTaskResponse(future.GetValue()), 0, cookie));
} catch (...) {
- actorSystem->Send(senderId, new TEvInternalService::TEvGetTaskResponse(
- false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::GetTaskResult{})
- );
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvGetTaskResponse(CurrentExceptionMessage()), 0, cookie));
}
});
}
void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& ev) {
EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds());
- const auto actorSystem = NActors::TActivationContext::ActorSystem();
- const auto senderId = ev->Sender;
- const auto selfId = SelfId();
PrivateClient
.PingTask(std::move(ev->Get()->Request))
- .Subscribe([actorSystem, senderId, selfId, cookie=ev->Cookie](const NThreading::TFuture<TPingTaskResult>& future) {
+ .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const NThreading::TFuture<TPingTaskResult>& future) {
try {
- const auto& wrappedResult = future.GetValue();
- if (wrappedResult.IsResultSet()) {
- actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse(
- wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult(), wrappedResult.IsTransportError())
- , 0, cookie));
- } else {
- actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse(
- false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for ping task call"}}}, Yq::Private::PingTaskResult{})
- , 0, cookie));
- }
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse(future.GetValue()), 0, cookie));
} catch (...) {
- actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse(
- false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::PingTaskResult{})
- , 0, cookie));
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvPingTaskResponse(CurrentExceptionMessage()), 0, cookie));
}
});
}
void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& ev) {
EventLatency->Collect((TInstant::Now() - ev->Get()->SentAt).MilliSeconds());
- const auto actorSystem = NActors::TActivationContext::ActorSystem();
- const auto senderId = ev->Sender;
PrivateClient
.WriteTaskResult(std::move(ev->Get()->Request))
- .Subscribe([actorSystem, senderId](const auto& future) {
+ .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), senderId = ev->Sender, selfId = SelfId(), cookie = ev->Cookie](const auto& future) {
try {
- const auto& wrappedResult = future.GetValue();
- if (wrappedResult.IsResultSet()) {
- actorSystem->Send(senderId, new TEvInternalService::TEvWriteResultResponse(
- wrappedResult.IsSuccess(), wrappedResult.GetStatus(), wrappedResult.GetIssues(), wrappedResult.GetResult())
- );
- } else {
- actorSystem->Send(senderId, new TEvInternalService::TEvWriteResultResponse(
- false, wrappedResult.GetStatus(), NYql::TIssues{{NYql::TIssue{"grpc private api result is not set for write result task call"}}}, Yq::Private::WriteTaskResultResult{})
- );
- }
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvWriteResultResponse(future.GetValue()), 0, cookie));
} catch (...) {
- actorSystem->Send(senderId, new TEvInternalService::TEvWriteResultResponse(
- false, NYdb::EStatus::STATUS_UNDEFINED, NYql::TIssues{{NYql::TIssue{CurrentExceptionMessage()}}}, Yq::Private::WriteTaskResultResult{})
- );
+ actorSystem->Send(new NActors::IEventHandle(senderId, selfId, new TEvInternalService::TEvWriteResultResponse(CurrentExceptionMessage()), 0, cookie));
}
});
}
diff --git a/ydb/core/yq/libs/private_client/internal_service.h b/ydb/core/yq/libs/private_client/internal_service.h
index 207a1b7fd3..5bad866dfe 100644
--- a/ydb/core/yq/libs/private_client/internal_service.h
+++ b/ydb/core/yq/libs/private_client/internal_service.h
@@ -1,5 +1,7 @@
#pragma once
+#include "private_client.h"
+
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/event_local.h>
@@ -45,14 +47,17 @@ struct TEvInternalService {
template <class TProtoResult, ui32 TEventType>
struct TInternalServiceResponseEvent : public NActors::TEventLocal<TInternalServiceResponseEvent<TProtoResult, TEventType>, TEventType> {
- bool Success = false;
- NYdb::EStatus Status;
- const NYql::TIssues Issues;
+ NYdb::TStatus Status;
TProtoResult Result;
- bool TransportError = false;
- explicit TInternalServiceResponseEvent(bool success, NYdb::EStatus status, const NYql::TIssues& issues, const TProtoResult& result, bool transportError = false)
- : Success(success), Status(status), Issues(issues), Result(result), TransportError(transportError)
- { }
+ TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult)
+ {
+ if (wrappedResult.IsResultSet()) {
+ Result = wrappedResult.GetResult();
+ }
+ }
+ TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)})
+ {
+ }
};
using TEvHealthCheckResponse = TInternalServiceResponseEvent<Yq::Private::NodesHealthCheckResult, EvHealthCheckResponse>;