aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-02-11 17:46:25 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-02-11 18:07:00 +0300
commit8b363a039a2bc94a03da342d5cffe4b723e63a21 (patch)
treef6beda764d8c16b1712c9369875b4475ef7cbe3d
parent749393b0956db42603f026990b66966bf84bffa1 (diff)
downloadydb-8b363a039a2bc94a03da342d5cffe4b723e63a21.tar.gz
Intermediate changes
commit_hash:d8d82bf12a346219a4f7637796a6d71aab5fda3c
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h4
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h4
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h4
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h4
-rw-r--r--yt/yql/providers/yt/fmr/proto/coordinator.proto47
-rw-r--r--yt/yql/providers/yt/fmr/proto/request_options.proto101
-rw-r--r--yt/yql/providers/yt/fmr/proto/ya.make10
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make13
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h219
-rw-r--r--yt/yql/providers/yt/fmr/request_options/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp10
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h14
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h4
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h4
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp4
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h4
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp12
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.h2
26 files changed, 444 insertions, 44 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
index 9ba6dfed78..a934fcc37b 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -8,7 +8,7 @@
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
-namespace NYql {
+namespace NYql::NFmr {
class TFmrWorkerProxy: public IFmrWorker {
public:
@@ -311,4 +311,4 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
}
}
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index b04814c354..9205eae70d 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -3,7 +3,7 @@
#include <yql/essentials/utils/yql_panic.h>
#include "yql_yt_coordinator_impl.h"
-namespace NYql {
+namespace NYql::NFmr {
namespace {
@@ -243,4 +243,4 @@ IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings
return MakeIntrusive<TFmrCoordinator>(settings);
}
-} // namespace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
index 39ff6b1ae6..5a8f6eee55 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
@@ -4,7 +4,7 @@
#include <util/generic/queue.h>
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
-namespace NYql {
+namespace NYql::NFmr {
struct TFmrCoordinatorSettings {
ui32 WorkersNum; // Not supported yet
@@ -15,4 +15,4 @@ struct TFmrCoordinatorSettings {
IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)});
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
index 306ccd7db7..8be5d757c2 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
@@ -5,7 +5,7 @@
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
-namespace NYql {
+namespace NYql::NFmr {
struct THeartbeatRequest {
ui32 WorkerId;
@@ -65,4 +65,4 @@ public:
virtual NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& request) = 0;
};
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
index 597cdac1a4..3beaf40ddf 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
@@ -5,7 +5,6 @@ SRCS(
)
PEERDIR(
- library/cpp/yt/assert
yt/yql/providers/yt/fmr/job_factory/interface
yt/yql/providers/yt/fmr/job_factory/impl
)
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
index 89369f7e60..89e95debdd 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
@@ -7,7 +7,7 @@
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
-namespace NYql {
+namespace NYql::NFmr {
Y_UNIT_TEST_SUITE(FmrFactoryTests) {
Y_UNIT_TEST(GetSuccessfulTaskResult) {
@@ -61,4 +61,4 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) {
}
}
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
index f2762857b0..af154cb442 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
@@ -2,7 +2,7 @@
#include <yql/essentials/utils/log/log.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
-namespace NYql {
+namespace NYql::NFmr {
class TFmrJobFactory: public IFmrJobFactory {
public:
@@ -59,4 +59,4 @@ TFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings) {
return MakeIntrusive<TFmrJobFactory>(settings);
}
-} // namespace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
index 00a29f95a6..24c5fb5e9f 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
@@ -4,7 +4,7 @@
#include <util/thread/pool.h>
#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
-namespace NYql {
+namespace NYql::NFmr {
struct TFmrJobFactorySettings {
ui32 NumThreads = 3;
@@ -13,4 +13,4 @@ struct TFmrJobFactorySettings {
IFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings);
-} // namepspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
index 387286687b..d7c29fc818 100644
--- a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
@@ -3,7 +3,7 @@
#include <library/cpp/threading/future/core/future.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
-namespace NYql {
+namespace NYql::NFmr {
class IFmrJobFactory: public TThrRefBase {
public:
@@ -14,4 +14,4 @@ public:
virtual NThreading::TFuture<TTaskResult::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) = 0;
};
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto
new file mode 100644
index 0000000000..930b4b3bd0
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto
@@ -0,0 +1,47 @@
+syntax = "proto3";
+
+package NYql.NFmr.NProto;
+
+import "yt/yql/providers/yt/fmr/proto/request_options.proto";
+
+message THeartbeatRequest {
+ uint32 WorkerId = 1;
+ string VolatileId = 2;
+ repeated TTaskState TaskStates = 3;
+ TStatistics Statistics = 4;
+}
+
+message THeartbeatResponse {
+ repeated TTask TasksToRun = 1;
+ repeated string TaskToDeleteIds = 2;
+}
+
+message TStartOperationRequest {
+ ETaskType TaskType = 1;
+ TTaskParams TaskParams = 2;
+ string SessionId = 3;
+ optional string IdempotencyKey = 4;
+ uint32 NumRetries = 5;
+}
+
+message TStartOperationResponse {
+ EOperationStatus Status = 1;
+ string OperationId = 2;
+}
+
+message TGetOperationRequest {
+ string OperationId = 1;
+}
+
+message TGetOperationResponse {
+ EOperationStatus Status = 1;
+ repeated TFmrError ErrorMessages = 2;
+}
+
+message TDeleteOperationRequest {
+ string OperationId = 1;
+}
+
+message TDeleteOperationResponse {
+ EOperationStatus Status = 1;
+}
diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto
new file mode 100644
index 0000000000..e37583a0d2
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/proto/request_options.proto
@@ -0,0 +1,101 @@
+syntax = "proto3";
+
+package NYql.NFmr.NProto;
+
+enum EOperationStatus {
+ OPERATION_UNKNOWN = 0;
+ OPERATION_ACCEPTED = 1;
+ OPERATION_IN_PROGRESS = 2;
+ OPERATION_FAILED = 3;
+ OPERATION_COMPLETED = 4;
+ OPERATION_ABORTED = 5;
+ OPERATION_NOT_FOUND = 6;
+}
+
+enum ETaskStatus {
+ TASK_UNKNOWN = 0;
+ TASK_ACCEPTED = 1;
+ TASK_IN_PROGRESS = 2;
+ TASK_FAILED = 3;
+ TASK_COMPLETED = 4;
+ TASK_ABORTED = 5;
+}
+
+enum ETaskType {
+ TASK_TYPE_UNKNOWN = 0;
+ TASK_TYPE_DOWNLOAD = 1;
+ TASK_TYPE_UPLOAD = 2;
+ TASK_TYPE_MERGE = 3;
+}
+
+enum EFmrComponent {
+ COMPONENT_UNKNOWN = 0;
+ COMPONENT_COORDINATOR = 1;
+ COMPONENT_WORKER = 2;
+ COMPONENT_JOB = 3;
+}
+
+message TFmrError {
+ EFmrComponent Component = 1;
+ string ErrorMessage = 2;
+ optional uint32 WorkerId = 3;
+ optional string TaskId = 4;
+ optional string OperationId = 5;
+}
+
+message TStatistics {}
+
+message TYtTableRef {
+ string Path = 1;
+ string Cluster = 2;
+ string TransactionId = 3;
+}
+
+message TFmrTableRef {
+ string TableId = 1;
+}
+
+message TTableRef {
+ oneof TableRef {
+ TYtTableRef YtTableRef = 1;
+ TFmrTableRef FmrTableRef = 2;
+ }
+}
+
+message TUploadTaskParams {
+ TFmrTableRef Input = 1;
+ TYtTableRef Output = 2;
+}
+
+message TDownloadTaskParams {
+ TYtTableRef Input = 1;
+ TFmrTableRef Output = 2;
+}
+
+message TMergeTaskParams {
+ repeated TTableRef Input = 1;
+ TFmrTableRef Output = 2;
+}
+
+message TTaskParams {
+ oneof TaskParams {
+ TUploadTaskParams UploadTaskParams = 1;
+ TDownloadTaskParams DownloadTaskParams = 2;
+ TMergeTaskParams MergeTaskParams = 3;
+ }
+}
+
+message TTask {
+ ETaskType TaskType = 1;
+ string TaskId = 2;
+ TTaskParams TaskParams = 3;
+ string SessionId = 4;
+ optional uint32 NumRetries = 5;
+}
+
+message TTaskState {
+ ETaskStatus TaskStatus = 1;
+ string TaskId = 2;
+ optional TFmrError TaskErrorMessage = 3;
+}
+
diff --git a/yt/yql/providers/yt/fmr/proto/ya.make b/yt/yql/providers/yt/fmr/proto/ya.make
new file mode 100644
index 0000000000..c581480e97
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/proto/ya.make
@@ -0,0 +1,10 @@
+PROTO_LIBRARY()
+
+SRCS(
+ coordinator.proto
+ request_options.proto
+)
+
+EXCLUDE_TAGS(GO_PROTO)
+
+END()
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make
new file mode 100644
index 0000000000..36f1b3ebca
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_request_proto_helpers.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/proto
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
new file mode 100644
index 0000000000..48dcad24c7
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -0,0 +1 @@
+#include "yql_yt_request_proto_helpers.h"
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
new file mode 100644
index 0000000000..abfa1b901c
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
@@ -0,0 +1,219 @@
+#pragma once
+
+#include <variant>
+#include <yt/yql/providers/yt/fmr/proto/request_options.pb.h>
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+
+namespace NYql::NFmr {
+
+NProto::TFmrError FmrErrorToProto(const TFmrError& error) {
+ NProto::TFmrError protoError;
+ protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component));
+ protoError.SetErrorMessage(error.ErrorMessage);
+ if (error.WorkerId) {
+ protoError.SetWorkerId(*error.WorkerId);
+ }
+ if (error.TaskId) {
+ protoError.SetTaskId(*error.TaskId);
+ }
+ if (error.OperationId) {
+ protoError.SetOperationId(*error.OperationId);
+ }
+ return protoError;
+}
+
+TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) {
+ TFmrError fmrError;
+ fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent());
+ fmrError.ErrorMessage = protoError.GetErrorMessage();
+ if (protoError.HasWorkerId()) {
+ fmrError.WorkerId = protoError.GetWorkerId();
+ }
+ if (protoError.HasTaskId()) {
+ fmrError.TaskId = protoError.GetTaskId();
+ }
+ if (protoError.HasOperationId()) {
+ fmrError.OperationId = protoError.GetOperationId();
+ }
+ return fmrError;
+}
+
+NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) {
+ NProto::TYtTableRef protoYtTableRef;
+ protoYtTableRef.SetPath(ytTableRef.Path);
+ protoYtTableRef.SetCluster(ytTableRef.Cluster);
+ protoYtTableRef.SetTransactionId(ytTableRef.TransactionId);
+ return protoYtTableRef;
+}
+
+TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) {
+ TYtTableRef ytTableRef;
+ ytTableRef.Path = protoYtTableRef.GetPath();
+ ytTableRef.Cluster = protoYtTableRef.GetCluster();
+ ytTableRef.TransactionId = protoYtTableRef.GetTransactionId();
+ return ytTableRef;
+}
+
+NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) {
+ NProto::TFmrTableRef protoFmrTableRef;
+ protoFmrTableRef.SetTableId(fmrTableRef.TableId);
+ return protoFmrTableRef;
+}
+
+TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) {
+ TFmrTableRef fmrTableRef;
+ fmrTableRef.TableId = protoFmrTableRef.GetTableId();
+ return fmrTableRef;
+}
+
+NProto::TTableRef TableRefToProto(const TTableRef& tableRef) {
+ NProto::TTableRef protoTableRef;
+ if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&tableRef.TableRef)) {
+ NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr);
+ protoTableRef.MutableYtTableRef()->Swap(&protoYtTableRef);
+ } else {
+ auto* fmrTableRefPtr = std::get_if<TFmrTableRef>(&tableRef.TableRef);
+ NProto::TFmrTableRef protoFmrTableRef = FmrTableRefToProto(*fmrTableRefPtr);
+ protoTableRef.MutableFmrTableRef()->Swap(&protoFmrTableRef);
+ }
+ return protoTableRef;
+}
+
+TTableRef TableRefFromProto(const NProto::TTableRef& protoTableRef) {
+ std::variant<TYtTableRef, TFmrTableRef> tableRef;
+ if (protoTableRef.HasYtTableRef()) {
+ tableRef = YtTableRefFromProto(protoTableRef.GetYtTableRef());
+ } else {
+ tableRef = FmrTableRefFromProto(protoTableRef.GetFmrTableRef());
+ }
+ return {tableRef};
+}
+
+NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) {
+ NProto::TUploadTaskParams protoUploadTaskParams;
+ auto input = FmrTableRefToProto(uploadTaskParams.Input);
+ auto output = YtTableRefToProto(uploadTaskParams.Output);
+ protoUploadTaskParams.MutableInput()->Swap(&input);
+ protoUploadTaskParams.MutableOutput()->Swap(&output);
+ return protoUploadTaskParams;
+}
+
+TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) {
+ TUploadTaskParams uploadTaskParams;
+ uploadTaskParams.Input = FmrTableRefFromProto(protoUploadTaskParams.GetInput());
+ uploadTaskParams.Output = YtTableRefFromProto(protoUploadTaskParams.GetOutput());
+ return uploadTaskParams;
+}
+
+NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) {
+ NProto::TDownloadTaskParams protoDownloadTaskParams;
+ auto input = YtTableRefToProto(downloadTaskParams.Input);
+ auto output = FmrTableRefToProto(downloadTaskParams.Output);
+ protoDownloadTaskParams.MutableInput()->Swap(&input);
+ protoDownloadTaskParams.MutableOutput()->Swap(&output);
+ return protoDownloadTaskParams;
+}
+
+TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) {
+ TDownloadTaskParams downloadTaskParams;
+ downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput());
+ downloadTaskParams.Output = FmrTableRefFromProto(protoDownloadTaskParams.GetOutput());
+ return downloadTaskParams;
+}
+
+NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) {
+ NProto::TMergeTaskParams protoMergeTaskParams;
+ std::vector<NProto::TTableRef> inputTables;
+ for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) {
+ auto inputTable = TableRefToProto(mergeTaskParams.Input[i]);
+ auto* curInput = protoMergeTaskParams.AddInput();
+ curInput->Swap(&inputTable);
+ }
+ auto outputTable = FmrTableRefToProto(mergeTaskParams.Output);
+ protoMergeTaskParams.MutableOutput()->Swap(&outputTable);
+ return protoMergeTaskParams;
+}
+
+TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) {
+ TMergeTaskParams mergeTaskParams;
+ std::vector<TTableRef> input;
+ for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) {
+ TTableRef inputTable = TableRefFromProto(protoMergeTaskParams.GetInput(i));
+ input.emplace_back(inputTable);
+ }
+ mergeTaskParams.Input = input;
+ mergeTaskParams.Output = FmrTableRefFromProto(protoMergeTaskParams.GetOutput());
+ return mergeTaskParams;
+}
+
+NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) {
+ NProto::TTaskParams protoTaskParams;
+ if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) {
+ NProto::TUploadTaskParams protoUploadTaskParams = UploadTaskParamsToProto(*uploadTaskParamsPtr);
+ protoTaskParams.MutableUploadTaskParams()->Swap(&protoUploadTaskParams);
+ } else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) {
+ NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr);
+ protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams);
+ } else {
+ auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams);
+ NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr);
+ protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams);
+ }
+ return protoTaskParams;
+}
+
+TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
+ TTaskParams taskParams;
+ if (protoTaskParams.HasDownloadTaskParams()) {
+ taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams());
+ } else if (protoTaskParams.HasUploadTaskParams()) {
+ taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams());
+ } else {
+ taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams());
+ }
+ return taskParams;
+}
+
+NProto::TTask TaskToProto(const TTask& task) {
+ NProto::TTask protoTask;
+ protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType));
+ protoTask.SetTaskId(task.TaskId);
+ auto taskParams = TaskParamsToProto(task.TaskParams);
+ protoTask.MutableTaskParams()->Swap(&taskParams);
+ protoTask.SetSessionId(task.SessionId);
+ protoTask.SetNumRetries(task.NumRetries);
+ return protoTask;
+}
+
+TTask TaskFromProto(const NProto::TTask& protoTask) {
+ TTask task;
+ task.TaskType = static_cast<ETaskType>(protoTask.GetTaskType());
+ task.TaskId = protoTask.GetTaskId();
+ task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams());
+ task.SessionId = protoTask.GetSessionId();
+ task.NumRetries = protoTask.GetNumRetries();
+ return task;
+}
+
+NProto::TTaskState TaskStateToProto(const TTaskState& taskState) {
+ NProto::TTaskState protoTaskState;
+ protoTaskState.SetTaskStatus(static_cast<NProto::ETaskStatus>(taskState.TaskStatus));
+ protoTaskState.SetTaskId(taskState.TaskId);
+ if (taskState.TaskErrorMessage) {
+ auto fmrError = FmrErrorToProto(*taskState.TaskErrorMessage);
+ protoTaskState.MutableTaskErrorMessage()->Swap(&fmrError);
+ }
+ return protoTaskState;
+}
+
+TTaskState TaskStateFromProto(const NProto::TTaskState& protoTaskState) {
+ TTaskState taskState;
+ taskState.TaskStatus = static_cast<ETaskStatus>(protoTaskState.GetTaskStatus());
+ taskState.TaskId = protoTaskState.GetTaskId();
+ if (protoTaskState.HasTaskErrorMessage()) {
+ taskState.TaskErrorMessage = FmrErrorFromProto(protoTaskState.GetTaskErrorMessage());
+ }
+ return taskState;
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make
index 0010183a60..9e330848c2 100644
--- a/yt/yql/providers/yt/fmr/request_options/ya.make
+++ b/yt/yql/providers/yt/fmr/request_options/ya.make
@@ -13,3 +13,5 @@ YQL_LAST_ABI_VERSION()
GENERATE_ENUM_SERIALIZATION(yql_yt_request_options.h)
END()
+
+RECURSE(proto_helpers)
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index 1e59e0dc8d..c73791bee4 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -1,6 +1,6 @@
#include "yql_yt_request_options.h"
-namespace NYql {
+namespace NYql::NFmr {
TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId) {
return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId);
@@ -14,14 +14,14 @@ TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>
return MakeIntrusive<TTaskResult>(taskStatus, taskErrorMessage);
}
-} // namepsace NYql
+} // namespace NYql::NFmr
template<>
-void Out<NYql::TFmrError>(IOutputStream& out, const NYql::TFmrError& error) {
+void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError& error) {
out << "FmrError[" << error.Component << "]";
- if (error.Component == NYql::EFmrComponent::Worker) {
+ if (error.Component == NYql::NFmr::EFmrComponent::Worker) {
out << "(TaskId: " << error.TaskId << " WorkerId: " << error.WorkerId << ") ";
- } else if (error.Component == NYql::EFmrComponent::Coordinator) {
+ } else if (error.Component == NYql::NFmr::EFmrComponent::Coordinator) {
out << "(OperationId: " << error.OperationId <<") ";
}
out << error.ErrorMessage;
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index d8f21e5f1e..9d69d008b2 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -4,9 +4,10 @@
#include <util/generic/string.h>
#include <vector>
-namespace NYql {
+namespace NYql::NFmr {
enum class EOperationStatus {
+ Unknown,
Accepted,
InProgress,
Failed,
@@ -16,6 +17,7 @@ enum class EOperationStatus {
};
enum class ETaskStatus {
+ Unknown,
Accepted,
InProgress,
Failed,
@@ -24,12 +26,14 @@ enum class ETaskStatus {
};
enum class ETaskType {
+ Unknown,
Download,
Upload,
Merge
};
enum class EFmrComponent {
+ Unknown,
Coordinator,
Worker,
Job
@@ -78,6 +82,8 @@ struct TMergeTaskParams {
using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams>;
struct TTask: public TThrRefBase {
+ TTask() = default;
+
TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, ui32 numRetries = 1)
: TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), NumRetries(numRetries)
{
@@ -85,7 +91,7 @@ struct TTask: public TThrRefBase {
ETaskType TaskType;
TString TaskId;
- TTaskParams TaskParams;
+ TTaskParams TaskParams = {};
TString SessionId;
ui32 NumRetries; // Not supported yet
@@ -93,6 +99,8 @@ struct TTask: public TThrRefBase {
};
struct TTaskState: public TThrRefBase {
+ TTaskState() = default;
+
TTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& errorMessage = Nothing())
: TaskStatus(taskStatus), TaskId(taskId), TaskErrorMessage(errorMessage)
{
@@ -123,4 +131,4 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co
TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& taskErrorMessage = Nothing());
-} // namespace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
index 8077c9abea..c5af15bb3a 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -8,7 +8,7 @@
#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
-namespace NYql {
+namespace NYql::NFmr {
TDownloadTaskParams downloadTaskParams{
.Input = TYtTableRef{"Path","Cluster","TransactionId"},
@@ -98,4 +98,4 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
}
}
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
index 00363e443f..9502faf806 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
@@ -5,7 +5,7 @@
#include <yql/essentials/utils/yql_panic.h>
#include "yql_yt_worker_impl.h"
-namespace NYql {
+namespace NYql::NFmr {
namespace {
@@ -136,4 +136,4 @@ IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory
return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings);
}
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
index dc43652dea..cbeff4bbf5 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
@@ -1,7 +1,7 @@
#include <library/cpp/random_provider/random_provider.h>
#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h>
-namespace NYql {
+namespace NYql::NFmr {
struct TFmrWorkerSettings {
ui32 WorkerId;
@@ -11,4 +11,4 @@ struct TFmrWorkerSettings {
IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator,IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings);
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h
index d3d5690fbd..751324e795 100644
--- a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h
+++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h
@@ -3,7 +3,7 @@
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
-namespace NYql {
+namespace NYql::NFmr {
class IFmrWorker: public TThrRefBase {
public:
@@ -16,4 +16,4 @@ public:
virtual void Stop() = 0;
};
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index 266f3c8174..039b3747d1 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -14,7 +14,7 @@
using namespace NThreading;
using namespace NYql::NNodes;
-namespace NYql {
+namespace NYql::NFmr {
namespace {
@@ -282,4 +282,4 @@ IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPt
return MakeIntrusive<TFmrYtGateway>(std::move(slave), std::move(coordinator), settings);
}
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
index df82d50527..769bb24fe5 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
@@ -3,7 +3,7 @@
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
#include <yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h>
-namespace NYql {
+namespace NYql::NFmr {
struct TFmrYtGatewaySettings {
TIntrusivePtr<IRandomProvider> RandomProvider;
@@ -19,4 +19,4 @@ IYtGateway::TPtr CreateYtFmrGateway(
}
);
-} // namspace NYql
+} // namespace NYql::NFmr
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
index c2064ebd85..ed5a86fc0b 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -176,18 +176,18 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
return ytGateway;
}
- auto coordinator = MakeFmrCoordinator();
- auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ auto coordinator = NFmr::MakeFmrCoordinator();
+ auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
while (!cancelFlag->load()) {
Sleep(TDuration::Seconds(3));
- return ETaskStatus::Completed;
+ return NFmr::ETaskStatus::Completed;
}
- return ETaskStatus::Aborted;
+ return NFmr::ETaskStatus::Aborted;
}; // TODO - use function which actually calls Downloader/Uploader based on task params
- TFmrJobFactorySettings settings{.Function=func};
+ NFmr::TFmrJobFactorySettings settings{.Function=func};
auto jobFactory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(),
+ NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(),
.TimeToSleepBetweenRequests=TDuration::Seconds(1)};
FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
FmrWorker_->Start();
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h
index e724ce2ac6..96c62a2c33 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.h
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h
@@ -33,7 +33,7 @@ protected:
size_t NumThreads_ = 1;
bool KeepTemp_ = false;
TString DefYtServer_;
- IFmrWorker::TPtr FmrWorker_;
+ NFmr::IFmrWorker::TPtr FmrWorker_;
};
} // NYql