diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-19 03:50:37 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-19 04:05:35 +0300 |
commit | a71ea8c4136c53816147a1478f0db6c28f6d7c89 (patch) | |
tree | 8b16fe508d3b903935393014d56881f64f8da419 | |
parent | afe5f174d1537a99ec81b46bd61861bc7f1a29ee (diff) | |
download | ydb-a71ea8c4136c53816147a1478f0db6c28f6d7c89.tar.gz |
Intermediate changes
commit_hash:e2f7171fc4070059f585443adfecfb9c7ca93e17
24 files changed, 649 insertions, 301 deletions
diff --git a/yql/essentials/utils/runnable.h b/yql/essentials/utils/runnable.h new file mode 100644 index 0000000000..8179aa7496 --- /dev/null +++ b/yql/essentials/utils/runnable.h @@ -0,0 +1,16 @@ +#include <util/generic/ptr.h> + +namespace NYql { + +class IRunnable: public TThrRefBase { +public: + using TPtr = THolder<IRunnable>; + + virtual ~IRunnable() = default; + + virtual void Start() = 0; + + virtual void Stop() = 0; +}; + +} // namespace NYql diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make index 12634d6d64..25fdf1775d 100644 --- a/yql/essentials/utils/ya.make +++ b/yql/essentials/utils/ya.make @@ -29,6 +29,7 @@ SRCS( resetable_setting.h retry.cpp retry.h + runnable.h sort.cpp sort.h swap_bytes.cpp diff --git a/yt/yql/providers/yt/fmr/coordinator/client/ya.make b/yt/yql/providers/yt/fmr/coordinator/client/ya.make new file mode 100644 index 0000000000..589e7a8522 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/client/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + yql_yt_coordinator_client.cpp +) + +PEERDIR( + library/cpp/http/simple + library/cpp/threading/future + yt/yql/providers/yt/fmr/coordinator/interface + yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers + yt/yql/providers/yt/fmr/proto + yql/essentials/utils +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp new file mode 100644 index 0000000000..e128aece48 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp @@ -0,0 +1,83 @@ +#include <library/cpp/http/simple/http_client.h> + +#include <yt/yql/providers/yt/fmr/proto/coordinator.pb.h> +#include <yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h> + +#include <yql/essentials/utils/yql_panic.h> + +#include "yql_yt_coordinator_client.h" + +namespace NYql::NFmr { + +namespace { + +class TFmrCoordinatorClient: public IFmrCoordinator { +public: + TFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings): Host_(settings.Host), Port_(settings.Port) + { + Headers_["Content-Type"] = "application/x-protobuf"; + } + + NThreading::TFuture<TStartOperationResponse> StartOperation(const TStartOperationRequest& startOperationRequest) override { + NProto::TStartOperationRequest protoStartOperationRequest = StartOperationRequestToProto(startOperationRequest); + TString startOperationRequestUrl = "/operation"; + auto httpClient = TKeepAliveHttpClient(Host_, Port_); + TStringStream outputStream; + + httpClient.DoPost(startOperationRequestUrl, protoStartOperationRequest.SerializeAsString(), &outputStream, Headers_); + TString serializedResponse = outputStream.ReadAll(); + NProto::TStartOperationResponse protoStartOperationResponse; + YQL_ENSURE(protoStartOperationResponse.ParseFromString(serializedResponse)); + return NThreading::MakeFuture(StartOperationResponseFromProto(protoStartOperationResponse)); + } + + NThreading::TFuture<TGetOperationResponse> GetOperation(const TGetOperationRequest& getOperationRequest) override { + TString getOperationRequestUrl = "/operation/" + getOperationRequest.OperationId; + auto httpClient = TKeepAliveHttpClient(Host_, Port_); + TStringStream outputStream; + + httpClient.DoGet(getOperationRequestUrl, &outputStream, Headers_); + TString serializedResponse = outputStream.ReadAll(); + NProto::TGetOperationResponse protoGetOperationResponse; + YQL_ENSURE(protoGetOperationResponse.ParseFromString(serializedResponse)); + return NThreading::MakeFuture(GetOperationResponseFromProto(protoGetOperationResponse)); + } + + NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& deleteOperationRequest) override { + TString deleteOperationRequestUrl = "/operation/" + deleteOperationRequest.OperationId; + auto httpClient = TKeepAliveHttpClient(Host_, Port_); + TStringStream outputStream; + + httpClient.DoRequest("DELETE", deleteOperationRequestUrl, "", &outputStream, Headers_); + TString serializedResponse = outputStream.ReadAll(); + NProto::TDeleteOperationResponse protoDeleteOperationResponse; + YQL_ENSURE(protoDeleteOperationResponse.ParseFromString(serializedResponse)); + return NThreading::MakeFuture(DeleteOperationResponseFromProto(protoDeleteOperationResponse)); + } + + NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& heartbeatRequest) override { + NProto::THeartbeatRequest protoSendHeartbeatRequest = HeartbeatRequestToProto(heartbeatRequest); + TString sendHearbeatRequestUrl = "/worker_heartbeat"; + auto httpClient = TKeepAliveHttpClient(Host_, Port_); + TStringStream outputStream; + + httpClient.DoPost(sendHearbeatRequestUrl, protoSendHeartbeatRequest.SerializeAsString(), &outputStream, Headers_); + TString serializedResponse = outputStream.ReadAll(); + NProto::THeartbeatResponse protoHeartbeatResponse; + YQL_ENSURE(protoHeartbeatResponse.ParseFromString(serializedResponse)); + return NThreading::MakeFuture(HeartbeatResponseFromProto(protoHeartbeatResponse)); + } + +private: + TString Host_; + ui16 Port_; + TSimpleHttpClient::THeaders Headers_; +}; + +} // namespace + +IFmrCoordinator::TPtr MakeFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings) { + return MakeIntrusive<TFmrCoordinatorClient>(settings); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h new file mode 100644 index 0000000000..8c9b039cf7 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h @@ -0,0 +1,12 @@ +#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> + +namespace NYql::NFmr { + +struct TFmrCoordinatorClientSettings { + ui16 Port; + TString Host = "localhost"; +}; + +IFmrCoordinator::TPtr MakeFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make index a661e2cb18..2f67f6d05e 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make @@ -5,7 +5,6 @@ SRCS( ) PEERDIR( - library/cpp/yt/assert yt/yql/providers/yt/fmr/coordinator/impl yt/yql/providers/yt/fmr/job_factory/impl yt/yql/providers/yt/fmr/worker/impl diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h index 5a8f6eee55..d8a526096a 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h @@ -1,3 +1,5 @@ +#pragma once + #include <library/cpp/random_provider/random_provider.h> #include <util/system/mutex.h> #include <util/system/guard.h> diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make new file mode 100644 index 0000000000..62df195306 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + yql_yt_coordinator_proto_helpers.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/coordinator/interface + yt/yql/providers/yt/fmr/proto + yt/yql/providers/yt/fmr/request_options/proto_helpers +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp new file mode 100644 index 0000000000..85f7d5834a --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp @@ -0,0 +1,133 @@ +#include "yql_yt_coordinator_proto_helpers.h" + +namespace NYql::NFmr { + +NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heartbeatRequest) { + NProto::THeartbeatRequest protoHeartbeatRequest; + protoHeartbeatRequest.SetWorkerId(heartbeatRequest.WorkerId); + protoHeartbeatRequest.SetVolatileId(heartbeatRequest.VolatileId); + std::vector<NProto::TTaskState> taskStates; + for (size_t i = 0; i < heartbeatRequest.TaskStates.size(); ++i) { + auto protoTaskState = TaskStateToProto(*heartbeatRequest.TaskStates[i]); + protoHeartbeatRequest.AddTaskStates(); + protoHeartbeatRequest.MutableTaskStates(i)->Swap(&protoTaskState); + } + return protoHeartbeatRequest; +} + +THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest protoHeartbeatRequest) { + THeartbeatRequest heartbeatRequest; + heartbeatRequest.WorkerId = protoHeartbeatRequest.GetWorkerId(); + heartbeatRequest.VolatileId = protoHeartbeatRequest.GetVolatileId(); + std::vector<TTaskState::TPtr> taskStates; + for (size_t i = 0; i < protoHeartbeatRequest.TaskStatesSize(); ++i) { + TTaskState curTaskState = TaskStateFromProto(protoHeartbeatRequest.GetTaskStates(i)); + taskStates.emplace_back(TIntrusivePtr<TTaskState>(new TTaskState(curTaskState))); + } + heartbeatRequest.TaskStates = taskStates; + return heartbeatRequest; +} + +NProto::THeartbeatResponse HeartbeatResponseToProto(const THeartbeatResponse& heartbeatResponse) { + NProto::THeartbeatResponse protoHeartbeatResponse; + for (size_t i = 0; i < heartbeatResponse.TasksToRun.size(); ++i) { + auto protoTask = TaskToProto(*heartbeatResponse.TasksToRun[i]); + auto * taskToRun = protoHeartbeatResponse.AddTasksToRun(); + taskToRun->Swap(&protoTask); + } + for (auto& id: heartbeatResponse.TaskToDeleteIds) { + protoHeartbeatResponse.AddTaskToDeleteIds(id); + } + return protoHeartbeatResponse; +} + +THeartbeatResponse HeartbeatResponseFromProto(const NProto::THeartbeatResponse& protoHeartbeatResponse) { + THeartbeatResponse heartbeatResponse; + std::vector<TTask::TPtr> tasksToRun; + std::unordered_set<TString> taskToDeleteIds; + for (size_t i = 0; i < protoHeartbeatResponse.TasksToRunSize(); ++i) { + TTask curTask = TaskFromProto(protoHeartbeatResponse.GetTasksToRun(i)); + tasksToRun.emplace_back(TIntrusivePtr<TTask>(new TTask(curTask))); + } + for (size_t i = 0; i < protoHeartbeatResponse.TaskToDeleteIdsSize(); ++i) { + taskToDeleteIds.emplace(protoHeartbeatResponse.GetTaskToDeleteIds(i)); + } + + heartbeatResponse.TasksToRun = tasksToRun; + heartbeatResponse.TaskToDeleteIds = taskToDeleteIds; + return heartbeatResponse; +} + +NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperationRequest& startOperationRequest) { + NProto::TStartOperationRequest protoStartOperationRequest; + protoStartOperationRequest.SetTaskType(static_cast<NProto::ETaskType>(startOperationRequest.TaskType)); + auto protoTaskParams = TaskParamsToProto(startOperationRequest.TaskParams); + protoStartOperationRequest.MutableTaskParams()->Swap(&protoTaskParams); + protoStartOperationRequest.SetSessionId(startOperationRequest.SessionId); + if (startOperationRequest.IdempotencyKey) { + protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey); + } + protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries); + return protoStartOperationRequest; +} + +TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperationRequest& protoStartOperationRequest) { + TStartOperationRequest startOperationRequest; + startOperationRequest.TaskType = static_cast<ETaskType>(protoStartOperationRequest.GetTaskType()); + startOperationRequest.TaskParams = TaskParamsFromProto(protoStartOperationRequest.GetTaskParams()); + startOperationRequest.SessionId = protoStartOperationRequest.GetSessionId(); + if (protoStartOperationRequest.HasIdempotencyKey()) { + startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey(); + } + startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries(); + return startOperationRequest; +} + +NProto::TStartOperationResponse StartOperationResponseToProto(const TStartOperationResponse& startOperationResponse) { + NProto::TStartOperationResponse protoStartOperationResponse; + protoStartOperationResponse.SetOperationId(startOperationResponse.OperationId); + protoStartOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(startOperationResponse.Status)); + return protoStartOperationResponse; +} + +TStartOperationResponse StartOperationResponseFromProto(const NProto::TStartOperationResponse& protoStartOperationResponse) { + return TStartOperationResponse{ + .Status = static_cast<EOperationStatus>(protoStartOperationResponse.GetStatus()), + .OperationId = protoStartOperationResponse.GetOperationId() + }; +} + +NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationResponse& getOperationResponse) { + NProto::TGetOperationResponse protoGetOperationResponse; + protoGetOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(getOperationResponse.Status)); + for (auto& errorMessage: getOperationResponse.ErrorMessages) { + auto* curError = protoGetOperationResponse.AddErrorMessages(); + auto protoError = FmrErrorToProto(errorMessage); + curError->Swap(&protoError); + } + return protoGetOperationResponse; +} + +TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationResponse protoGetOperationReponse) { + TGetOperationResponse getOperationResponse; + getOperationResponse.Status = static_cast<EOperationStatus>(protoGetOperationReponse.GetStatus()); + std::vector<TFmrError> errorMessages; + for (size_t i = 0; i < protoGetOperationReponse.ErrorMessagesSize(); ++i) { + TFmrError errorMessage = FmrErrorFromProto(protoGetOperationReponse.GetErrorMessages(i)); + errorMessages.emplace_back(errorMessage); + } + getOperationResponse.ErrorMessages = errorMessages; + return getOperationResponse; +} + +NProto::TDeleteOperationResponse DeleteOperationResponseToProto(const TDeleteOperationResponse& deleteOperationResponse) { + NProto::TDeleteOperationResponse protoDeleteOperationResponse; + protoDeleteOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(deleteOperationResponse.Status)); + return protoDeleteOperationResponse; +} + +TDeleteOperationResponse DeleteOperationResponseFromProto(const NProto::TDeleteOperationResponse& protoDeleteOperationResponse) { + return TDeleteOperationResponse{.Status = static_cast<EOperationStatus>(protoDeleteOperationResponse.GetStatus())}; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h new file mode 100644 index 0000000000..d09034b078 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h @@ -0,0 +1,33 @@ +#pragma once + +#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> +#include <yt/yql/providers/yt/fmr/proto/coordinator.pb.h> +#include <yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h> + +namespace NYql::NFmr { + +NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heartbeatRequest); + +THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest protoHeartbeatRequest); + +NProto::THeartbeatResponse HeartbeatResponseToProto(const THeartbeatResponse& heartbeatResponse); + +THeartbeatResponse HeartbeatResponseFromProto(const NProto::THeartbeatResponse& protoHeartbeatResponse); + +NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperationRequest& startOperationRequest); + +TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperationRequest& protoStartOperationRequest); + +NProto::TStartOperationResponse StartOperationResponseToProto(const TStartOperationResponse& startOperationResponse); + +TStartOperationResponse StartOperationResponseFromProto(const NProto::TStartOperationResponse& protoStartOperationResponse); + +NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationResponse& getOperationResponse); + +TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationResponse protoGetOperationReponse); + +NProto::TDeleteOperationResponse DeleteOperationResponseToProto(const TDeleteOperationResponse& deleteOperationResponse); + +TDeleteOperationResponse DeleteOperationResponseFromProto(const NProto::TDeleteOperationResponse& protoDeleteOperationResponse); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto index 930b4b3bd0..33990caa28 100644 --- a/yt/yql/providers/yt/fmr/proto/coordinator.proto +++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto @@ -29,19 +29,11 @@ message TStartOperationResponse { string OperationId = 2; } -message TGetOperationRequest { - string OperationId = 1; -} - message TGetOperationResponse { EOperationStatus Status = 1; repeated TFmrError ErrorMessages = 2; } -message TDeleteOperationRequest { - string OperationId = 1; -} - message TDeleteOperationResponse { EOperationStatus Status = 1; } diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index 48dcad24c7..21674b5fc4 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -1 +1,215 @@ #include "yql_yt_request_proto_helpers.h" + +namespace NYql::NFmr { + +NProto::TFmrError FmrErrorToProto(const TFmrError& error) { + NProto::TFmrError protoError; + protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component)); + protoError.SetErrorMessage(error.ErrorMessage); + if (error.WorkerId) { + protoError.SetWorkerId(*error.WorkerId); + } + if (error.TaskId) { + protoError.SetTaskId(*error.TaskId); + } + if (error.OperationId) { + protoError.SetOperationId(*error.OperationId); + } + return protoError; +} + +TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) { + TFmrError fmrError; + fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent()); + fmrError.ErrorMessage = protoError.GetErrorMessage(); + if (protoError.HasWorkerId()) { + fmrError.WorkerId = protoError.GetWorkerId(); + } + if (protoError.HasTaskId()) { + fmrError.TaskId = protoError.GetTaskId(); + } + if (protoError.HasOperationId()) { + fmrError.OperationId = protoError.GetOperationId(); + } + return fmrError; +} + +NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) { + NProto::TYtTableRef protoYtTableRef; + protoYtTableRef.SetPath(ytTableRef.Path); + protoYtTableRef.SetCluster(ytTableRef.Cluster); + protoYtTableRef.SetTransactionId(ytTableRef.TransactionId); + return protoYtTableRef; +} + +TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) { + TYtTableRef ytTableRef; + ytTableRef.Path = protoYtTableRef.GetPath(); + ytTableRef.Cluster = protoYtTableRef.GetCluster(); + ytTableRef.TransactionId = protoYtTableRef.GetTransactionId(); + return ytTableRef; +} + +NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) { + NProto::TFmrTableRef protoFmrTableRef; + protoFmrTableRef.SetTableId(fmrTableRef.TableId); + return protoFmrTableRef; +} + +TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) { + TFmrTableRef fmrTableRef; + fmrTableRef.TableId = protoFmrTableRef.GetTableId(); + return fmrTableRef; +} + +NProto::TTableRef TableRefToProto(const TTableRef& tableRef) { + NProto::TTableRef protoTableRef; + if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&tableRef.TableRef)) { + NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr); + protoTableRef.MutableYtTableRef()->Swap(&protoYtTableRef); + } else { + auto* fmrTableRefPtr = std::get_if<TFmrTableRef>(&tableRef.TableRef); + NProto::TFmrTableRef protoFmrTableRef = FmrTableRefToProto(*fmrTableRefPtr); + protoTableRef.MutableFmrTableRef()->Swap(&protoFmrTableRef); + } + return protoTableRef; +} + +TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef) { + std::variant<TYtTableRef, TFmrTableRef> tableRef; + if (protoTableRef.HasYtTableRef()) { + tableRef = YtTableRefFromProto(protoTableRef.GetYtTableRef()); + } else { + tableRef = FmrTableRefFromProto(protoTableRef.GetFmrTableRef()); + } + return {tableRef}; +} + +NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) { + NProto::TUploadTaskParams protoUploadTaskParams; + auto input = FmrTableRefToProto(uploadTaskParams.Input); + auto output = YtTableRefToProto(uploadTaskParams.Output); + protoUploadTaskParams.MutableInput()->Swap(&input); + protoUploadTaskParams.MutableOutput()->Swap(&output); + return protoUploadTaskParams; +} + +TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) { + TUploadTaskParams uploadTaskParams; + uploadTaskParams.Input = FmrTableRefFromProto(protoUploadTaskParams.GetInput()); + uploadTaskParams.Output = YtTableRefFromProto(protoUploadTaskParams.GetOutput()); + return uploadTaskParams; +} + +NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) { + NProto::TDownloadTaskParams protoDownloadTaskParams; + auto input = YtTableRefToProto(downloadTaskParams.Input); + auto output = FmrTableRefToProto(downloadTaskParams.Output); + protoDownloadTaskParams.MutableInput()->Swap(&input); + protoDownloadTaskParams.MutableOutput()->Swap(&output); + return protoDownloadTaskParams; +} + +TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) { + TDownloadTaskParams downloadTaskParams; + downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput()); + downloadTaskParams.Output = FmrTableRefFromProto(protoDownloadTaskParams.GetOutput()); + return downloadTaskParams; +} + +NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) { + NProto::TMergeTaskParams protoMergeTaskParams; + std::vector<NProto::TTableRef> inputTables; + for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) { + auto inputTable = TableRefToProto(mergeTaskParams.Input[i]); + auto* curInput = protoMergeTaskParams.AddInput(); + curInput->Swap(&inputTable); + } + auto outputTable = FmrTableRefToProto(mergeTaskParams.Output); + protoMergeTaskParams.MutableOutput()->Swap(&outputTable); + return protoMergeTaskParams; +} + +TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) { + TMergeTaskParams mergeTaskParams; + std::vector<TTableRef> input; + for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) { + TTableRef inputTable = TableRefFromProto(protoMergeTaskParams.GetInput(i)); + input.emplace_back(inputTable); + } + mergeTaskParams.Input = input; + mergeTaskParams.Output = FmrTableRefFromProto(protoMergeTaskParams.GetOutput()); + return mergeTaskParams; +} + +NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) { + NProto::TTaskParams protoTaskParams; + if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) { + NProto::TUploadTaskParams protoUploadTaskParams = UploadTaskParamsToProto(*uploadTaskParamsPtr); + protoTaskParams.MutableUploadTaskParams()->Swap(&protoUploadTaskParams); + } else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) { + NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr); + protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams); + } else { + auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams); + NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr); + protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams); + } + return protoTaskParams; +} + +TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { + TTaskParams taskParams; + if (protoTaskParams.HasDownloadTaskParams()) { + taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams()); + } else if (protoTaskParams.HasUploadTaskParams()) { + taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams()); + } else { + taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams()); + } + return taskParams; +} + +NProto::TTask TaskToProto(const TTask& task) { + NProto::TTask protoTask; + protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType)); + protoTask.SetTaskId(task.TaskId); + auto taskParams = TaskParamsToProto(task.TaskParams); + protoTask.MutableTaskParams()->Swap(&taskParams); + protoTask.SetSessionId(task.SessionId); + protoTask.SetNumRetries(task.NumRetries); + return protoTask; +} + +TTask TaskFromProto(const NProto::TTask& protoTask) { + TTask task; + task.TaskType = static_cast<ETaskType>(protoTask.GetTaskType()); + task.TaskId = protoTask.GetTaskId(); + task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams()); + task.SessionId = protoTask.GetSessionId(); + task.NumRetries = protoTask.GetNumRetries(); + return task; +} + +NProto::TTaskState TaskStateToProto(const TTaskState& taskState) { + NProto::TTaskState protoTaskState; + protoTaskState.SetTaskStatus(static_cast<NProto::ETaskStatus>(taskState.TaskStatus)); + protoTaskState.SetTaskId(taskState.TaskId); + if (taskState.TaskErrorMessage) { + auto fmrError = FmrErrorToProto(*taskState.TaskErrorMessage); + protoTaskState.MutableTaskErrorMessage()->Swap(&fmrError); + } + return protoTaskState; +} + +TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState) { + TTaskState taskState; + taskState.TaskStatus = static_cast<ETaskStatus>(protoTaskState.GetTaskStatus()); + taskState.TaskId = protoTaskState.GetTaskId(); + if (protoTaskState.HasTaskErrorMessage()) { + taskState.TaskErrorMessage = FmrErrorFromProto(protoTaskState.GetTaskErrorMessage()); + } + return taskState; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h index abfa1b901c..e886e91901 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h @@ -1,219 +1,48 @@ #pragma once -#include <variant> #include <yt/yql/providers/yt/fmr/proto/request_options.pb.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> namespace NYql::NFmr { -NProto::TFmrError FmrErrorToProto(const TFmrError& error) { - NProto::TFmrError protoError; - protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component)); - protoError.SetErrorMessage(error.ErrorMessage); - if (error.WorkerId) { - protoError.SetWorkerId(*error.WorkerId); - } - if (error.TaskId) { - protoError.SetTaskId(*error.TaskId); - } - if (error.OperationId) { - protoError.SetOperationId(*error.OperationId); - } - return protoError; -} - -TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) { - TFmrError fmrError; - fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent()); - fmrError.ErrorMessage = protoError.GetErrorMessage(); - if (protoError.HasWorkerId()) { - fmrError.WorkerId = protoError.GetWorkerId(); - } - if (protoError.HasTaskId()) { - fmrError.TaskId = protoError.GetTaskId(); - } - if (protoError.HasOperationId()) { - fmrError.OperationId = protoError.GetOperationId(); - } - return fmrError; -} - -NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) { - NProto::TYtTableRef protoYtTableRef; - protoYtTableRef.SetPath(ytTableRef.Path); - protoYtTableRef.SetCluster(ytTableRef.Cluster); - protoYtTableRef.SetTransactionId(ytTableRef.TransactionId); - return protoYtTableRef; -} - -TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) { - TYtTableRef ytTableRef; - ytTableRef.Path = protoYtTableRef.GetPath(); - ytTableRef.Cluster = protoYtTableRef.GetCluster(); - ytTableRef.TransactionId = protoYtTableRef.GetTransactionId(); - return ytTableRef; -} - -NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) { - NProto::TFmrTableRef protoFmrTableRef; - protoFmrTableRef.SetTableId(fmrTableRef.TableId); - return protoFmrTableRef; -} - -TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) { - TFmrTableRef fmrTableRef; - fmrTableRef.TableId = protoFmrTableRef.GetTableId(); - return fmrTableRef; -} - -NProto::TTableRef TableRefToProto(const TTableRef& tableRef) { - NProto::TTableRef protoTableRef; - if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&tableRef.TableRef)) { - NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr); - protoTableRef.MutableYtTableRef()->Swap(&protoYtTableRef); - } else { - auto* fmrTableRefPtr = std::get_if<TFmrTableRef>(&tableRef.TableRef); - NProto::TFmrTableRef protoFmrTableRef = FmrTableRefToProto(*fmrTableRefPtr); - protoTableRef.MutableFmrTableRef()->Swap(&protoFmrTableRef); - } - return protoTableRef; -} - -TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef) { - std::variant<TYtTableRef, TFmrTableRef> tableRef; - if (protoTableRef.HasYtTableRef()) { - tableRef = YtTableRefFromProto(protoTableRef.GetYtTableRef()); - } else { - tableRef = FmrTableRefFromProto(protoTableRef.GetFmrTableRef()); - } - return {tableRef}; -} - -NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) { - NProto::TUploadTaskParams protoUploadTaskParams; - auto input = FmrTableRefToProto(uploadTaskParams.Input); - auto output = YtTableRefToProto(uploadTaskParams.Output); - protoUploadTaskParams.MutableInput()->Swap(&input); - protoUploadTaskParams.MutableOutput()->Swap(&output); - return protoUploadTaskParams; -} - -TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) { - TUploadTaskParams uploadTaskParams; - uploadTaskParams.Input = FmrTableRefFromProto(protoUploadTaskParams.GetInput()); - uploadTaskParams.Output = YtTableRefFromProto(protoUploadTaskParams.GetOutput()); - return uploadTaskParams; -} - -NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) { - NProto::TDownloadTaskParams protoDownloadTaskParams; - auto input = YtTableRefToProto(downloadTaskParams.Input); - auto output = FmrTableRefToProto(downloadTaskParams.Output); - protoDownloadTaskParams.MutableInput()->Swap(&input); - protoDownloadTaskParams.MutableOutput()->Swap(&output); - return protoDownloadTaskParams; -} - -TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) { - TDownloadTaskParams downloadTaskParams; - downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput()); - downloadTaskParams.Output = FmrTableRefFromProto(protoDownloadTaskParams.GetOutput()); - return downloadTaskParams; -} - -NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) { - NProto::TMergeTaskParams protoMergeTaskParams; - std::vector<NProto::TTableRef> inputTables; - for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) { - auto inputTable = TableRefToProto(mergeTaskParams.Input[i]); - auto* curInput = protoMergeTaskParams.AddInput(); - curInput->Swap(&inputTable); - } - auto outputTable = FmrTableRefToProto(mergeTaskParams.Output); - protoMergeTaskParams.MutableOutput()->Swap(&outputTable); - return protoMergeTaskParams; -} - -TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) { - TMergeTaskParams mergeTaskParams; - std::vector<TTableRef> input; - for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) { - TTableRef inputTable = TableRefFromProto(protoMergeTaskParams.GetInput(i)); - input.emplace_back(inputTable); - } - mergeTaskParams.Input = input; - mergeTaskParams.Output = FmrTableRefFromProto(protoMergeTaskParams.GetOutput()); - return mergeTaskParams; -} - -NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) { - NProto::TTaskParams protoTaskParams; - if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) { - NProto::TUploadTaskParams protoUploadTaskParams = UploadTaskParamsToProto(*uploadTaskParamsPtr); - protoTaskParams.MutableUploadTaskParams()->Swap(&protoUploadTaskParams); - } else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) { - NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr); - protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams); - } else { - auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams); - NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr); - protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams); - } - return protoTaskParams; -} - -TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { - TTaskParams taskParams; - if (protoTaskParams.HasDownloadTaskParams()) { - taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams()); - } else if (protoTaskParams.HasUploadTaskParams()) { - taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams()); - } else { - taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams()); - } - return taskParams; -} - -NProto::TTask TaskToProto(const TTask& task) { - NProto::TTask protoTask; - protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType)); - protoTask.SetTaskId(task.TaskId); - auto taskParams = TaskParamsToProto(task.TaskParams); - protoTask.MutableTaskParams()->Swap(&taskParams); - protoTask.SetSessionId(task.SessionId); - protoTask.SetNumRetries(task.NumRetries); - return protoTask; -} - -TTask TaskFromProto(const NProto::TTask& protoTask) { - TTask task; - task.TaskType = static_cast<ETaskType>(protoTask.GetTaskType()); - task.TaskId = protoTask.GetTaskId(); - task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams()); - task.SessionId = protoTask.GetSessionId(); - task.NumRetries = protoTask.GetNumRetries(); - return task; -} - -NProto::TTaskState TaskStateToProto(const TTaskState& taskState) { - NProto::TTaskState protoTaskState; - protoTaskState.SetTaskStatus(static_cast<NProto::ETaskStatus>(taskState.TaskStatus)); - protoTaskState.SetTaskId(taskState.TaskId); - if (taskState.TaskErrorMessage) { - auto fmrError = FmrErrorToProto(*taskState.TaskErrorMessage); - protoTaskState.MutableTaskErrorMessage()->Swap(&fmrError); - } - return protoTaskState; -} - -TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState) { - TTaskState taskState; - taskState.TaskStatus = static_cast<ETaskStatus>(protoTaskState.GetTaskStatus()); - taskState.TaskId = protoTaskState.GetTaskId(); - if (protoTaskState.HasTaskErrorMessage()) { - taskState.TaskErrorMessage = FmrErrorFromProto(protoTaskState.GetTaskErrorMessage()); - } - return taskState; -} +NProto::TFmrError FmrErrorToProto(const TFmrError& error); + +TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError); + +NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef); + +TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef); + +NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef); + +TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef); + +NProto::TTableRef TableRefToProto(const TTableRef& tableRef); + +TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef); + +NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams); + +TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams); + +NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams); + +TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams); + +NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams); + +TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams); + +NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams); + +TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams); + +NProto::TTask TaskToProto(const TTask& task); + +TTask TaskFromProto(const NProto::TTask& protoTask); + +NProto::TTaskState TaskStateToProto(const TTaskState& taskState); + +TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make index 3e12787010..790f845c3e 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make @@ -5,7 +5,6 @@ SRCS( ) PEERDIR( - library/cpp/yt/assert yt/yql/providers/yt/fmr/coordinator/impl yt/yql/providers/yt/fmr/job_factory/impl yt/yql/providers/yt/fmr/worker/impl diff --git a/yt/yql/providers/yt/fmr/worker/impl/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ya.make index 28cfcbea66..9eb66cb8dd 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ya.make +++ b/yt/yql/providers/yt/fmr/worker/impl/ya.make @@ -7,7 +7,8 @@ SRCS( PEERDIR( library/cpp/random_provider library/cpp/threading/future - yt/yql/providers/yt/fmr/worker/interface + yt/yql/providers/yt/fmr/coordinator/interface + yt/yql/providers/yt/fmr/job_factory/interface yql/essentials/utils ) diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp index 9502faf806..a1a3a186fe 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp @@ -133,7 +133,7 @@ private: } // namespace IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) { - return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings); + return MakeHolder<TFmrWorker>(coordinator, jobFactory, settings); } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h index cbeff4bbf5..bff6ddff9e 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h @@ -1,14 +1,18 @@ #include <library/cpp/random_provider/random_provider.h> -#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h> +#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> +#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> +#include <yql/essentials/utils/runnable.h> namespace NYql::NFmr { struct TFmrWorkerSettings { ui32 WorkerId; - TIntrusivePtr<IRandomProvider> RandomProvider; + TIntrusivePtr<IRandomProvider> RandomProvider = CreateDefaultRandomProvider(); TDuration TimeToSleepBetweenRequests = TDuration::Seconds(1); }; -IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator,IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings); +using IFmrWorker = IRunnable; + +IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/interface/ya.make b/yt/yql/providers/yt/fmr/worker/interface/ya.make deleted file mode 100644 index 2042b984d0..0000000000 --- a/yt/yql/providers/yt/fmr/worker/interface/ya.make +++ /dev/null @@ -1,15 +0,0 @@ -LIBRARY() - -SRCS( - yql_yt_worker.cpp -) - -PEERDIR( - library/cpp/threading/future - yt/yql/providers/yt/fmr/coordinator/interface - yt/yql/providers/yt/fmr/job_factory/interface -) - -YQL_LAST_ABI_VERSION() - -END() diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp deleted file mode 100644 index 4c6001a217..0000000000 --- a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "yql_yt_worker.h" diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h deleted file mode 100644 index 751324e795..0000000000 --- a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h +++ /dev/null @@ -1,19 +0,0 @@ -#include <library/cpp/threading/future/core/future.h> - -#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> -#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> - -namespace NYql::NFmr { - -class IFmrWorker: public TThrRefBase { -public: - using TPtr = TIntrusivePtr<IFmrWorker>; - - virtual ~IFmrWorker() = default; - - virtual void Start() = 0; - - virtual void Stop() = 0; -}; - -} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index 039b3747d1..db00f2ff59 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -38,11 +38,12 @@ public: auto getOperationStatusesFunc = [&] { while (!StopFmrGateway_) { with_lock(SessionStates_->Mutex) { - auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) { - std::vector<TString> completedOperationIds; + auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses, const TString& sessionId) { for (auto& [operationId, promise]: operationStatuses) { + YQL_CLOG(DEBUG, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId; + auto getOperationFuture = Coordinator_->GetOperation({operationId}); - getOperationFuture.Subscribe([&, operationId] (const auto& getFuture) { + getOperationFuture.Subscribe([&, operationId, sessionId] (const auto& getFuture) { auto getOperationResult = getFuture.GetValueSync(); auto getOperationStatus = getOperationResult.Status; auto operationErrorMessages = getOperationResult.ErrorMessages; @@ -52,22 +53,30 @@ public: // operation finished, set value in future returned in Publish / Download bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed; SendOperationCompletionSignal(promise, hasCompletedSuccessfully, operationErrorMessages); - completedOperationIds.emplace_back(operationId); + YQL_CLOG(DEBUG, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId; + auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId}); + deleteOperationFuture.Subscribe([&, sessionId, operationId] (const auto& deleteFuture) { + auto deleteOperationResult = deleteFuture.GetValueSync(); + auto deleteOperationStatus = deleteOperationResult.Status; + YQL_ENSURE(deleteOperationStatus == EOperationStatus::Aborted || deleteOperationStatus == EOperationStatus::NotFound); + with_lock(SessionStates_->Mutex) { + YQL_ENSURE( SessionStates_->Sessions.contains(sessionId)); + auto& sessionInfo = SessionStates_->Sessions[sessionId]; + auto& operationStates = sessionInfo.OperationStates; + operationStates.DownloadOperationStatuses.erase(operationId); + operationStates.UploadOperationStatuses.erase(operationId); + } + }); } } }); } - - for (auto& operationId: completedOperationIds) { - Coordinator_->DeleteOperation({operationId}).GetValueSync(); - operationStatuses.erase(operationId); - } }; - for (auto& [sessionId, sessionInfo]: SessionStates_->Sessions) { + for (auto [sessionId, sessionInfo]: SessionStates_->Sessions) { auto& operationStates = sessionInfo.OperationStates; - checkOperationStatuses(operationStates.DownloadOperationStatuses); - checkOperationStatuses(operationStates.UploadOperationStatuses); + checkOperationStatuses(operationStates.DownloadOperationStatuses, sessionId); + checkOperationStatuses(operationStates.UploadOperationStatuses, sessionId); } } Sleep(TimeToSleepBetweenGetOperationRequests_); @@ -82,6 +91,7 @@ public: } TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final { + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); if (!Coordinator_) { return Slave_->Publish(node, ctx, std::move(options)); } @@ -98,7 +108,6 @@ public: TFuture<void> downloadedSuccessfully; TString sessionId = options.SessionId(); - YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__, sessionId); with_lock(SessionStates_->Mutex) { auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; @@ -130,9 +139,11 @@ public: auto promise = NewPromise<TPublishResult>(); auto future = promise.GetFuture(); - auto startOperationResponseFuture = Coordinator_->StartOperation(uploadRequest); - startOperationResponseFuture.Subscribe([this, promise = std::move(promise), &sessionId] (const auto& startFuture) { - TString operationId = startFuture.GetValueSync().OperationId; + YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload to yt table: " << cluster + "." + outputPath; + auto uploadOperationResponseFuture = Coordinator_->StartOperation(uploadRequest); + uploadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& uploadFuture) { + TStartOperationResponse startOperationResponse = uploadFuture.GetValueSync(); + TString operationId = startOperationResponse.OperationId; with_lock(SessionStates_->Mutex) { YQL_ENSURE(SessionStates_->Sessions.contains(sessionId)); auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates; @@ -156,14 +167,15 @@ public: .TaskType = ETaskType::Download, .TaskParams = downloadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1 }; - YQL_CLOG(INFO, FastMapReduce) << "Downloading table with cluster " << ytTableRef.Cluster << " and path " << ytTableRef.Path << " from yt to fmr"; + YQL_CLOG(DEBUG, FastMapReduce) << "Starting download to from yt table: " << fmrTableId; auto promise = NewPromise<TDownloadTableToFmrResult>(); auto future = promise.GetFuture(); - auto startOperationResponseFuture = Coordinator_->StartOperation(downloadRequest); - startOperationResponseFuture.Subscribe([this, promise = std::move(promise), &sessionId] (const auto& startFuture) { - TString operationId = startFuture.GetValueSync().OperationId; + auto downloadOperationResponseFuture = Coordinator_->StartOperation(downloadRequest); + downloadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& downloadFuture) { + TStartOperationResponse downloadOperationResponse = downloadFuture.GetValueSync(); + TString operationId = downloadOperationResponse.OperationId; with_lock(SessionStates_->Mutex) { auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates; auto& downloadOperationStatuses = operationStates.DownloadOperationStatuses; @@ -219,13 +231,9 @@ public: cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId})); } NThreading::WaitAll(cancelOperationsFutures).GetValueSync(); - std::vector<TFuture<T>> resultFutures; - for (auto& [operationId, promise]: operationStatuses) { SendOperationCompletionSignal(promise, false); - resultFutures.emplace_back(promise.GetFuture()); } - NThreading::WaitAll(resultFutures).GetValueSync(); }; cancelOperationsFunc(operationStates.DownloadOperationStatuses); @@ -265,7 +273,7 @@ private: template <std::derived_from<NCommon::TOperationResult> T> void SendOperationCompletionSignal(TPromise<T> promise, bool completedSuccessfully = false, const std::vector<TFmrError>& errorMessages = {}) { YQL_ENSURE(!promise.HasValue()); - T commonOperationResult; + T commonOperationResult{}; if (completedSuccessfully) { commonOperationResult.SetSuccess(); } else if (!errorMessages.empty()) { @@ -279,7 +287,7 @@ private: } // namespace IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPtr coordinator, const TFmrYtGatewaySettings& settings) { - return MakeIntrusive<TFmrYtGateway>(std::move(slave), std::move(coordinator), settings); + return MakeIntrusive<TFmrYtGateway>(std::move(slave), coordinator, settings); } } // namespace NYql::NFmr diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make index 93b20d7ed0..007272ae8e 100644 --- a/yt/yql/tools/ytrun/lib/ya.make +++ b/yt/yql/tools/ytrun/lib/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( yt/yql/providers/yt/provider + yt/yql/providers/yt/fmr/coordinator/client yt/yql/providers/yt/fmr/coordinator/impl yt/yql/providers/yt/fmr/job_factory/impl yt/yql/providers/yt/fmr/worker/impl diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp index ed5a86fc0b..9726d00d5a 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -8,6 +8,7 @@ #include <yt/yql/providers/yt/lib/log/yt_logger.h> #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h> #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h> +#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h> #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> #include <yql/essentials/providers/common/provider/yql_provider_names.h> @@ -113,6 +114,13 @@ TYtRunTool::TYtRunTool(TString name) .Optional() .NoArgument() .SetFlag(&GetRunOptions().UseMetaFromGrpah); + opts.AddLongOption("fmr-coordinator-server-url", "Fast map reduce coordinator server url") + .Optional() + .StoreResult(&FmrCoordinatorServerUrl_); + opts.AddLongOption("disable-local-fmr-worker", "Disable local fast map reduce worker") + .Optional() + .NoArgument() + .SetFlag(&DisableLocalFmrWorker_); }); GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) { @@ -171,27 +179,40 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() { services.FunctionRegistry = GetFuncRegistry().Get(); services.FileStorage = GetFileStorage(); services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt()); - auto ytGateway = CreateYtNativeGateway(services); + auto ytGateway = CreateYtNativeGateway(services); if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) { return ytGateway; } auto coordinator = NFmr::MakeFmrCoordinator(); - auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { - while (!cancelFlag->load()) { - Sleep(TDuration::Seconds(3)); - return NFmr::ETaskStatus::Completed; + if (!FmrCoordinatorServerUrl_.empty()) { + NFmr::TFmrCoordinatorClientSettings coordinatorClientSettings; + THttpURL parsedUrl; + if (parsedUrl.Parse(FmrCoordinatorServerUrl_) != THttpURL::ParsedOK) { + ythrow yexception() << "Invalid fast map reduce coordinator server url passed in parameters"; } - return NFmr::ETaskStatus::Aborted; - }; // TODO - use function which actually calls Downloader/Uploader based on task params - - NFmr::TFmrJobFactorySettings settings{.Function=func}; - auto jobFactory = MakeFmrJobFactory(settings); - NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(), - .TimeToSleepBetweenRequests=TDuration::Seconds(1)}; - FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings); - FmrWorker_->Start(); - return CreateYtFmrGateway(ytGateway, coordinator); + coordinatorClientSettings.Port = parsedUrl.GetPort(); + coordinatorClientSettings.Host = parsedUrl.GetHost(); + coordinator = NFmr::MakeFmrCoordinatorClient(coordinatorClientSettings); + } + + if (!DisableLocalFmrWorker_) { + auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (!cancelFlag->load()) { + Sleep(TDuration::Seconds(3)); + return NFmr::ETaskStatus::Completed; + } + return NFmr::ETaskStatus::Aborted; + }; // TODO - use function which actually calls Downloader/Uploader based on task params + + NFmr::TFmrJobFactorySettings settings{.Function=func}; + auto jobFactory = MakeFmrJobFactory(settings); + NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDefaultRandomProvider(), + .TimeToSleepBetweenRequests=TDuration::Seconds(1)}; + FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings); + FmrWorker_->Start(); + } + return NFmr::CreateYtFmrGateway(ytGateway, coordinator); } IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() { diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h index 96c62a2c33..cb72e40304 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.h +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h @@ -34,6 +34,8 @@ protected: bool KeepTemp_ = false; TString DefYtServer_; NFmr::IFmrWorker::TPtr FmrWorker_; + TString FmrCoordinatorServerUrl_; + bool DisableLocalFmrWorker_ = false; }; } // NYql |