aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-02-19 03:50:37 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-02-19 04:05:35 +0300
commita71ea8c4136c53816147a1478f0db6c28f6d7c89 (patch)
tree8b16fe508d3b903935393014d56881f64f8da419
parentafe5f174d1537a99ec81b46bd61861bc7f1a29ee (diff)
downloadydb-a71ea8c4136c53816147a1478f0db6c28f6d7c89.tar.gz
Intermediate changes
commit_hash:e2f7171fc4070059f585443adfecfb9c7ca93e17
-rw-r--r--yql/essentials/utils/runnable.h16
-rw-r--r--yql/essentials/utils/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/client/ya.make18
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp83
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h12
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make15
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp133
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h33
-rw-r--r--yt/yql/providers/yt/fmr/proto/coordinator.proto8
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp214
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h249
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ya.make3
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h10
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/ya.make15
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h19
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp60
-rw-r--r--yt/yql/tools/ytrun/lib/ya.make1
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp51
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.h2
24 files changed, 649 insertions, 301 deletions
diff --git a/yql/essentials/utils/runnable.h b/yql/essentials/utils/runnable.h
new file mode 100644
index 0000000000..8179aa7496
--- /dev/null
+++ b/yql/essentials/utils/runnable.h
@@ -0,0 +1,16 @@
+#include <util/generic/ptr.h>
+
+namespace NYql {
+
+class IRunnable: public TThrRefBase {
+public:
+ using TPtr = THolder<IRunnable>;
+
+ virtual ~IRunnable() = default;
+
+ virtual void Start() = 0;
+
+ virtual void Stop() = 0;
+};
+
+} // namespace NYql
diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make
index 12634d6d64..25fdf1775d 100644
--- a/yql/essentials/utils/ya.make
+++ b/yql/essentials/utils/ya.make
@@ -29,6 +29,7 @@ SRCS(
resetable_setting.h
retry.cpp
retry.h
+ runnable.h
sort.cpp
sort.h
swap_bytes.cpp
diff --git a/yt/yql/providers/yt/fmr/coordinator/client/ya.make b/yt/yql/providers/yt/fmr/coordinator/client/ya.make
new file mode 100644
index 0000000000..589e7a8522
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/client/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_coordinator_client.cpp
+)
+
+PEERDIR(
+ library/cpp/http/simple
+ library/cpp/threading/future
+ yt/yql/providers/yt/fmr/coordinator/interface
+ yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers
+ yt/yql/providers/yt/fmr/proto
+ yql/essentials/utils
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp
new file mode 100644
index 0000000000..e128aece48
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp
@@ -0,0 +1,83 @@
+#include <library/cpp/http/simple/http_client.h>
+
+#include <yt/yql/providers/yt/fmr/proto/coordinator.pb.h>
+#include <yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h>
+
+#include <yql/essentials/utils/yql_panic.h>
+
+#include "yql_yt_coordinator_client.h"
+
+namespace NYql::NFmr {
+
+namespace {
+
+class TFmrCoordinatorClient: public IFmrCoordinator {
+public:
+ TFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings): Host_(settings.Host), Port_(settings.Port)
+ {
+ Headers_["Content-Type"] = "application/x-protobuf";
+ }
+
+ NThreading::TFuture<TStartOperationResponse> StartOperation(const TStartOperationRequest& startOperationRequest) override {
+ NProto::TStartOperationRequest protoStartOperationRequest = StartOperationRequestToProto(startOperationRequest);
+ TString startOperationRequestUrl = "/operation";
+ auto httpClient = TKeepAliveHttpClient(Host_, Port_);
+ TStringStream outputStream;
+
+ httpClient.DoPost(startOperationRequestUrl, protoStartOperationRequest.SerializeAsString(), &outputStream, Headers_);
+ TString serializedResponse = outputStream.ReadAll();
+ NProto::TStartOperationResponse protoStartOperationResponse;
+ YQL_ENSURE(protoStartOperationResponse.ParseFromString(serializedResponse));
+ return NThreading::MakeFuture(StartOperationResponseFromProto(protoStartOperationResponse));
+ }
+
+ NThreading::TFuture<TGetOperationResponse> GetOperation(const TGetOperationRequest& getOperationRequest) override {
+ TString getOperationRequestUrl = "/operation/" + getOperationRequest.OperationId;
+ auto httpClient = TKeepAliveHttpClient(Host_, Port_);
+ TStringStream outputStream;
+
+ httpClient.DoGet(getOperationRequestUrl, &outputStream, Headers_);
+ TString serializedResponse = outputStream.ReadAll();
+ NProto::TGetOperationResponse protoGetOperationResponse;
+ YQL_ENSURE(protoGetOperationResponse.ParseFromString(serializedResponse));
+ return NThreading::MakeFuture(GetOperationResponseFromProto(protoGetOperationResponse));
+ }
+
+ NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& deleteOperationRequest) override {
+ TString deleteOperationRequestUrl = "/operation/" + deleteOperationRequest.OperationId;
+ auto httpClient = TKeepAliveHttpClient(Host_, Port_);
+ TStringStream outputStream;
+
+ httpClient.DoRequest("DELETE", deleteOperationRequestUrl, "", &outputStream, Headers_);
+ TString serializedResponse = outputStream.ReadAll();
+ NProto::TDeleteOperationResponse protoDeleteOperationResponse;
+ YQL_ENSURE(protoDeleteOperationResponse.ParseFromString(serializedResponse));
+ return NThreading::MakeFuture(DeleteOperationResponseFromProto(protoDeleteOperationResponse));
+ }
+
+ NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& heartbeatRequest) override {
+ NProto::THeartbeatRequest protoSendHeartbeatRequest = HeartbeatRequestToProto(heartbeatRequest);
+ TString sendHearbeatRequestUrl = "/worker_heartbeat";
+ auto httpClient = TKeepAliveHttpClient(Host_, Port_);
+ TStringStream outputStream;
+
+ httpClient.DoPost(sendHearbeatRequestUrl, protoSendHeartbeatRequest.SerializeAsString(), &outputStream, Headers_);
+ TString serializedResponse = outputStream.ReadAll();
+ NProto::THeartbeatResponse protoHeartbeatResponse;
+ YQL_ENSURE(protoHeartbeatResponse.ParseFromString(serializedResponse));
+ return NThreading::MakeFuture(HeartbeatResponseFromProto(protoHeartbeatResponse));
+ }
+
+private:
+ TString Host_;
+ ui16 Port_;
+ TSimpleHttpClient::THeaders Headers_;
+};
+
+} // namespace
+
+IFmrCoordinator::TPtr MakeFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings) {
+ return MakeIntrusive<TFmrCoordinatorClient>(settings);
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h
new file mode 100644
index 0000000000..8c9b039cf7
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h
@@ -0,0 +1,12 @@
+#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
+
+namespace NYql::NFmr {
+
+struct TFmrCoordinatorClientSettings {
+ ui16 Port;
+ TString Host = "localhost";
+};
+
+IFmrCoordinator::TPtr MakeFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings);
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
index a661e2cb18..2f67f6d05e 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
@@ -5,7 +5,6 @@ SRCS(
)
PEERDIR(
- library/cpp/yt/assert
yt/yql/providers/yt/fmr/coordinator/impl
yt/yql/providers/yt/fmr/job_factory/impl
yt/yql/providers/yt/fmr/worker/impl
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
index 5a8f6eee55..d8a526096a 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <library/cpp/random_provider/random_provider.h>
#include <util/system/mutex.h>
#include <util/system/guard.h>
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make
new file mode 100644
index 0000000000..62df195306
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_coordinator_proto_helpers.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/coordinator/interface
+ yt/yql/providers/yt/fmr/proto
+ yt/yql/providers/yt/fmr/request_options/proto_helpers
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
new file mode 100644
index 0000000000..85f7d5834a
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
@@ -0,0 +1,133 @@
+#include "yql_yt_coordinator_proto_helpers.h"
+
+namespace NYql::NFmr {
+
+NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heartbeatRequest) {
+ NProto::THeartbeatRequest protoHeartbeatRequest;
+ protoHeartbeatRequest.SetWorkerId(heartbeatRequest.WorkerId);
+ protoHeartbeatRequest.SetVolatileId(heartbeatRequest.VolatileId);
+ std::vector<NProto::TTaskState> taskStates;
+ for (size_t i = 0; i < heartbeatRequest.TaskStates.size(); ++i) {
+ auto protoTaskState = TaskStateToProto(*heartbeatRequest.TaskStates[i]);
+ protoHeartbeatRequest.AddTaskStates();
+ protoHeartbeatRequest.MutableTaskStates(i)->Swap(&protoTaskState);
+ }
+ return protoHeartbeatRequest;
+}
+
+THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest protoHeartbeatRequest) {
+ THeartbeatRequest heartbeatRequest;
+ heartbeatRequest.WorkerId = protoHeartbeatRequest.GetWorkerId();
+ heartbeatRequest.VolatileId = protoHeartbeatRequest.GetVolatileId();
+ std::vector<TTaskState::TPtr> taskStates;
+ for (size_t i = 0; i < protoHeartbeatRequest.TaskStatesSize(); ++i) {
+ TTaskState curTaskState = TaskStateFromProto(protoHeartbeatRequest.GetTaskStates(i));
+ taskStates.emplace_back(TIntrusivePtr<TTaskState>(new TTaskState(curTaskState)));
+ }
+ heartbeatRequest.TaskStates = taskStates;
+ return heartbeatRequest;
+}
+
+NProto::THeartbeatResponse HeartbeatResponseToProto(const THeartbeatResponse& heartbeatResponse) {
+ NProto::THeartbeatResponse protoHeartbeatResponse;
+ for (size_t i = 0; i < heartbeatResponse.TasksToRun.size(); ++i) {
+ auto protoTask = TaskToProto(*heartbeatResponse.TasksToRun[i]);
+ auto * taskToRun = protoHeartbeatResponse.AddTasksToRun();
+ taskToRun->Swap(&protoTask);
+ }
+ for (auto& id: heartbeatResponse.TaskToDeleteIds) {
+ protoHeartbeatResponse.AddTaskToDeleteIds(id);
+ }
+ return protoHeartbeatResponse;
+}
+
+THeartbeatResponse HeartbeatResponseFromProto(const NProto::THeartbeatResponse& protoHeartbeatResponse) {
+ THeartbeatResponse heartbeatResponse;
+ std::vector<TTask::TPtr> tasksToRun;
+ std::unordered_set<TString> taskToDeleteIds;
+ for (size_t i = 0; i < protoHeartbeatResponse.TasksToRunSize(); ++i) {
+ TTask curTask = TaskFromProto(protoHeartbeatResponse.GetTasksToRun(i));
+ tasksToRun.emplace_back(TIntrusivePtr<TTask>(new TTask(curTask)));
+ }
+ for (size_t i = 0; i < protoHeartbeatResponse.TaskToDeleteIdsSize(); ++i) {
+ taskToDeleteIds.emplace(protoHeartbeatResponse.GetTaskToDeleteIds(i));
+ }
+
+ heartbeatResponse.TasksToRun = tasksToRun;
+ heartbeatResponse.TaskToDeleteIds = taskToDeleteIds;
+ return heartbeatResponse;
+}
+
+NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperationRequest& startOperationRequest) {
+ NProto::TStartOperationRequest protoStartOperationRequest;
+ protoStartOperationRequest.SetTaskType(static_cast<NProto::ETaskType>(startOperationRequest.TaskType));
+ auto protoTaskParams = TaskParamsToProto(startOperationRequest.TaskParams);
+ protoStartOperationRequest.MutableTaskParams()->Swap(&protoTaskParams);
+ protoStartOperationRequest.SetSessionId(startOperationRequest.SessionId);
+ if (startOperationRequest.IdempotencyKey) {
+ protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey);
+ }
+ protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries);
+ return protoStartOperationRequest;
+}
+
+TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperationRequest& protoStartOperationRequest) {
+ TStartOperationRequest startOperationRequest;
+ startOperationRequest.TaskType = static_cast<ETaskType>(protoStartOperationRequest.GetTaskType());
+ startOperationRequest.TaskParams = TaskParamsFromProto(protoStartOperationRequest.GetTaskParams());
+ startOperationRequest.SessionId = protoStartOperationRequest.GetSessionId();
+ if (protoStartOperationRequest.HasIdempotencyKey()) {
+ startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey();
+ }
+ startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries();
+ return startOperationRequest;
+}
+
+NProto::TStartOperationResponse StartOperationResponseToProto(const TStartOperationResponse& startOperationResponse) {
+ NProto::TStartOperationResponse protoStartOperationResponse;
+ protoStartOperationResponse.SetOperationId(startOperationResponse.OperationId);
+ protoStartOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(startOperationResponse.Status));
+ return protoStartOperationResponse;
+}
+
+TStartOperationResponse StartOperationResponseFromProto(const NProto::TStartOperationResponse& protoStartOperationResponse) {
+ return TStartOperationResponse{
+ .Status = static_cast<EOperationStatus>(protoStartOperationResponse.GetStatus()),
+ .OperationId = protoStartOperationResponse.GetOperationId()
+ };
+}
+
+NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationResponse& getOperationResponse) {
+ NProto::TGetOperationResponse protoGetOperationResponse;
+ protoGetOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(getOperationResponse.Status));
+ for (auto& errorMessage: getOperationResponse.ErrorMessages) {
+ auto* curError = protoGetOperationResponse.AddErrorMessages();
+ auto protoError = FmrErrorToProto(errorMessage);
+ curError->Swap(&protoError);
+ }
+ return protoGetOperationResponse;
+}
+
+TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationResponse protoGetOperationReponse) {
+ TGetOperationResponse getOperationResponse;
+ getOperationResponse.Status = static_cast<EOperationStatus>(protoGetOperationReponse.GetStatus());
+ std::vector<TFmrError> errorMessages;
+ for (size_t i = 0; i < protoGetOperationReponse.ErrorMessagesSize(); ++i) {
+ TFmrError errorMessage = FmrErrorFromProto(protoGetOperationReponse.GetErrorMessages(i));
+ errorMessages.emplace_back(errorMessage);
+ }
+ getOperationResponse.ErrorMessages = errorMessages;
+ return getOperationResponse;
+}
+
+NProto::TDeleteOperationResponse DeleteOperationResponseToProto(const TDeleteOperationResponse& deleteOperationResponse) {
+ NProto::TDeleteOperationResponse protoDeleteOperationResponse;
+ protoDeleteOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(deleteOperationResponse.Status));
+ return protoDeleteOperationResponse;
+}
+
+TDeleteOperationResponse DeleteOperationResponseFromProto(const NProto::TDeleteOperationResponse& protoDeleteOperationResponse) {
+ return TDeleteOperationResponse{.Status = static_cast<EOperationStatus>(protoDeleteOperationResponse.GetStatus())};
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h
new file mode 100644
index 0000000000..d09034b078
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
+#include <yt/yql/providers/yt/fmr/proto/coordinator.pb.h>
+#include <yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h>
+
+namespace NYql::NFmr {
+
+NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heartbeatRequest);
+
+THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest protoHeartbeatRequest);
+
+NProto::THeartbeatResponse HeartbeatResponseToProto(const THeartbeatResponse& heartbeatResponse);
+
+THeartbeatResponse HeartbeatResponseFromProto(const NProto::THeartbeatResponse& protoHeartbeatResponse);
+
+NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperationRequest& startOperationRequest);
+
+TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperationRequest& protoStartOperationRequest);
+
+NProto::TStartOperationResponse StartOperationResponseToProto(const TStartOperationResponse& startOperationResponse);
+
+TStartOperationResponse StartOperationResponseFromProto(const NProto::TStartOperationResponse& protoStartOperationResponse);
+
+NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationResponse& getOperationResponse);
+
+TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationResponse protoGetOperationReponse);
+
+NProto::TDeleteOperationResponse DeleteOperationResponseToProto(const TDeleteOperationResponse& deleteOperationResponse);
+
+TDeleteOperationResponse DeleteOperationResponseFromProto(const NProto::TDeleteOperationResponse& protoDeleteOperationResponse);
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto
index 930b4b3bd0..33990caa28 100644
--- a/yt/yql/providers/yt/fmr/proto/coordinator.proto
+++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto
@@ -29,19 +29,11 @@ message TStartOperationResponse {
string OperationId = 2;
}
-message TGetOperationRequest {
- string OperationId = 1;
-}
-
message TGetOperationResponse {
EOperationStatus Status = 1;
repeated TFmrError ErrorMessages = 2;
}
-message TDeleteOperationRequest {
- string OperationId = 1;
-}
-
message TDeleteOperationResponse {
EOperationStatus Status = 1;
}
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
index 48dcad24c7..21674b5fc4 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -1 +1,215 @@
#include "yql_yt_request_proto_helpers.h"
+
+namespace NYql::NFmr {
+
+NProto::TFmrError FmrErrorToProto(const TFmrError& error) {
+ NProto::TFmrError protoError;
+ protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component));
+ protoError.SetErrorMessage(error.ErrorMessage);
+ if (error.WorkerId) {
+ protoError.SetWorkerId(*error.WorkerId);
+ }
+ if (error.TaskId) {
+ protoError.SetTaskId(*error.TaskId);
+ }
+ if (error.OperationId) {
+ protoError.SetOperationId(*error.OperationId);
+ }
+ return protoError;
+}
+
+TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) {
+ TFmrError fmrError;
+ fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent());
+ fmrError.ErrorMessage = protoError.GetErrorMessage();
+ if (protoError.HasWorkerId()) {
+ fmrError.WorkerId = protoError.GetWorkerId();
+ }
+ if (protoError.HasTaskId()) {
+ fmrError.TaskId = protoError.GetTaskId();
+ }
+ if (protoError.HasOperationId()) {
+ fmrError.OperationId = protoError.GetOperationId();
+ }
+ return fmrError;
+}
+
+NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) {
+ NProto::TYtTableRef protoYtTableRef;
+ protoYtTableRef.SetPath(ytTableRef.Path);
+ protoYtTableRef.SetCluster(ytTableRef.Cluster);
+ protoYtTableRef.SetTransactionId(ytTableRef.TransactionId);
+ return protoYtTableRef;
+}
+
+TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) {
+ TYtTableRef ytTableRef;
+ ytTableRef.Path = protoYtTableRef.GetPath();
+ ytTableRef.Cluster = protoYtTableRef.GetCluster();
+ ytTableRef.TransactionId = protoYtTableRef.GetTransactionId();
+ return ytTableRef;
+}
+
+NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) {
+ NProto::TFmrTableRef protoFmrTableRef;
+ protoFmrTableRef.SetTableId(fmrTableRef.TableId);
+ return protoFmrTableRef;
+}
+
+TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) {
+ TFmrTableRef fmrTableRef;
+ fmrTableRef.TableId = protoFmrTableRef.GetTableId();
+ return fmrTableRef;
+}
+
+NProto::TTableRef TableRefToProto(const TTableRef& tableRef) {
+ NProto::TTableRef protoTableRef;
+ if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&tableRef.TableRef)) {
+ NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr);
+ protoTableRef.MutableYtTableRef()->Swap(&protoYtTableRef);
+ } else {
+ auto* fmrTableRefPtr = std::get_if<TFmrTableRef>(&tableRef.TableRef);
+ NProto::TFmrTableRef protoFmrTableRef = FmrTableRefToProto(*fmrTableRefPtr);
+ protoTableRef.MutableFmrTableRef()->Swap(&protoFmrTableRef);
+ }
+ return protoTableRef;
+}
+
+TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef) {
+ std::variant<TYtTableRef, TFmrTableRef> tableRef;
+ if (protoTableRef.HasYtTableRef()) {
+ tableRef = YtTableRefFromProto(protoTableRef.GetYtTableRef());
+ } else {
+ tableRef = FmrTableRefFromProto(protoTableRef.GetFmrTableRef());
+ }
+ return {tableRef};
+}
+
+NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) {
+ NProto::TUploadTaskParams protoUploadTaskParams;
+ auto input = FmrTableRefToProto(uploadTaskParams.Input);
+ auto output = YtTableRefToProto(uploadTaskParams.Output);
+ protoUploadTaskParams.MutableInput()->Swap(&input);
+ protoUploadTaskParams.MutableOutput()->Swap(&output);
+ return protoUploadTaskParams;
+}
+
+TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) {
+ TUploadTaskParams uploadTaskParams;
+ uploadTaskParams.Input = FmrTableRefFromProto(protoUploadTaskParams.GetInput());
+ uploadTaskParams.Output = YtTableRefFromProto(protoUploadTaskParams.GetOutput());
+ return uploadTaskParams;
+}
+
+NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) {
+ NProto::TDownloadTaskParams protoDownloadTaskParams;
+ auto input = YtTableRefToProto(downloadTaskParams.Input);
+ auto output = FmrTableRefToProto(downloadTaskParams.Output);
+ protoDownloadTaskParams.MutableInput()->Swap(&input);
+ protoDownloadTaskParams.MutableOutput()->Swap(&output);
+ return protoDownloadTaskParams;
+}
+
+TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) {
+ TDownloadTaskParams downloadTaskParams;
+ downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput());
+ downloadTaskParams.Output = FmrTableRefFromProto(protoDownloadTaskParams.GetOutput());
+ return downloadTaskParams;
+}
+
+NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) {
+ NProto::TMergeTaskParams protoMergeTaskParams;
+ std::vector<NProto::TTableRef> inputTables;
+ for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) {
+ auto inputTable = TableRefToProto(mergeTaskParams.Input[i]);
+ auto* curInput = protoMergeTaskParams.AddInput();
+ curInput->Swap(&inputTable);
+ }
+ auto outputTable = FmrTableRefToProto(mergeTaskParams.Output);
+ protoMergeTaskParams.MutableOutput()->Swap(&outputTable);
+ return protoMergeTaskParams;
+}
+
+TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) {
+ TMergeTaskParams mergeTaskParams;
+ std::vector<TTableRef> input;
+ for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) {
+ TTableRef inputTable = TableRefFromProto(protoMergeTaskParams.GetInput(i));
+ input.emplace_back(inputTable);
+ }
+ mergeTaskParams.Input = input;
+ mergeTaskParams.Output = FmrTableRefFromProto(protoMergeTaskParams.GetOutput());
+ return mergeTaskParams;
+}
+
+NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) {
+ NProto::TTaskParams protoTaskParams;
+ if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) {
+ NProto::TUploadTaskParams protoUploadTaskParams = UploadTaskParamsToProto(*uploadTaskParamsPtr);
+ protoTaskParams.MutableUploadTaskParams()->Swap(&protoUploadTaskParams);
+ } else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) {
+ NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr);
+ protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams);
+ } else {
+ auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams);
+ NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr);
+ protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams);
+ }
+ return protoTaskParams;
+}
+
+TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
+ TTaskParams taskParams;
+ if (protoTaskParams.HasDownloadTaskParams()) {
+ taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams());
+ } else if (protoTaskParams.HasUploadTaskParams()) {
+ taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams());
+ } else {
+ taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams());
+ }
+ return taskParams;
+}
+
+NProto::TTask TaskToProto(const TTask& task) {
+ NProto::TTask protoTask;
+ protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType));
+ protoTask.SetTaskId(task.TaskId);
+ auto taskParams = TaskParamsToProto(task.TaskParams);
+ protoTask.MutableTaskParams()->Swap(&taskParams);
+ protoTask.SetSessionId(task.SessionId);
+ protoTask.SetNumRetries(task.NumRetries);
+ return protoTask;
+}
+
+TTask TaskFromProto(const NProto::TTask& protoTask) {
+ TTask task;
+ task.TaskType = static_cast<ETaskType>(protoTask.GetTaskType());
+ task.TaskId = protoTask.GetTaskId();
+ task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams());
+ task.SessionId = protoTask.GetSessionId();
+ task.NumRetries = protoTask.GetNumRetries();
+ return task;
+}
+
+NProto::TTaskState TaskStateToProto(const TTaskState& taskState) {
+ NProto::TTaskState protoTaskState;
+ protoTaskState.SetTaskStatus(static_cast<NProto::ETaskStatus>(taskState.TaskStatus));
+ protoTaskState.SetTaskId(taskState.TaskId);
+ if (taskState.TaskErrorMessage) {
+ auto fmrError = FmrErrorToProto(*taskState.TaskErrorMessage);
+ protoTaskState.MutableTaskErrorMessage()->Swap(&fmrError);
+ }
+ return protoTaskState;
+}
+
+TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState) {
+ TTaskState taskState;
+ taskState.TaskStatus = static_cast<ETaskStatus>(protoTaskState.GetTaskStatus());
+ taskState.TaskId = protoTaskState.GetTaskId();
+ if (protoTaskState.HasTaskErrorMessage()) {
+ taskState.TaskErrorMessage = FmrErrorFromProto(protoTaskState.GetTaskErrorMessage());
+ }
+ return taskState;
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
index abfa1b901c..e886e91901 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
@@ -1,219 +1,48 @@
#pragma once
-#include <variant>
#include <yt/yql/providers/yt/fmr/proto/request_options.pb.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
namespace NYql::NFmr {
-NProto::TFmrError FmrErrorToProto(const TFmrError& error) {
- NProto::TFmrError protoError;
- protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component));
- protoError.SetErrorMessage(error.ErrorMessage);
- if (error.WorkerId) {
- protoError.SetWorkerId(*error.WorkerId);
- }
- if (error.TaskId) {
- protoError.SetTaskId(*error.TaskId);
- }
- if (error.OperationId) {
- protoError.SetOperationId(*error.OperationId);
- }
- return protoError;
-}
-
-TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) {
- TFmrError fmrError;
- fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent());
- fmrError.ErrorMessage = protoError.GetErrorMessage();
- if (protoError.HasWorkerId()) {
- fmrError.WorkerId = protoError.GetWorkerId();
- }
- if (protoError.HasTaskId()) {
- fmrError.TaskId = protoError.GetTaskId();
- }
- if (protoError.HasOperationId()) {
- fmrError.OperationId = protoError.GetOperationId();
- }
- return fmrError;
-}
-
-NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) {
- NProto::TYtTableRef protoYtTableRef;
- protoYtTableRef.SetPath(ytTableRef.Path);
- protoYtTableRef.SetCluster(ytTableRef.Cluster);
- protoYtTableRef.SetTransactionId(ytTableRef.TransactionId);
- return protoYtTableRef;
-}
-
-TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) {
- TYtTableRef ytTableRef;
- ytTableRef.Path = protoYtTableRef.GetPath();
- ytTableRef.Cluster = protoYtTableRef.GetCluster();
- ytTableRef.TransactionId = protoYtTableRef.GetTransactionId();
- return ytTableRef;
-}
-
-NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) {
- NProto::TFmrTableRef protoFmrTableRef;
- protoFmrTableRef.SetTableId(fmrTableRef.TableId);
- return protoFmrTableRef;
-}
-
-TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) {
- TFmrTableRef fmrTableRef;
- fmrTableRef.TableId = protoFmrTableRef.GetTableId();
- return fmrTableRef;
-}
-
-NProto::TTableRef TableRefToProto(const TTableRef& tableRef) {
- NProto::TTableRef protoTableRef;
- if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&tableRef.TableRef)) {
- NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr);
- protoTableRef.MutableYtTableRef()->Swap(&protoYtTableRef);
- } else {
- auto* fmrTableRefPtr = std::get_if<TFmrTableRef>(&tableRef.TableRef);
- NProto::TFmrTableRef protoFmrTableRef = FmrTableRefToProto(*fmrTableRefPtr);
- protoTableRef.MutableFmrTableRef()->Swap(&protoFmrTableRef);
- }
- return protoTableRef;
-}
-
-TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef) {
- std::variant<TYtTableRef, TFmrTableRef> tableRef;
- if (protoTableRef.HasYtTableRef()) {
- tableRef = YtTableRefFromProto(protoTableRef.GetYtTableRef());
- } else {
- tableRef = FmrTableRefFromProto(protoTableRef.GetFmrTableRef());
- }
- return {tableRef};
-}
-
-NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) {
- NProto::TUploadTaskParams protoUploadTaskParams;
- auto input = FmrTableRefToProto(uploadTaskParams.Input);
- auto output = YtTableRefToProto(uploadTaskParams.Output);
- protoUploadTaskParams.MutableInput()->Swap(&input);
- protoUploadTaskParams.MutableOutput()->Swap(&output);
- return protoUploadTaskParams;
-}
-
-TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) {
- TUploadTaskParams uploadTaskParams;
- uploadTaskParams.Input = FmrTableRefFromProto(protoUploadTaskParams.GetInput());
- uploadTaskParams.Output = YtTableRefFromProto(protoUploadTaskParams.GetOutput());
- return uploadTaskParams;
-}
-
-NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) {
- NProto::TDownloadTaskParams protoDownloadTaskParams;
- auto input = YtTableRefToProto(downloadTaskParams.Input);
- auto output = FmrTableRefToProto(downloadTaskParams.Output);
- protoDownloadTaskParams.MutableInput()->Swap(&input);
- protoDownloadTaskParams.MutableOutput()->Swap(&output);
- return protoDownloadTaskParams;
-}
-
-TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) {
- TDownloadTaskParams downloadTaskParams;
- downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput());
- downloadTaskParams.Output = FmrTableRefFromProto(protoDownloadTaskParams.GetOutput());
- return downloadTaskParams;
-}
-
-NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) {
- NProto::TMergeTaskParams protoMergeTaskParams;
- std::vector<NProto::TTableRef> inputTables;
- for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) {
- auto inputTable = TableRefToProto(mergeTaskParams.Input[i]);
- auto* curInput = protoMergeTaskParams.AddInput();
- curInput->Swap(&inputTable);
- }
- auto outputTable = FmrTableRefToProto(mergeTaskParams.Output);
- protoMergeTaskParams.MutableOutput()->Swap(&outputTable);
- return protoMergeTaskParams;
-}
-
-TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) {
- TMergeTaskParams mergeTaskParams;
- std::vector<TTableRef> input;
- for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) {
- TTableRef inputTable = TableRefFromProto(protoMergeTaskParams.GetInput(i));
- input.emplace_back(inputTable);
- }
- mergeTaskParams.Input = input;
- mergeTaskParams.Output = FmrTableRefFromProto(protoMergeTaskParams.GetOutput());
- return mergeTaskParams;
-}
-
-NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) {
- NProto::TTaskParams protoTaskParams;
- if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) {
- NProto::TUploadTaskParams protoUploadTaskParams = UploadTaskParamsToProto(*uploadTaskParamsPtr);
- protoTaskParams.MutableUploadTaskParams()->Swap(&protoUploadTaskParams);
- } else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) {
- NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr);
- protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams);
- } else {
- auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams);
- NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr);
- protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams);
- }
- return protoTaskParams;
-}
-
-TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
- TTaskParams taskParams;
- if (protoTaskParams.HasDownloadTaskParams()) {
- taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams());
- } else if (protoTaskParams.HasUploadTaskParams()) {
- taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams());
- } else {
- taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams());
- }
- return taskParams;
-}
-
-NProto::TTask TaskToProto(const TTask& task) {
- NProto::TTask protoTask;
- protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType));
- protoTask.SetTaskId(task.TaskId);
- auto taskParams = TaskParamsToProto(task.TaskParams);
- protoTask.MutableTaskParams()->Swap(&taskParams);
- protoTask.SetSessionId(task.SessionId);
- protoTask.SetNumRetries(task.NumRetries);
- return protoTask;
-}
-
-TTask TaskFromProto(const NProto::TTask& protoTask) {
- TTask task;
- task.TaskType = static_cast<ETaskType>(protoTask.GetTaskType());
- task.TaskId = protoTask.GetTaskId();
- task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams());
- task.SessionId = protoTask.GetSessionId();
- task.NumRetries = protoTask.GetNumRetries();
- return task;
-}
-
-NProto::TTaskState TaskStateToProto(const TTaskState& taskState) {
- NProto::TTaskState protoTaskState;
- protoTaskState.SetTaskStatus(static_cast<NProto::ETaskStatus>(taskState.TaskStatus));
- protoTaskState.SetTaskId(taskState.TaskId);
- if (taskState.TaskErrorMessage) {
- auto fmrError = FmrErrorToProto(*taskState.TaskErrorMessage);
- protoTaskState.MutableTaskErrorMessage()->Swap(&fmrError);
- }
- return protoTaskState;
-}
-
-TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState) {
- TTaskState taskState;
- taskState.TaskStatus = static_cast<ETaskStatus>(protoTaskState.GetTaskStatus());
- taskState.TaskId = protoTaskState.GetTaskId();
- if (protoTaskState.HasTaskErrorMessage()) {
- taskState.TaskErrorMessage = FmrErrorFromProto(protoTaskState.GetTaskErrorMessage());
- }
- return taskState;
-}
+NProto::TFmrError FmrErrorToProto(const TFmrError& error);
+
+TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError);
+
+NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef);
+
+TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef);
+
+NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef);
+
+TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef);
+
+NProto::TTableRef TableRefToProto(const TTableRef& tableRef);
+
+TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef);
+
+NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams);
+
+TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams);
+
+NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams);
+
+TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams);
+
+NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams);
+
+TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams);
+
+NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams);
+
+TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams);
+
+NProto::TTask TaskToProto(const TTask& task);
+
+TTask TaskFromProto(const NProto::TTask& protoTask);
+
+NProto::TTaskState TaskStateToProto(const TTaskState& taskState);
+
+TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState);
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
index 3e12787010..790f845c3e 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
@@ -5,7 +5,6 @@ SRCS(
)
PEERDIR(
- library/cpp/yt/assert
yt/yql/providers/yt/fmr/coordinator/impl
yt/yql/providers/yt/fmr/job_factory/impl
yt/yql/providers/yt/fmr/worker/impl
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ya.make
index 28cfcbea66..9eb66cb8dd 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/worker/impl/ya.make
@@ -7,7 +7,8 @@ SRCS(
PEERDIR(
library/cpp/random_provider
library/cpp/threading/future
- yt/yql/providers/yt/fmr/worker/interface
+ yt/yql/providers/yt/fmr/coordinator/interface
+ yt/yql/providers/yt/fmr/job_factory/interface
yql/essentials/utils
)
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
index 9502faf806..a1a3a186fe 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
@@ -133,7 +133,7 @@ private:
} // namespace
IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) {
- return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings);
+ return MakeHolder<TFmrWorker>(coordinator, jobFactory, settings);
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
index cbeff4bbf5..bff6ddff9e 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
@@ -1,14 +1,18 @@
#include <library/cpp/random_provider/random_provider.h>
-#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h>
+#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
+#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
+#include <yql/essentials/utils/runnable.h>
namespace NYql::NFmr {
struct TFmrWorkerSettings {
ui32 WorkerId;
- TIntrusivePtr<IRandomProvider> RandomProvider;
+ TIntrusivePtr<IRandomProvider> RandomProvider = CreateDefaultRandomProvider();
TDuration TimeToSleepBetweenRequests = TDuration::Seconds(1);
};
-IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator,IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings);
+using IFmrWorker = IRunnable;
+
+IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings);
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/interface/ya.make b/yt/yql/providers/yt/fmr/worker/interface/ya.make
deleted file mode 100644
index 2042b984d0..0000000000
--- a/yt/yql/providers/yt/fmr/worker/interface/ya.make
+++ /dev/null
@@ -1,15 +0,0 @@
-LIBRARY()
-
-SRCS(
- yql_yt_worker.cpp
-)
-
-PEERDIR(
- library/cpp/threading/future
- yt/yql/providers/yt/fmr/coordinator/interface
- yt/yql/providers/yt/fmr/job_factory/interface
-)
-
-YQL_LAST_ABI_VERSION()
-
-END()
diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp
deleted file mode 100644
index 4c6001a217..0000000000
--- a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "yql_yt_worker.h"
diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h
deleted file mode 100644
index 751324e795..0000000000
--- a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#include <library/cpp/threading/future/core/future.h>
-
-#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
-#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
-
-namespace NYql::NFmr {
-
-class IFmrWorker: public TThrRefBase {
-public:
- using TPtr = TIntrusivePtr<IFmrWorker>;
-
- virtual ~IFmrWorker() = default;
-
- virtual void Start() = 0;
-
- virtual void Stop() = 0;
-};
-
-} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index 039b3747d1..db00f2ff59 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -38,11 +38,12 @@ public:
auto getOperationStatusesFunc = [&] {
while (!StopFmrGateway_) {
with_lock(SessionStates_->Mutex) {
- auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) {
- std::vector<TString> completedOperationIds;
+ auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses, const TString& sessionId) {
for (auto& [operationId, promise]: operationStatuses) {
+ YQL_CLOG(DEBUG, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId;
+
auto getOperationFuture = Coordinator_->GetOperation({operationId});
- getOperationFuture.Subscribe([&, operationId] (const auto& getFuture) {
+ getOperationFuture.Subscribe([&, operationId, sessionId] (const auto& getFuture) {
auto getOperationResult = getFuture.GetValueSync();
auto getOperationStatus = getOperationResult.Status;
auto operationErrorMessages = getOperationResult.ErrorMessages;
@@ -52,22 +53,30 @@ public:
// operation finished, set value in future returned in Publish / Download
bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed;
SendOperationCompletionSignal(promise, hasCompletedSuccessfully, operationErrorMessages);
- completedOperationIds.emplace_back(operationId);
+ YQL_CLOG(DEBUG, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId;
+ auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId});
+ deleteOperationFuture.Subscribe([&, sessionId, operationId] (const auto& deleteFuture) {
+ auto deleteOperationResult = deleteFuture.GetValueSync();
+ auto deleteOperationStatus = deleteOperationResult.Status;
+ YQL_ENSURE(deleteOperationStatus == EOperationStatus::Aborted || deleteOperationStatus == EOperationStatus::NotFound);
+ with_lock(SessionStates_->Mutex) {
+ YQL_ENSURE( SessionStates_->Sessions.contains(sessionId));
+ auto& sessionInfo = SessionStates_->Sessions[sessionId];
+ auto& operationStates = sessionInfo.OperationStates;
+ operationStates.DownloadOperationStatuses.erase(operationId);
+ operationStates.UploadOperationStatuses.erase(operationId);
+ }
+ });
}
}
});
}
-
- for (auto& operationId: completedOperationIds) {
- Coordinator_->DeleteOperation({operationId}).GetValueSync();
- operationStatuses.erase(operationId);
- }
};
- for (auto& [sessionId, sessionInfo]: SessionStates_->Sessions) {
+ for (auto [sessionId, sessionInfo]: SessionStates_->Sessions) {
auto& operationStates = sessionInfo.OperationStates;
- checkOperationStatuses(operationStates.DownloadOperationStatuses);
- checkOperationStatuses(operationStates.UploadOperationStatuses);
+ checkOperationStatuses(operationStates.DownloadOperationStatuses, sessionId);
+ checkOperationStatuses(operationStates.UploadOperationStatuses, sessionId);
}
}
Sleep(TimeToSleepBetweenGetOperationRequests_);
@@ -82,6 +91,7 @@ public:
}
TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final {
+ YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
if (!Coordinator_) {
return Slave_->Publish(node, ctx, std::move(options));
}
@@ -98,7 +108,6 @@ public:
TFuture<void> downloadedSuccessfully;
TString sessionId = options.SessionId();
- YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__, sessionId);
with_lock(SessionStates_->Mutex) {
auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
@@ -130,9 +139,11 @@ public:
auto promise = NewPromise<TPublishResult>();
auto future = promise.GetFuture();
- auto startOperationResponseFuture = Coordinator_->StartOperation(uploadRequest);
- startOperationResponseFuture.Subscribe([this, promise = std::move(promise), &sessionId] (const auto& startFuture) {
- TString operationId = startFuture.GetValueSync().OperationId;
+ YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload to yt table: " << cluster + "." + outputPath;
+ auto uploadOperationResponseFuture = Coordinator_->StartOperation(uploadRequest);
+ uploadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& uploadFuture) {
+ TStartOperationResponse startOperationResponse = uploadFuture.GetValueSync();
+ TString operationId = startOperationResponse.OperationId;
with_lock(SessionStates_->Mutex) {
YQL_ENSURE(SessionStates_->Sessions.contains(sessionId));
auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates;
@@ -156,14 +167,15 @@ public:
.TaskType = ETaskType::Download, .TaskParams = downloadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1
};
- YQL_CLOG(INFO, FastMapReduce) << "Downloading table with cluster " << ytTableRef.Cluster << " and path " << ytTableRef.Path << " from yt to fmr";
+ YQL_CLOG(DEBUG, FastMapReduce) << "Starting download to from yt table: " << fmrTableId;
auto promise = NewPromise<TDownloadTableToFmrResult>();
auto future = promise.GetFuture();
- auto startOperationResponseFuture = Coordinator_->StartOperation(downloadRequest);
- startOperationResponseFuture.Subscribe([this, promise = std::move(promise), &sessionId] (const auto& startFuture) {
- TString operationId = startFuture.GetValueSync().OperationId;
+ auto downloadOperationResponseFuture = Coordinator_->StartOperation(downloadRequest);
+ downloadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& downloadFuture) {
+ TStartOperationResponse downloadOperationResponse = downloadFuture.GetValueSync();
+ TString operationId = downloadOperationResponse.OperationId;
with_lock(SessionStates_->Mutex) {
auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates;
auto& downloadOperationStatuses = operationStates.DownloadOperationStatuses;
@@ -219,13 +231,9 @@ public:
cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId}));
}
NThreading::WaitAll(cancelOperationsFutures).GetValueSync();
- std::vector<TFuture<T>> resultFutures;
-
for (auto& [operationId, promise]: operationStatuses) {
SendOperationCompletionSignal(promise, false);
- resultFutures.emplace_back(promise.GetFuture());
}
- NThreading::WaitAll(resultFutures).GetValueSync();
};
cancelOperationsFunc(operationStates.DownloadOperationStatuses);
@@ -265,7 +273,7 @@ private:
template <std::derived_from<NCommon::TOperationResult> T>
void SendOperationCompletionSignal(TPromise<T> promise, bool completedSuccessfully = false, const std::vector<TFmrError>& errorMessages = {}) {
YQL_ENSURE(!promise.HasValue());
- T commonOperationResult;
+ T commonOperationResult{};
if (completedSuccessfully) {
commonOperationResult.SetSuccess();
} else if (!errorMessages.empty()) {
@@ -279,7 +287,7 @@ private:
} // namespace
IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPtr coordinator, const TFmrYtGatewaySettings& settings) {
- return MakeIntrusive<TFmrYtGateway>(std::move(slave), std::move(coordinator), settings);
+ return MakeIntrusive<TFmrYtGateway>(std::move(slave), coordinator, settings);
}
} // namespace NYql::NFmr
diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make
index 93b20d7ed0..007272ae8e 100644
--- a/yt/yql/tools/ytrun/lib/ya.make
+++ b/yt/yql/tools/ytrun/lib/ya.make
@@ -6,6 +6,7 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/provider
+ yt/yql/providers/yt/fmr/coordinator/client
yt/yql/providers/yt/fmr/coordinator/impl
yt/yql/providers/yt/fmr/job_factory/impl
yt/yql/providers/yt/fmr/worker/impl
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
index ed5a86fc0b..9726d00d5a 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -8,6 +8,7 @@
#include <yt/yql/providers/yt/lib/log/yt_logger.h>
#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
+#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h>
#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
@@ -113,6 +114,13 @@ TYtRunTool::TYtRunTool(TString name)
.Optional()
.NoArgument()
.SetFlag(&GetRunOptions().UseMetaFromGrpah);
+ opts.AddLongOption("fmr-coordinator-server-url", "Fast map reduce coordinator server url")
+ .Optional()
+ .StoreResult(&FmrCoordinatorServerUrl_);
+ opts.AddLongOption("disable-local-fmr-worker", "Disable local fast map reduce worker")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&DisableLocalFmrWorker_);
});
GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
@@ -171,27 +179,40 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
services.FunctionRegistry = GetFuncRegistry().Get();
services.FileStorage = GetFileStorage();
services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
- auto ytGateway = CreateYtNativeGateway(services);
+ auto ytGateway = CreateYtNativeGateway(services);
if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
return ytGateway;
}
auto coordinator = NFmr::MakeFmrCoordinator();
- auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
- while (!cancelFlag->load()) {
- Sleep(TDuration::Seconds(3));
- return NFmr::ETaskStatus::Completed;
+ if (!FmrCoordinatorServerUrl_.empty()) {
+ NFmr::TFmrCoordinatorClientSettings coordinatorClientSettings;
+ THttpURL parsedUrl;
+ if (parsedUrl.Parse(FmrCoordinatorServerUrl_) != THttpURL::ParsedOK) {
+ ythrow yexception() << "Invalid fast map reduce coordinator server url passed in parameters";
}
- return NFmr::ETaskStatus::Aborted;
- }; // TODO - use function which actually calls Downloader/Uploader based on task params
-
- NFmr::TFmrJobFactorySettings settings{.Function=func};
- auto jobFactory = MakeFmrJobFactory(settings);
- NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(),
- .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
- FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
- FmrWorker_->Start();
- return CreateYtFmrGateway(ytGateway, coordinator);
+ coordinatorClientSettings.Port = parsedUrl.GetPort();
+ coordinatorClientSettings.Host = parsedUrl.GetHost();
+ coordinator = NFmr::MakeFmrCoordinatorClient(coordinatorClientSettings);
+ }
+
+ if (!DisableLocalFmrWorker_) {
+ auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ while (!cancelFlag->load()) {
+ Sleep(TDuration::Seconds(3));
+ return NFmr::ETaskStatus::Completed;
+ }
+ return NFmr::ETaskStatus::Aborted;
+ }; // TODO - use function which actually calls Downloader/Uploader based on task params
+
+ NFmr::TFmrJobFactorySettings settings{.Function=func};
+ auto jobFactory = MakeFmrJobFactory(settings);
+ NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDefaultRandomProvider(),
+ .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
+ FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+ FmrWorker_->Start();
+ }
+ return NFmr::CreateYtFmrGateway(ytGateway, coordinator);
}
IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h
index 96c62a2c33..cb72e40304 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.h
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h
@@ -34,6 +34,8 @@ protected:
bool KeepTemp_ = false;
TString DefYtServer_;
NFmr::IFmrWorker::TPtr FmrWorker_;
+ TString FmrCoordinatorServerUrl_;
+ bool DisableLocalFmrWorker_ = false;
};
} // NYql