diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-11 17:46:25 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-11 18:07:00 +0300 |
commit | 8b363a039a2bc94a03da342d5cffe4b723e63a21 (patch) | |
tree | f6beda764d8c16b1712c9369875b4475ef7cbe3d | |
parent | 749393b0956db42603f026990b66966bf84bffa1 (diff) | |
download | ydb-8b363a039a2bc94a03da342d5cffe4b723e63a21.tar.gz |
Intermediate changes
commit_hash:d8d82bf12a346219a4f7637796a6d71aab5fda3c
26 files changed, 444 insertions, 44 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp index 9ba6dfed78..a934fcc37b 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -8,7 +8,7 @@ #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> #include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> -namespace NYql { +namespace NYql::NFmr { class TFmrWorkerProxy: public IFmrWorker { public: @@ -311,4 +311,4 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { } } -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index b04814c354..9205eae70d 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -3,7 +3,7 @@ #include <yql/essentials/utils/yql_panic.h> #include "yql_yt_coordinator_impl.h" -namespace NYql { +namespace NYql::NFmr { namespace { @@ -243,4 +243,4 @@ IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings return MakeIntrusive<TFmrCoordinator>(settings); } -} // namespace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h index 39ff6b1ae6..5a8f6eee55 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h @@ -4,7 +4,7 @@ #include <util/generic/queue.h> #include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> -namespace NYql { +namespace NYql::NFmr { struct TFmrCoordinatorSettings { ui32 WorkersNum; // Not supported yet @@ -15,4 +15,4 @@ struct TFmrCoordinatorSettings { IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)}); -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h index 306ccd7db7..8be5d757c2 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h @@ -5,7 +5,7 @@ #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> -namespace NYql { +namespace NYql::NFmr { struct THeartbeatRequest { ui32 WorkerId; @@ -65,4 +65,4 @@ public: virtual NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& request) = 0; }; -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make index 597cdac1a4..3beaf40ddf 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make @@ -5,7 +5,6 @@ SRCS( ) PEERDIR( - library/cpp/yt/assert yt/yql/providers/yt/fmr/job_factory/interface yt/yql/providers/yt/fmr/job_factory/impl ) diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp index 89369f7e60..89e95debdd 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp @@ -7,7 +7,7 @@ #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> #include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> -namespace NYql { +namespace NYql::NFmr { Y_UNIT_TEST_SUITE(FmrFactoryTests) { Y_UNIT_TEST(GetSuccessfulTaskResult) { @@ -61,4 +61,4 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) { } } -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp index f2762857b0..af154cb442 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp +++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp @@ -2,7 +2,7 @@ #include <yql/essentials/utils/log/log.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> -namespace NYql { +namespace NYql::NFmr { class TFmrJobFactory: public IFmrJobFactory { public: @@ -59,4 +59,4 @@ TFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings) { return MakeIntrusive<TFmrJobFactory>(settings); } -} // namespace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h index 00a29f95a6..24c5fb5e9f 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h +++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h @@ -4,7 +4,7 @@ #include <util/thread/pool.h> #include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> -namespace NYql { +namespace NYql::NFmr { struct TFmrJobFactorySettings { ui32 NumThreads = 3; @@ -13,4 +13,4 @@ struct TFmrJobFactorySettings { IFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings); -} // namepspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h index 387286687b..d7c29fc818 100644 --- a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h +++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h @@ -3,7 +3,7 @@ #include <library/cpp/threading/future/core/future.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> -namespace NYql { +namespace NYql::NFmr { class IFmrJobFactory: public TThrRefBase { public: @@ -14,4 +14,4 @@ public: virtual NThreading::TFuture<TTaskResult::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) = 0; }; -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto new file mode 100644 index 0000000000..930b4b3bd0 --- /dev/null +++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto @@ -0,0 +1,47 @@ +syntax = "proto3"; + +package NYql.NFmr.NProto; + +import "yt/yql/providers/yt/fmr/proto/request_options.proto"; + +message THeartbeatRequest { + uint32 WorkerId = 1; + string VolatileId = 2; + repeated TTaskState TaskStates = 3; + TStatistics Statistics = 4; +} + +message THeartbeatResponse { + repeated TTask TasksToRun = 1; + repeated string TaskToDeleteIds = 2; +} + +message TStartOperationRequest { + ETaskType TaskType = 1; + TTaskParams TaskParams = 2; + string SessionId = 3; + optional string IdempotencyKey = 4; + uint32 NumRetries = 5; +} + +message TStartOperationResponse { + EOperationStatus Status = 1; + string OperationId = 2; +} + +message TGetOperationRequest { + string OperationId = 1; +} + +message TGetOperationResponse { + EOperationStatus Status = 1; + repeated TFmrError ErrorMessages = 2; +} + +message TDeleteOperationRequest { + string OperationId = 1; +} + +message TDeleteOperationResponse { + EOperationStatus Status = 1; +} diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto new file mode 100644 index 0000000000..e37583a0d2 --- /dev/null +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -0,0 +1,101 @@ +syntax = "proto3"; + +package NYql.NFmr.NProto; + +enum EOperationStatus { + OPERATION_UNKNOWN = 0; + OPERATION_ACCEPTED = 1; + OPERATION_IN_PROGRESS = 2; + OPERATION_FAILED = 3; + OPERATION_COMPLETED = 4; + OPERATION_ABORTED = 5; + OPERATION_NOT_FOUND = 6; +} + +enum ETaskStatus { + TASK_UNKNOWN = 0; + TASK_ACCEPTED = 1; + TASK_IN_PROGRESS = 2; + TASK_FAILED = 3; + TASK_COMPLETED = 4; + TASK_ABORTED = 5; +} + +enum ETaskType { + TASK_TYPE_UNKNOWN = 0; + TASK_TYPE_DOWNLOAD = 1; + TASK_TYPE_UPLOAD = 2; + TASK_TYPE_MERGE = 3; +} + +enum EFmrComponent { + COMPONENT_UNKNOWN = 0; + COMPONENT_COORDINATOR = 1; + COMPONENT_WORKER = 2; + COMPONENT_JOB = 3; +} + +message TFmrError { + EFmrComponent Component = 1; + string ErrorMessage = 2; + optional uint32 WorkerId = 3; + optional string TaskId = 4; + optional string OperationId = 5; +} + +message TStatistics {} + +message TYtTableRef { + string Path = 1; + string Cluster = 2; + string TransactionId = 3; +} + +message TFmrTableRef { + string TableId = 1; +} + +message TTableRef { + oneof TableRef { + TYtTableRef YtTableRef = 1; + TFmrTableRef FmrTableRef = 2; + } +} + +message TUploadTaskParams { + TFmrTableRef Input = 1; + TYtTableRef Output = 2; +} + +message TDownloadTaskParams { + TYtTableRef Input = 1; + TFmrTableRef Output = 2; +} + +message TMergeTaskParams { + repeated TTableRef Input = 1; + TFmrTableRef Output = 2; +} + +message TTaskParams { + oneof TaskParams { + TUploadTaskParams UploadTaskParams = 1; + TDownloadTaskParams DownloadTaskParams = 2; + TMergeTaskParams MergeTaskParams = 3; + } +} + +message TTask { + ETaskType TaskType = 1; + string TaskId = 2; + TTaskParams TaskParams = 3; + string SessionId = 4; + optional uint32 NumRetries = 5; +} + +message TTaskState { + ETaskStatus TaskStatus = 1; + string TaskId = 2; + optional TFmrError TaskErrorMessage = 3; +} + diff --git a/yt/yql/providers/yt/fmr/proto/ya.make b/yt/yql/providers/yt/fmr/proto/ya.make new file mode 100644 index 0000000000..c581480e97 --- /dev/null +++ b/yt/yql/providers/yt/fmr/proto/ya.make @@ -0,0 +1,10 @@ +PROTO_LIBRARY() + +SRCS( + coordinator.proto + request_options.proto +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make new file mode 100644 index 0000000000..36f1b3ebca --- /dev/null +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + yql_yt_request_proto_helpers.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/proto +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp new file mode 100644 index 0000000000..48dcad24c7 --- /dev/null +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -0,0 +1 @@ +#include "yql_yt_request_proto_helpers.h" diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h new file mode 100644 index 0000000000..abfa1b901c --- /dev/null +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h @@ -0,0 +1,219 @@ +#pragma once + +#include <variant> +#include <yt/yql/providers/yt/fmr/proto/request_options.pb.h> +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> + +namespace NYql::NFmr { + +NProto::TFmrError FmrErrorToProto(const TFmrError& error) { + NProto::TFmrError protoError; + protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component)); + protoError.SetErrorMessage(error.ErrorMessage); + if (error.WorkerId) { + protoError.SetWorkerId(*error.WorkerId); + } + if (error.TaskId) { + protoError.SetTaskId(*error.TaskId); + } + if (error.OperationId) { + protoError.SetOperationId(*error.OperationId); + } + return protoError; +} + +TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) { + TFmrError fmrError; + fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent()); + fmrError.ErrorMessage = protoError.GetErrorMessage(); + if (protoError.HasWorkerId()) { + fmrError.WorkerId = protoError.GetWorkerId(); + } + if (protoError.HasTaskId()) { + fmrError.TaskId = protoError.GetTaskId(); + } + if (protoError.HasOperationId()) { + fmrError.OperationId = protoError.GetOperationId(); + } + return fmrError; +} + +NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) { + NProto::TYtTableRef protoYtTableRef; + protoYtTableRef.SetPath(ytTableRef.Path); + protoYtTableRef.SetCluster(ytTableRef.Cluster); + protoYtTableRef.SetTransactionId(ytTableRef.TransactionId); + return protoYtTableRef; +} + +TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) { + TYtTableRef ytTableRef; + ytTableRef.Path = protoYtTableRef.GetPath(); + ytTableRef.Cluster = protoYtTableRef.GetCluster(); + ytTableRef.TransactionId = protoYtTableRef.GetTransactionId(); + return ytTableRef; +} + +NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) { + NProto::TFmrTableRef protoFmrTableRef; + protoFmrTableRef.SetTableId(fmrTableRef.TableId); + return protoFmrTableRef; +} + +TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) { + TFmrTableRef fmrTableRef; + fmrTableRef.TableId = protoFmrTableRef.GetTableId(); + return fmrTableRef; +} + +NProto::TTableRef TableRefToProto(const TTableRef& tableRef) { + NProto::TTableRef protoTableRef; + if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&tableRef.TableRef)) { + NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr); + protoTableRef.MutableYtTableRef()->Swap(&protoYtTableRef); + } else { + auto* fmrTableRefPtr = std::get_if<TFmrTableRef>(&tableRef.TableRef); + NProto::TFmrTableRef protoFmrTableRef = FmrTableRefToProto(*fmrTableRefPtr); + protoTableRef.MutableFmrTableRef()->Swap(&protoFmrTableRef); + } + return protoTableRef; +} + +TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef) { + std::variant<TYtTableRef, TFmrTableRef> tableRef; + if (protoTableRef.HasYtTableRef()) { + tableRef = YtTableRefFromProto(protoTableRef.GetYtTableRef()); + } else { + tableRef = FmrTableRefFromProto(protoTableRef.GetFmrTableRef()); + } + return {tableRef}; +} + +NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) { + NProto::TUploadTaskParams protoUploadTaskParams; + auto input = FmrTableRefToProto(uploadTaskParams.Input); + auto output = YtTableRefToProto(uploadTaskParams.Output); + protoUploadTaskParams.MutableInput()->Swap(&input); + protoUploadTaskParams.MutableOutput()->Swap(&output); + return protoUploadTaskParams; +} + +TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) { + TUploadTaskParams uploadTaskParams; + uploadTaskParams.Input = FmrTableRefFromProto(protoUploadTaskParams.GetInput()); + uploadTaskParams.Output = YtTableRefFromProto(protoUploadTaskParams.GetOutput()); + return uploadTaskParams; +} + +NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) { + NProto::TDownloadTaskParams protoDownloadTaskParams; + auto input = YtTableRefToProto(downloadTaskParams.Input); + auto output = FmrTableRefToProto(downloadTaskParams.Output); + protoDownloadTaskParams.MutableInput()->Swap(&input); + protoDownloadTaskParams.MutableOutput()->Swap(&output); + return protoDownloadTaskParams; +} + +TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) { + TDownloadTaskParams downloadTaskParams; + downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput()); + downloadTaskParams.Output = FmrTableRefFromProto(protoDownloadTaskParams.GetOutput()); + return downloadTaskParams; +} + +NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) { + NProto::TMergeTaskParams protoMergeTaskParams; + std::vector<NProto::TTableRef> inputTables; + for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) { + auto inputTable = TableRefToProto(mergeTaskParams.Input[i]); + auto* curInput = protoMergeTaskParams.AddInput(); + curInput->Swap(&inputTable); + } + auto outputTable = FmrTableRefToProto(mergeTaskParams.Output); + protoMergeTaskParams.MutableOutput()->Swap(&outputTable); + return protoMergeTaskParams; +} + +TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) { + TMergeTaskParams mergeTaskParams; + std::vector<TTableRef> input; + for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) { + TTableRef inputTable = TableRefFromProto(protoMergeTaskParams.GetInput(i)); + input.emplace_back(inputTable); + } + mergeTaskParams.Input = input; + mergeTaskParams.Output = FmrTableRefFromProto(protoMergeTaskParams.GetOutput()); + return mergeTaskParams; +} + +NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) { + NProto::TTaskParams protoTaskParams; + if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) { + NProto::TUploadTaskParams protoUploadTaskParams = UploadTaskParamsToProto(*uploadTaskParamsPtr); + protoTaskParams.MutableUploadTaskParams()->Swap(&protoUploadTaskParams); + } else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) { + NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr); + protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams); + } else { + auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams); + NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr); + protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams); + } + return protoTaskParams; +} + +TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { + TTaskParams taskParams; + if (protoTaskParams.HasDownloadTaskParams()) { + taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams()); + } else if (protoTaskParams.HasUploadTaskParams()) { + taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams()); + } else { + taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams()); + } + return taskParams; +} + +NProto::TTask TaskToProto(const TTask& task) { + NProto::TTask protoTask; + protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType)); + protoTask.SetTaskId(task.TaskId); + auto taskParams = TaskParamsToProto(task.TaskParams); + protoTask.MutableTaskParams()->Swap(&taskParams); + protoTask.SetSessionId(task.SessionId); + protoTask.SetNumRetries(task.NumRetries); + return protoTask; +} + +TTask TaskFromProto(const NProto::TTask& protoTask) { + TTask task; + task.TaskType = static_cast<ETaskType>(protoTask.GetTaskType()); + task.TaskId = protoTask.GetTaskId(); + task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams()); + task.SessionId = protoTask.GetSessionId(); + task.NumRetries = protoTask.GetNumRetries(); + return task; +} + +NProto::TTaskState TaskStateToProto(const TTaskState& taskState) { + NProto::TTaskState protoTaskState; + protoTaskState.SetTaskStatus(static_cast<NProto::ETaskStatus>(taskState.TaskStatus)); + protoTaskState.SetTaskId(taskState.TaskId); + if (taskState.TaskErrorMessage) { + auto fmrError = FmrErrorToProto(*taskState.TaskErrorMessage); + protoTaskState.MutableTaskErrorMessage()->Swap(&fmrError); + } + return protoTaskState; +} + +TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState) { + TTaskState taskState; + taskState.TaskStatus = static_cast<ETaskStatus>(protoTaskState.GetTaskStatus()); + taskState.TaskId = protoTaskState.GetTaskId(); + if (protoTaskState.HasTaskErrorMessage()) { + taskState.TaskErrorMessage = FmrErrorFromProto(protoTaskState.GetTaskErrorMessage()); + } + return taskState; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make index 0010183a60..9e330848c2 100644 --- a/yt/yql/providers/yt/fmr/request_options/ya.make +++ b/yt/yql/providers/yt/fmr/request_options/ya.make @@ -13,3 +13,5 @@ YQL_LAST_ABI_VERSION() GENERATE_ENUM_SERIALIZATION(yql_yt_request_options.h) END() + +RECURSE(proto_helpers) diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index 1e59e0dc8d..c73791bee4 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -1,6 +1,6 @@ #include "yql_yt_request_options.h" -namespace NYql { +namespace NYql::NFmr { TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId) { return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId); @@ -14,14 +14,14 @@ TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError> return MakeIntrusive<TTaskResult>(taskStatus, taskErrorMessage); } -} // namepsace NYql +} // namespace NYql::NFmr template<> -void Out<NYql::TFmrError>(IOutputStream& out, const NYql::TFmrError& error) { +void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError& error) { out << "FmrError[" << error.Component << "]"; - if (error.Component == NYql::EFmrComponent::Worker) { + if (error.Component == NYql::NFmr::EFmrComponent::Worker) { out << "(TaskId: " << error.TaskId << " WorkerId: " << error.WorkerId << ") "; - } else if (error.Component == NYql::EFmrComponent::Coordinator) { + } else if (error.Component == NYql::NFmr::EFmrComponent::Coordinator) { out << "(OperationId: " << error.OperationId <<") "; } out << error.ErrorMessage; diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index d8f21e5f1e..9d69d008b2 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -4,9 +4,10 @@ #include <util/generic/string.h> #include <vector> -namespace NYql { +namespace NYql::NFmr { enum class EOperationStatus { + Unknown, Accepted, InProgress, Failed, @@ -16,6 +17,7 @@ enum class EOperationStatus { }; enum class ETaskStatus { + Unknown, Accepted, InProgress, Failed, @@ -24,12 +26,14 @@ enum class ETaskStatus { }; enum class ETaskType { + Unknown, Download, Upload, Merge }; enum class EFmrComponent { + Unknown, Coordinator, Worker, Job @@ -78,6 +82,8 @@ struct TMergeTaskParams { using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams>; struct TTask: public TThrRefBase { + TTask() = default; + TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, ui32 numRetries = 1) : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), NumRetries(numRetries) { @@ -85,7 +91,7 @@ struct TTask: public TThrRefBase { ETaskType TaskType; TString TaskId; - TTaskParams TaskParams; + TTaskParams TaskParams = {}; TString SessionId; ui32 NumRetries; // Not supported yet @@ -93,6 +99,8 @@ struct TTask: public TThrRefBase { }; struct TTaskState: public TThrRefBase { + TTaskState() = default; + TTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& errorMessage = Nothing()) : TaskStatus(taskStatus), TaskId(taskId), TaskErrorMessage(errorMessage) { @@ -123,4 +131,4 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& taskErrorMessage = Nothing()); -} // namespace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp index 8077c9abea..c5af15bb3a 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -8,7 +8,7 @@ #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> -namespace NYql { +namespace NYql::NFmr { TDownloadTaskParams downloadTaskParams{ .Input = TYtTableRef{"Path","Cluster","TransactionId"}, @@ -98,4 +98,4 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) { } } -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp index 00363e443f..9502faf806 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp @@ -5,7 +5,7 @@ #include <yql/essentials/utils/yql_panic.h> #include "yql_yt_worker_impl.h" -namespace NYql { +namespace NYql::NFmr { namespace { @@ -136,4 +136,4 @@ IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings); } -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h index dc43652dea..cbeff4bbf5 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h @@ -1,7 +1,7 @@ #include <library/cpp/random_provider/random_provider.h> #include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h> -namespace NYql { +namespace NYql::NFmr { struct TFmrWorkerSettings { ui32 WorkerId; @@ -11,4 +11,4 @@ struct TFmrWorkerSettings { IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator,IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings); -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h index d3d5690fbd..751324e795 100644 --- a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h +++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h @@ -3,7 +3,7 @@ #include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> #include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> -namespace NYql { +namespace NYql::NFmr { class IFmrWorker: public TThrRefBase { public: @@ -16,4 +16,4 @@ public: virtual void Stop() = 0; }; -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index 266f3c8174..039b3747d1 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -14,7 +14,7 @@ using namespace NThreading; using namespace NYql::NNodes; -namespace NYql { +namespace NYql::NFmr { namespace { @@ -282,4 +282,4 @@ IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPt return MakeIntrusive<TFmrYtGateway>(std::move(slave), std::move(coordinator), settings); } -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h index df82d50527..769bb24fe5 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h @@ -3,7 +3,7 @@ #include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> #include <yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h> -namespace NYql { +namespace NYql::NFmr { struct TFmrYtGatewaySettings { TIntrusivePtr<IRandomProvider> RandomProvider; @@ -19,4 +19,4 @@ IYtGateway::TPtr CreateYtFmrGateway( } ); -} // namspace NYql +} // namespace NYql::NFmr diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp index c2064ebd85..ed5a86fc0b 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -176,18 +176,18 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() { return ytGateway; } - auto coordinator = MakeFmrCoordinator(); - auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + auto coordinator = NFmr::MakeFmrCoordinator(); + auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { while (!cancelFlag->load()) { Sleep(TDuration::Seconds(3)); - return ETaskStatus::Completed; + return NFmr::ETaskStatus::Completed; } - return ETaskStatus::Aborted; + return NFmr::ETaskStatus::Aborted; }; // TODO - use function which actually calls Downloader/Uploader based on task params - TFmrJobFactorySettings settings{.Function=func}; + NFmr::TFmrJobFactorySettings settings{.Function=func}; auto jobFactory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(), + NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(), .TimeToSleepBetweenRequests=TDuration::Seconds(1)}; FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings); FmrWorker_->Start(); diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h index e724ce2ac6..96c62a2c33 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.h +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h @@ -33,7 +33,7 @@ protected: size_t NumThreads_ = 1; bool KeepTemp_ = false; TString DefYtServer_; - IFmrWorker::TPtr FmrWorker_; + NFmr::IFmrWorker::TPtr FmrWorker_; }; } // NYql |