summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <[email protected]>2022-05-20 01:58:35 +0300
committerAleksandr Khoroshilov <[email protected]>2022-05-20 01:58:35 +0300
commitf2a4a71e6cfbd54a9659a5ed811ed932f3cdf7ec (patch)
tree9abdd7cede43441492999bdb9def912fdc909343
parent38be10651adf0fd663d500ec242aecc0e30c2201 (diff)
WriteResult CP message is proto based + LoopbackService support
ref:01171b43217e479f06b377baa05ca0a521510b87
-rw-r--r--ydb/core/yq/libs/actors/task_result_write.cpp19
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/events.h50
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp21
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders.h10
-rw-r--r--ydb/core/yq/libs/private_client/loopback_service.cpp20
5 files changed, 64 insertions, 56 deletions
diff --git a/ydb/core/yq/libs/actors/task_result_write.cpp b/ydb/core/yq/libs/actors/task_result_write.cpp
index 5919ddbaaba..dafa7f70d8f 100644
--- a/ydb/core/yq/libs/actors/task_result_write.cpp
+++ b/ydb/core/yq/libs/actors/task_result_write.cpp
@@ -73,19 +73,13 @@ public:
void Bootstrap(const TActorContext&) {
Become(&TWriteTaskRequestActor::StateFunc);
- const auto& req = Ev->Record;
-
- Deadline = NProtoInterop::CastFromProto(req.deadline());
-
- const auto& resultSet = req.result_set();
- ResultId = req.result_id().value();
- const auto& resultSetId = req.result_set_id();
- const auto& startRowIndex = req.offset();
- RequestId = req.request_id();
- LOG_D("Request CP::WriteTaskResult with size: " << req.ByteSize() << " bytes");
- RequestedMBytes->Collect(req.ByteSize() / 1024 / 1024);
+ auto request = Ev->Record;
+ ResultId = request.result_id().value();
+ RequestId = request.request_id();
+ LOG_D("Request CP::WriteTaskResult with size: " << request.ByteSize() << " bytes");
+ RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024);
Send(NYq::ControlPlaneStorageServiceActorId(),
- new NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest(ResultId, resultSetId, startRowIndex, Deadline, resultSet), 0, RequestId);
+ new NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest(std::move(request)), 0, RequestId);
}
private:
@@ -122,7 +116,6 @@ private:
TString ResultId;
ui64 RequestId = 0;
- TInstant Deadline;
NYql::TIssues Issues;
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index bee465482c4..4566e1239cf 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -342,54 +342,43 @@ struct TEvControlPlaneStorage {
// internal messages
struct TEvWriteResultDataRequest : NActors::TEventLocal<TEvWriteResultDataRequest, EvWriteResultDataRequest> {
- explicit TEvWriteResultDataRequest(const TString& resultId,
- const int32_t resultSetId,
- const int64_t startRowId,
- const TInstant& deadline,
- const Ydb::ResultSet& resultSet)
- : ResultId(resultId)
- , ResultSetId(resultSetId)
- , StartRowId(startRowId)
- , Deadline(deadline)
- , ResultSet(resultSet)
- {
- }
+
+ TEvWriteResultDataRequest() = default;
+
+ explicit TEvWriteResultDataRequest(
+ Yq::Private::WriteTaskResultRequest&& request)
+ : Request(std::move(request))
+ {}
size_t GetByteSize() const {
return sizeof(*this)
- + ResultId.Size()
- + ResultSet.ByteSizeLong();
+ + Request.ByteSizeLong();
}
- TString ResultId;
- int32_t ResultSetId = 0;
- int64_t StartRowId = 0;
- TInstant Deadline;
- Ydb::ResultSet ResultSet;
+ Yq::Private::WriteTaskResultRequest Request;
};
struct TEvWriteResultDataResponse : NActors::TEventLocal<TEvWriteResultDataResponse, EvWriteResultDataResponse> {
- explicit TEvWriteResultDataResponse(const NYql::TIssues& issues)
- : Issues(issues)
- {
- }
explicit TEvWriteResultDataResponse(
- const NYql::TIssues& issues,
- const ui64 requestId)
+ const Yq::Private::WriteTaskResultResult& record)
+ : Record(record)
+ {}
+
+ explicit TEvWriteResultDataResponse(
+ const NYql::TIssues& issues)
: Issues(issues)
- , RequestId(requestId)
- {
- }
+ {}
size_t GetByteSize() const {
return sizeof(*this)
+ + Record.ByteSizeLong()
+ GetIssuesByteSize(Issues)
+ GetDebugInfoByteSize(DebugInfo);
}
+ Yq::Private::WriteTaskResultResult Record;
NYql::TIssues Issues;
- const ui64 RequestId = 0;
TDebugInfoPtr DebugInfo;
};
@@ -602,6 +591,9 @@ struct TEvControlPlaneStorage {
};
struct TEvNodesHealthCheckRequest : NActors::TEventLocal<TEvNodesHealthCheckRequest, EvNodesHealthCheckRequest> {
+
+ TEvNodesHealthCheckRequest() = default;
+
explicit TEvNodesHealthCheckRequest(
Yq::Private::NodesHealthCheckRequest&& request)
: Request(std::move(request))
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp
index b9313524caa..4b0c7535c35 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp
@@ -8,13 +8,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult
TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTC_WRITE_RESULT_DATA);
requestCounters->InFly->Inc();
- TEvControlPlaneStorage::TEvWriteResultDataRequest& request = *ev->Get();
- requestCounters->RequestBytes->Add(request.GetByteSize());
- const TString resultId = request.ResultId;
- const int32_t resultSetId = request.ResultSetId;
- const int64_t startRowId = request.StartRowId;
- const TInstant deadline = request.Deadline;
- const Ydb::ResultSet& resultSet = request.ResultSet;
+ requestCounters->RequestBytes->Add(ev->Get()->GetByteSize());
+ auto& request = ev->Get()->Request;
+ const TString resultId = request.result_id().value();
+ const int32_t resultSetId = request.result_set_id();
+ const int64_t startRowId = request.offset();
+ const TInstant deadline = NProtoInterop::CastFromProto(request.deadline());
+ const Ydb::ResultSet& resultSet = request.result_set();
const int byteSize = resultSet.ByteSize();
CPS_LOG_T("WriteResultDataRequest: " << resultId << " " << resultSetId << " " << startRowId << " " << resultSet.ByteSize() << " " << deadline);
@@ -28,6 +28,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult
return;
}
+ std::shared_ptr<Yq::Private::WriteTaskResultResult> response = std::make_shared<Yq::Private::WriteTaskResultResult>();
+ response->set_request_id(request.request_id());
+
NYdb::TValueBuilder itemsAsList;
itemsAsList.BeginList();
@@ -63,8 +66,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult
const auto query = queryBuilder.Build();
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo);
- auto prepare = []() { return std::make_tuple<NYql::TIssues>(NYql::TIssues{}); };
- auto success = SendResponseTuple<TEvControlPlaneStorage::TEvWriteResultDataResponse, std::tuple<NYql::TIssues>>(
+ auto prepare = [response] { return *response; };
+ auto success = SendResponse<TEvControlPlaneStorage::TEvWriteResultDataResponse, Yq::Private::WriteTaskResultResult>(
"WriteResultDataRequest",
NActors::TActivationContext::ActorSystem(),
result,
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h
index 51985c9795e..541677186e3 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h
@@ -7,6 +7,8 @@
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+#include <library/cpp/protobuf/interop/cast.h>
+
namespace NYq {
// Queries
@@ -956,7 +958,13 @@ public:
std::unique_ptr<TEvControlPlaneStorage::TEvWriteResultDataRequest> Build()
{
- return std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>(ResultId, ResultSetId, StartRowId, Deadline, ResultSet);
+ auto request = std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>();
+ request->Request.mutable_result_id()->set_value(ResultId);
+ *request->Request.mutable_result_set() = ResultSet;
+ request->Request.set_result_set_id(ResultSetId);
+ request->Request.set_offset(StartRowId);
+ *request->Request.mutable_deadline() = NProtoInterop::CastToProto(Deadline);
+ return request;
}
};
diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp
index 0187ff44ded..35849129854 100644
--- a/ydb/core/yq/libs/private_client/loopback_service.cpp
+++ b/ydb/core/yq/libs/private_client/loopback_service.cpp
@@ -43,6 +43,7 @@ private:
hFunc(TEvInternalService::TEvWriteResultRequest, Handle)
hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle)
+ hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle)
);
void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) {
@@ -83,13 +84,24 @@ private:
*/
}
- void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& /*ev*/) {
- /*
+ void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& ev) {
Cookie++;
Senders[Cookie] = ev->Sender;
auto request = ev->Get()->Request;
- Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvWriteResultRequest(std::move(request)), 0, Cookie);
- */
+ Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest(std::move(request)), 0, Cookie);
+ }
+
+ void Handle(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse::TPtr& ev) {
+ auto it = Senders.find(ev->Cookie);
+ if (it != Senders.end()) {
+ if (ev->Get()->Issues.Size() == 0) {
+ Send(it->second, new TEvInternalService::TEvWriteResultResponse(ev->Get()->Record));
+ } else {
+ auto issues = ev->Get()->Issues;
+ Send(it->second, new TEvInternalService::TEvWriteResultResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues)));
+ }
+ Senders.erase(it);
+ }
}
const NMonitoring::TDynamicCounterPtr ServiceCounters;