diff options
| author | Aleksandr Khoroshilov <[email protected]> | 2022-05-20 01:58:35 +0300 |
|---|---|---|
| committer | Aleksandr Khoroshilov <[email protected]> | 2022-05-20 01:58:35 +0300 |
| commit | f2a4a71e6cfbd54a9659a5ed811ed932f3cdf7ec (patch) | |
| tree | 9abdd7cede43441492999bdb9def912fdc909343 | |
| parent | 38be10651adf0fd663d500ec242aecc0e30c2201 (diff) | |
WriteResult CP message is proto based + LoopbackService support
ref:01171b43217e479f06b377baa05ca0a521510b87
5 files changed, 64 insertions, 56 deletions
diff --git a/ydb/core/yq/libs/actors/task_result_write.cpp b/ydb/core/yq/libs/actors/task_result_write.cpp index 5919ddbaaba..dafa7f70d8f 100644 --- a/ydb/core/yq/libs/actors/task_result_write.cpp +++ b/ydb/core/yq/libs/actors/task_result_write.cpp @@ -73,19 +73,13 @@ public: void Bootstrap(const TActorContext&) { Become(&TWriteTaskRequestActor::StateFunc); - const auto& req = Ev->Record; - - Deadline = NProtoInterop::CastFromProto(req.deadline()); - - const auto& resultSet = req.result_set(); - ResultId = req.result_id().value(); - const auto& resultSetId = req.result_set_id(); - const auto& startRowIndex = req.offset(); - RequestId = req.request_id(); - LOG_D("Request CP::WriteTaskResult with size: " << req.ByteSize() << " bytes"); - RequestedMBytes->Collect(req.ByteSize() / 1024 / 1024); + auto request = Ev->Record; + ResultId = request.result_id().value(); + RequestId = request.request_id(); + LOG_D("Request CP::WriteTaskResult with size: " << request.ByteSize() << " bytes"); + RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024); Send(NYq::ControlPlaneStorageServiceActorId(), - new NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest(ResultId, resultSetId, startRowIndex, Deadline, resultSet), 0, RequestId); + new NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest(std::move(request)), 0, RequestId); } private: @@ -122,7 +116,6 @@ private: TString ResultId; ui64 RequestId = 0; - TInstant Deadline; NYql::TIssues Issues; diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index bee465482c4..4566e1239cf 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -342,54 +342,43 @@ struct TEvControlPlaneStorage { // internal messages struct TEvWriteResultDataRequest : NActors::TEventLocal<TEvWriteResultDataRequest, EvWriteResultDataRequest> { - explicit TEvWriteResultDataRequest(const TString& resultId, - const int32_t resultSetId, - const int64_t startRowId, - const TInstant& deadline, - const Ydb::ResultSet& resultSet) - : ResultId(resultId) - , ResultSetId(resultSetId) - , StartRowId(startRowId) - , Deadline(deadline) - , ResultSet(resultSet) - { - } + + TEvWriteResultDataRequest() = default; + + explicit TEvWriteResultDataRequest( + Yq::Private::WriteTaskResultRequest&& request) + : Request(std::move(request)) + {} size_t GetByteSize() const { return sizeof(*this) - + ResultId.Size() - + ResultSet.ByteSizeLong(); + + Request.ByteSizeLong(); } - TString ResultId; - int32_t ResultSetId = 0; - int64_t StartRowId = 0; - TInstant Deadline; - Ydb::ResultSet ResultSet; + Yq::Private::WriteTaskResultRequest Request; }; struct TEvWriteResultDataResponse : NActors::TEventLocal<TEvWriteResultDataResponse, EvWriteResultDataResponse> { - explicit TEvWriteResultDataResponse(const NYql::TIssues& issues) - : Issues(issues) - { - } explicit TEvWriteResultDataResponse( - const NYql::TIssues& issues, - const ui64 requestId) + const Yq::Private::WriteTaskResultResult& record) + : Record(record) + {} + + explicit TEvWriteResultDataResponse( + const NYql::TIssues& issues) : Issues(issues) - , RequestId(requestId) - { - } + {} size_t GetByteSize() const { return sizeof(*this) + + Record.ByteSizeLong() + GetIssuesByteSize(Issues) + GetDebugInfoByteSize(DebugInfo); } + Yq::Private::WriteTaskResultResult Record; NYql::TIssues Issues; - const ui64 RequestId = 0; TDebugInfoPtr DebugInfo; }; @@ -602,6 +591,9 @@ struct TEvControlPlaneStorage { }; struct TEvNodesHealthCheckRequest : NActors::TEventLocal<TEvNodesHealthCheckRequest, EvNodesHealthCheckRequest> { + + TEvNodesHealthCheckRequest() = default; + explicit TEvNodesHealthCheckRequest( Yq::Private::NodesHealthCheckRequest&& request) : Request(std::move(request)) diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp index b9313524caa..4b0c7535c35 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp @@ -8,13 +8,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTC_WRITE_RESULT_DATA); requestCounters->InFly->Inc(); - TEvControlPlaneStorage::TEvWriteResultDataRequest& request = *ev->Get(); - requestCounters->RequestBytes->Add(request.GetByteSize()); - const TString resultId = request.ResultId; - const int32_t resultSetId = request.ResultSetId; - const int64_t startRowId = request.StartRowId; - const TInstant deadline = request.Deadline; - const Ydb::ResultSet& resultSet = request.ResultSet; + requestCounters->RequestBytes->Add(ev->Get()->GetByteSize()); + auto& request = ev->Get()->Request; + const TString resultId = request.result_id().value(); + const int32_t resultSetId = request.result_set_id(); + const int64_t startRowId = request.offset(); + const TInstant deadline = NProtoInterop::CastFromProto(request.deadline()); + const Ydb::ResultSet& resultSet = request.result_set(); const int byteSize = resultSet.ByteSize(); CPS_LOG_T("WriteResultDataRequest: " << resultId << " " << resultSetId << " " << startRowId << " " << resultSet.ByteSize() << " " << deadline); @@ -28,6 +28,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult return; } + std::shared_ptr<Yq::Private::WriteTaskResultResult> response = std::make_shared<Yq::Private::WriteTaskResultResult>(); + response->set_request_id(request.request_id()); + NYdb::TValueBuilder itemsAsList; itemsAsList.BeginList(); @@ -63,8 +66,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult const auto query = queryBuilder.Build(); auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = []() { return std::make_tuple<NYql::TIssues>(NYql::TIssues{}); }; - auto success = SendResponseTuple<TEvControlPlaneStorage::TEvWriteResultDataResponse, std::tuple<NYql::TIssues>>( + auto prepare = [response] { return *response; }; + auto success = SendResponse<TEvControlPlaneStorage::TEvWriteResultDataResponse, Yq::Private::WriteTaskResultResult>( "WriteResultDataRequest", NActors::TActivationContext::ActorSystem(), result, diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index 51985c9795e..541677186e3 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -7,6 +7,8 @@ #include <ydb/core/yq/libs/control_plane_storage/events/events.h> +#include <library/cpp/protobuf/interop/cast.h> + namespace NYq { // Queries @@ -956,7 +958,13 @@ public: std::unique_ptr<TEvControlPlaneStorage::TEvWriteResultDataRequest> Build() { - return std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>(ResultId, ResultSetId, StartRowId, Deadline, ResultSet); + auto request = std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>(); + request->Request.mutable_result_id()->set_value(ResultId); + *request->Request.mutable_result_set() = ResultSet; + request->Request.set_result_set_id(ResultSetId); + request->Request.set_offset(StartRowId); + *request->Request.mutable_deadline() = NProtoInterop::CastToProto(Deadline); + return request; } }; diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp index 0187ff44ded..35849129854 100644 --- a/ydb/core/yq/libs/private_client/loopback_service.cpp +++ b/ydb/core/yq/libs/private_client/loopback_service.cpp @@ -43,6 +43,7 @@ private: hFunc(TEvInternalService::TEvWriteResultRequest, Handle) hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle) + hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle) ); void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) { @@ -83,13 +84,24 @@ private: */ } - void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& /*ev*/) { - /* + void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& ev) { Cookie++; Senders[Cookie] = ev->Sender; auto request = ev->Get()->Request; - Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvWriteResultRequest(std::move(request)), 0, Cookie); - */ + Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest(std::move(request)), 0, Cookie); + } + + void Handle(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse::TPtr& ev) { + auto it = Senders.find(ev->Cookie); + if (it != Senders.end()) { + if (ev->Get()->Issues.Size() == 0) { + Send(it->second, new TEvInternalService::TEvWriteResultResponse(ev->Get()->Record)); + } else { + auto issues = ev->Get()->Issues; + Send(it->second, new TEvInternalService::TEvWriteResultResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues))); + } + Senders.erase(it); + } } const NMonitoring::TDynamicCounterPtr ServiceCounters; |
