aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorcdzyura171 <cdzyura171@yandex-team.com>2025-02-27 20:07:44 +0300
committercdzyura171 <cdzyura171@yandex-team.com>2025-02-27 20:37:45 +0300
commit9ae9b76b16407d250878aafff784174478f1b769 (patch)
tree762a102ab583bc0ee20a8d13c6a4d9c3acdb1844 /yt
parentb2829f5d95e52ba54c76f0c6358c21ff9e409290 (diff)
downloadydb-9ae9b76b16407d250878aafff784174478f1b769.tar.gz
Create FmrYtService and use it in Gateway
Create FmrYtService and use it in Gateway commit_hash:8478e1395da6e8937382c791fa834e89db4b610e
Diffstat (limited to 'yt')
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp14
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp3
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h1
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/ya.make16
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp318
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ya.make19
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp166
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h12
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/ya.make9
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/yql_yt_job.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h20
-rw-r--r--yt/yql/providers/yt/fmr/proto/coordinator.proto1
-rw-r--r--yt/yql/providers/yt/fmr/proto/request_options.proto8
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp25
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h4
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h16
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h20
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/interface/ya.make13
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.cpp55
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h12
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/local/ut/test_table_service.cpp38
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/local/ut/ya.make14
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/local/ya.make18
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp9
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/impl/ya.make16
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp186
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h10
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/interface/ya.make13
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h19
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/mock/ya.make17
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h84
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file.cpp4
-rw-r--r--yt/yql/providers/yt/gateway/fmr/ya.make4
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp94
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h9
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp16
-rw-r--r--yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_gateway.h20
-rw-r--r--yt/yql/tools/ytrun/lib/ya.make3
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp16
44 files changed, 1286 insertions, 49 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
index 36d7020adc..495057d550 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -45,12 +45,17 @@ private:
TDownloadTaskParams downloadTaskParams{
- .Input = TYtTableRef{"Path","Cluster","TransactionId"},
+ .Input = TYtTableRef{"Path","Cluster"},
.Output = TFmrTableRef{"TableId"}
};
TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) {
- return TStartOperationRequest{.TaskType = taskType, .TaskParams = taskParams, .SessionId = "SessionId", .IdempotencyKey = "IdempotencyKey"};
+ return TStartOperationRequest{
+ .TaskType = taskType,
+ .TaskParams = taskParams,
+ .IdempotencyKey = "IdempotencyKey",
+ .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}
+ };
}
std::vector<TStartOperationRequest> CreateSeveralOperationRequests(
@@ -59,7 +64,10 @@ std::vector<TStartOperationRequest> CreateSeveralOperationRequests(
std::vector<TStartOperationRequest> startOperationRequests(numRequests);
for (int i = 0; i < numRequests; ++i) {
startOperationRequests[i] = TStartOperationRequest{
- .TaskType = taskType, .TaskParams = taskParams, .IdempotencyKey = "IdempotencyKey_" + ToString(i)
+ .TaskType = taskType,
+ .TaskParams = taskParams,
+ .IdempotencyKey = "IdempotencyKey_" + ToString(i),
+ .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn", .Token = "token"}
};
}
return startOperationRequests;
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index bec951c180..bd73813a43 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -57,7 +57,7 @@ public:
}
TString taskId = GenerateId();
- TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, request.TaskParams, request.SessionId);
+ TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, request.TaskParams, request.SessionId, request.ClusterConnection);
Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId};
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
index 85f7d5834a..69908b3ac0 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
@@ -68,6 +68,8 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio
protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey);
}
protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries);
+ auto protoClusterConnection = ClusterConnectionToProto(startOperationRequest.ClusterConnection);
+ protoStartOperationRequest.MutableClusterConnection()->Swap(&protoClusterConnection);
return protoStartOperationRequest;
}
@@ -80,6 +82,7 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat
startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey();
}
startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries();
+ startOperationRequest.ClusterConnection = ClusterConnectionFromProto(protoStartOperationRequest.GetClusterConnection());
return startOperationRequest;
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
index 8be5d757c2..ba9592de9a 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
@@ -26,6 +26,7 @@ struct TStartOperationRequest {
TString SessionId;
TMaybe<TString> IdempotencyKey = Nothing();
ui32 NumRetries = 1; // Not supported yet
+ TClusterConnection ClusterConnection = {};
};
struct TStartOperationResponse {
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
new file mode 100644
index 0000000000..b2fb4a8853
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
@@ -0,0 +1,16 @@
+# Unit tests for the FMR job implementation (yql_yt_job_ut.cpp).
+UNITTEST()
+
+SRCS(
+ yql_yt_job_ut.cpp
+)
+
+# The tests run the job against the local table data service and the mock YT service.
+PEERDIR(
+ yt/yql/providers/yt/fmr/job/impl
+ yt/yql/providers/yt/fmr/yt_service/mock
+ yt/yql/providers/yt/fmr/table_data_service/local
+ yql/essentials/utils/log
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
new file mode 100644
index 0000000000..053a106ea5
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -0,0 +1,318 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h>
+#include <yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h>
+
+namespace NYql::NFmr {
+
+Y_UNIT_TEST_SUITE(FmrJobTests) {
+    // Download must copy the content of a YT table verbatim into the table data
+    // service under the id of the output FMR table.
+    Y_UNIT_TEST(DownloadTable) {
+        TString tableContent =
+            "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+        IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+
+        // Fields are named explicitly: positional initialization had Path and Cluster swapped.
+        TYtTableRef input = TYtTableRef{.Path = "test_path", .Cluster = "test_cluster"};
+        TFmrTableRef output = TFmrTableRef("test_table_id");
+        TDownloadTaskParams params = TDownloadTaskParams(input, output);
+
+        ytUploadedTablesMock->AddTable(input, tableContent);
+
+        auto err = job->Download(params);
+
+        UNIT_ASSERT_C(!err, err.GetRef());
+        UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get(output.TableId).GetValueSync().GetRef(), tableContent);
+    }
+
+    // Upload must push the content stored in the table data service for the
+    // input FMR table into the output YT table.
+    Y_UNIT_TEST(UploadTable) {
+        TString ytTableContent =
+            "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+        IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+
+        // Fields are named explicitly: positional initialization had Path and Cluster swapped.
+        TYtTableRef output = TYtTableRef{.Path = "test_path", .Cluster = "test_cluster"};
+        TFmrTableRef input = TFmrTableRef("test_table_id");
+        TUploadTaskParams params = TUploadTaskParams(input, output);
+
+        tableDataServicePtr->Put(input.TableId, ytTableContent);
+
+        auto err = job->Upload(params);
+
+        UNIT_ASSERT_C(!err, err.GetRef());
+        UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent);
+    }
+
+    // Merge of FMR-only inputs must store the concatenation of the inputs, in
+    // input order, under the output table id.
+    Y_UNIT_TEST(MergeFmrTables) {
+        // The first fixture used to end with a stray ')' (a raw-string leftover).
+        TString TableContent_1 =
+            "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+        TString TableContent_2 =
+            "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+        TString TableContent_3 =
+            "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+        IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+
+        TFmrTableRef input_1 = TFmrTableRef("test_table_id_1");
+        TFmrTableRef input_2 = TFmrTableRef("test_table_id_2");
+        TFmrTableRef input_3 = TFmrTableRef("test_table_id_3");
+        TTableRef input_table_ref_1 = {input_1};
+        TTableRef input_table_ref_2 = {input_2};
+        TTableRef input_table_ref_3 = {input_3};
+        TFmrTableRef output = TFmrTableRef("test_table_id_output");
+        std::vector<TTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3};
+        TMergeTaskParams params = TMergeTaskParams(inputs, output);
+
+        tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+        tableDataServicePtr->Put(input_2.TableId, TableContent_2);
+        tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+
+        auto err = job->Merge(params);
+
+        UNIT_ASSERT_C(!err, err.GetRef());
+        UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get(output.TableId).GetValueSync().GetRef(), TableContent_1 + TableContent_2 + TableContent_3);
+    }
+
+    // Merge must accept a mix of FMR and YT inputs and store their
+    // concatenation, in input order, under the output table id.
+    Y_UNIT_TEST(MergeMixedTables) {
+        // The first fixture used to end with a stray ')' (a raw-string leftover).
+        TString TableContent_1 =
+            "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+        TString TableContent_2 =
+            "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+        TString TableContent_3 =
+            "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+        IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+
+        TFmrTableRef input_1 = TFmrTableRef("test_table_id_1");
+        TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
+        TFmrTableRef input_3 = TFmrTableRef("test_table_id_3");
+        TTableRef input_table_ref_1 = {input_1};
+        TTableRef input_table_ref_2 = {input_2};
+        TTableRef input_table_ref_3 = {input_3};
+        TFmrTableRef output = TFmrTableRef("test_table_id_output");
+        std::vector<TTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3};
+        TMergeTaskParams params = TMergeTaskParams(inputs, output);
+
+        tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+        ytUploadedTablesMock->AddTable(input_2, TableContent_2);
+        tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+
+        auto err = job->Merge(params);
+
+        UNIT_ASSERT_C(!err, err.GetRef());
+        UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get(output.TableId).GetValueSync().GetRef(), TableContent_1 + TableContent_2 + TableContent_3);
+    }
+}
+
+Y_UNIT_TEST_SUITE(TaskRunTests) {
+    // RunJob on a download task must complete and leave the YT table content
+    // in the table data service under the output table id.
+    Y_UNIT_TEST(RunDownloadTask) {
+        TString ytTableContent =
+            "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+
+        // Fields are named explicitly: positional initialization had Path and Cluster swapped.
+        TYtTableRef input = TYtTableRef{.Path = "test_path", .Cluster = "test_cluster"};
+        TFmrTableRef output = TFmrTableRef("test_table_id");
+
+        ytUploadedTablesMock->AddTable(input, ytTableContent);
+        TDownloadTaskParams params = TDownloadTaskParams(input, output);
+        TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id");
+
+        ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag);
+
+        UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
+        UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get(output.TableId).GetValueSync().GetRef(), ytTableContent);
+    }
+
+    // RunJob on an upload task must complete and leave the FMR table content
+    // in the (mock) YT table.
+    Y_UNIT_TEST(RunUploadTask) {
+        TString ytTableContent =
+            "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+
+        TFmrTableRef input = TFmrTableRef("test_table_id");
+        // Fields are named explicitly: positional initialization had Path and Cluster swapped.
+        TYtTableRef output = TYtTableRef{.Path = "test_path", .Cluster = "test_cluster"};
+
+        TUploadTaskParams params = TUploadTaskParams(input, output);
+        TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
+
+        tableDataServicePtr->Put(input.TableId, ytTableContent);
+
+        ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag);
+
+        UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
+        UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent);
+    }
+
+    // RunJob on an upload task whose input FMR table does not exist must fail
+    // and must not upload anything to YT.
+    Y_UNIT_TEST(RunUploadTaskWithNoTable) {
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+
+        TFmrTableRef input = TFmrTableRef("test_table_id");
+        // Fields are named explicitly: positional initialization had Path and Cluster swapped.
+        TYtTableRef output = TYtTableRef{.Path = "test_path", .Cluster = "test_cluster"};
+
+        TUploadTaskParams params = TUploadTaskParams(input, output);
+        TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
+
+        // Deliberately nothing is put into tableDataServicePtr for input.TableId.
+
+        ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag);
+
+        UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed);
+        UNIT_ASSERT(ytUploadedTablesMock->IsEmpty());
+    }
+
+    // RunJob on a merge task with mixed FMR/YT inputs must complete and store
+    // the concatenation of the inputs, in input order, under the output id.
+    Y_UNIT_TEST(RunMergeTask) {
+        // The first fixture used to end with a stray ')' (a raw-string leftover).
+        TString TableContent_1 =
+            "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+        TString TableContent_2 =
+            "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+        TString TableContent_3 =
+            "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+
+        TFmrTableRef input_1 = TFmrTableRef("test_table_id_1");
+        TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
+        TFmrTableRef input_3 = TFmrTableRef("test_table_id_3");
+        TTableRef input_table_ref_1 = {input_1};
+        TTableRef input_table_ref_2 = {input_2};
+        TTableRef input_table_ref_3 = {input_3};
+        TFmrTableRef output = TFmrTableRef("test_table_id_output");
+        std::vector<TTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3};
+        TMergeTaskParams params = TMergeTaskParams(inputs, output);
+
+        // The task type must match the params: this is a merge, not an upload.
+        TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id");
+
+        tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+        ytUploadedTablesMock->AddTable(input_2, TableContent_2);
+        tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+
+        ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag);
+
+        UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
+        UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get(
+            output.TableId).GetValueSync().GetRef(),
+            TableContent_1 + TableContent_2 + TableContent_3
+        );
+    }
+
+    // RunJob on a merge task whose YT input does not exist must fail and must
+    // not write the output table.
+    Y_UNIT_TEST(RunMergeTaskWithNoTable) {
+        // The first fixture used to end with a stray ')' (a raw-string leftover).
+        TString TableContent_1 =
+            "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+        TString TableContent_3 =
+            "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+            "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+            "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+            "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+        ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+        TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
+        NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
+        std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
+
+        TFmrTableRef input_1 = TFmrTableRef("test_table_id_1");
+        TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
+        TFmrTableRef input_3 = TFmrTableRef("test_table_id_3");
+        TTableRef input_table_ref_1 = {input_1};
+        TTableRef input_table_ref_2 = {input_2};
+        TTableRef input_table_ref_3 = {input_3};
+        TFmrTableRef output = TFmrTableRef("test_table_id_output");
+        std::vector<TTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3};
+        TMergeTaskParams params = TMergeTaskParams(inputs, output);
+
+        // The task type must match the params: this is a merge, not an upload.
+        TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id");
+
+        tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+        // Deliberately no table is added to the YT mock for input_2.
+        tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+
+        ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag);
+        UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed);
+        UNIT_ASSERT(!tableDataServicePtr->Get(output.TableId).GetValueSync());
+    }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make
new file mode 100644
index 0000000000..4a647bd58a
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/ya.make
@@ -0,0 +1,19 @@
+# FMR job implementation: download / upload / merge of tables between YT and
+# the table data service.
+LIBRARY()
+
+SRCS(
+ yql_yt_job_impl.cpp
+)
+
+# Every library whose headers yql_yt_job_impl.{h,cpp} include is listed
+# explicitly instead of relying on the consumers to pull it in.
+PEERDIR(
+ library/cpp/threading/future
+ yql/essentials/utils/log
+ yt/yql/providers/yt/fmr/job/interface
+ yt/yql/providers/yt/fmr/request_options
+ yt/yql/providers/yt/fmr/table_data_service/interface
+ yt/yql/providers/yt/fmr/yt_service/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
new file mode 100644
index 0000000000..c672d6d6b9
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -0,0 +1,166 @@
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/stream/file.h>
+
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h>
+#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
+
+#include <yql/essentials/utils/log/log.h>
+
+namespace NYql::NFmr {
+
+// IFmrJob implementation that moves table content between YT (through
+// IYtService) and the FMR table data service (through ITableDataService).
+//
+// Every operation returns Nothing() on success and an error message on
+// failure. Exceptions raised by the underlying services are caught and
+// reported as an error message as well, so callers only deal with one
+// failure channel.
+class TFmrJob: public IFmrJob {
+public:
+
+    // cancelFlag is polled by Merge between inputs; setting it to true makes
+    // Merge stop and return "Canceled".
+    TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag)
+        : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag)
+    {
+    }
+
+    // Reads the YT table params.Input and stores its content in the table
+    // data service under params.Output.TableId.
+    // TODO: extract the body into a private method for reuse.
+    virtual TMaybe<TString> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection) override {
+        try {
+            const auto& ytTable = params.Input;
+            const auto& tableId = params.Output.TableId;
+
+            YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << ytTable.Cluster << '.' << ytTable.Path;
+
+            auto res = GetYtTableContent(ytTable, clusterConnection);
+            if (auto* err = std::get_if<TError>(&res)) {
+                return err->ErrorMessage;
+            }
+            TableDataService_->Put(tableId, std::get<TString>(res)).Wait();
+        } catch (...) {
+            return CurrentExceptionMessage();
+        }
+
+        return Nothing();
+    }
+
+    // Reads the FMR table params.Input from the table data service and
+    // uploads its content to the YT table params.Output.
+    // Returns "Table not found" when the FMR table does not exist.
+    virtual TMaybe<TString> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override {
+        try {
+            const auto& ytTable = params.Output;
+            const auto& tableId = params.Input.TableId;
+
+            YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << ytTable.Cluster << '.' << ytTable.Path;
+
+            TMaybe<TString> getResult = TableDataService_->Get(tableId).GetValueSync();
+
+            if (!getResult) {
+                YQL_CLOG(ERROR, FastMapReduce) << "Table " << tableId << " not found";
+                return "Table not found";
+            }
+
+            const TString& tableContent = getResult.GetRef();
+            TStringInput inputStream(tableContent);
+
+            // NOTE(review): the result of IYtService::Upload is ignored here; if it
+            // reports errors through its return value they should be propagated - confirm
+            // against the interface.
+            YtService_->Upload(ytTable, inputStream, clusterConnection);
+        } catch (...) {
+            return CurrentExceptionMessage();
+        }
+
+        return Nothing();
+    }
+
+    // Concatenates the content of all params.Input tables (FMR or YT), in
+    // order, and stores the result under params.Output.TableId. Nothing is
+    // written when any input cannot be read or the job is canceled.
+    virtual TMaybe<TString> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override {
+        try {
+            const auto& inputs = params.Input;
+            const auto& output = params.Output;
+
+            YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
+
+            TString mergedTableContent;
+
+            for (const auto& inputTableRef : inputs) {
+                // Cancellation is cooperative: it is only observed between inputs.
+                if (CancelFlag_->load()) {
+                    return "Canceled";
+                }
+                auto res = GetTableContent(inputTableRef, clusterConnection);
+                if (auto* err = std::get_if<TError>(&res)) {
+                    return err->ErrorMessage;
+                }
+                mergedTableContent += std::get<TString>(res);
+            }
+
+            TableDataService_->Put(output.TableId, mergedTableContent).Wait();
+        } catch (...) {
+            return CurrentExceptionMessage();
+        }
+
+        return Nothing();
+    }
+private:
+    // Dispatches on the kind of table reference and returns its content or an error.
+    std::variant<TString, TError> GetTableContent(const TTableRef& tableRef, const TClusterConnection& clusterConnection) {
+        if (auto* ytTable = std::get_if<TYtTableRef>(&tableRef)) {
+            return GetYtTableContent(*ytTable, clusterConnection);
+        }
+        if (auto* fmrTable = std::get_if<TFmrTableRef>(&tableRef)) {
+            return GetFmrTableContent(*fmrTable);
+        }
+        ythrow yexception() << "Unsupported table type";
+    }
+
+    // Downloads the YT table into a temporary file and reads the whole file into memory.
+    std::variant<TString, TError> GetYtTableContent(const TYtTableRef& ytTable, const TClusterConnection& clusterConnection) {
+        auto res = YtService_->Download(ytTable, clusterConnection);
+        if (auto* err = std::get_if<TError>(&res)) {
+            return *err;
+        }
+        // std::get throws instead of dereferencing a null pointer if the
+        // variant unexpectedly holds neither an error nor a file.
+        auto& tableFile = std::get<THolder<TTempFileHandle>>(res);
+        TFileInput inputStream(tableFile->Name());
+        return inputStream.ReadAll();
+    }
+
+    // Fetches the FMR table content from the table data service; a missing
+    // table is reported as an error instead of dereferencing an empty TMaybe.
+    std::variant<TString, TError> GetFmrTableContent(const TFmrTableRef& fmrTable) {
+        TMaybe<TString> tableContent = TableDataService_->Get(fmrTable.TableId).GetValueSync();
+        if (!tableContent) {
+            YQL_CLOG(ERROR, FastMapReduce) << "Table " << fmrTable.TableId << " not found";
+            return TError{.ErrorMessage = "Table not found"};
+        }
+        return tableContent.GetRef();
+    }
+private:
+    ITableDataService::TPtr TableDataService_;
+    IYtService::TPtr YtService_;
+    std::shared_ptr<std::atomic<bool>> CancelFlag_;
+};
+
+// Creates a TFmrJob over the given table data service, YT service and
+// cancellation flag, exposed through the IFmrJob interface.
+IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+    IFmrJob::TPtr fmrJob = MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag);
+    return fmrJob;
+}
+
+// Runs a single task synchronously: creates a job, dispatches on the type of
+// task->TaskParams (upload / download / merge) and maps the outcome to a task
+// status.
+//
+// Returns ETaskStatus::Completed when the job reports no error and
+// ETaskStatus::Failed otherwise. An exception escaping the job is logged and
+// reported as ETaskStatus::Failed instead of propagating to the caller.
+ETaskStatus RunJob(
+    TTask::TPtr task,
+    ITableDataService::TPtr tableDataService,
+    IYtService::TPtr ytService,
+    std::shared_ptr<std::atomic<bool>> cancelFlag
+) {
+    IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag);
+
+    auto processTask = [job, task] (auto&& taskParams) {
+        using T = std::decay_t<decltype(taskParams)>;
+
+        if constexpr (std::is_same_v<T, TUploadTaskParams>) {
+            return job->Upload(taskParams, task->ClusterConnection);
+        } else if constexpr (std::is_same_v<T, TDownloadTaskParams>) {
+            return job->Download(taskParams, task->ClusterConnection);
+        } else if constexpr (std::is_same_v<T, TMergeTaskParams>) {
+            return job->Merge(taskParams, task->ClusterConnection);
+        } else {
+            throw std::runtime_error{"Unsupported task type"};
+        }
+    };
+
+    TMaybe<TString> taskResult;
+    try {
+        taskResult = std::visit(processTask, task->TaskParams);
+    } catch (...) {
+        // Treat an exception like any other job error so the task still gets a final status.
+        taskResult = CurrentExceptionMessage();
+    }
+
+    if (taskResult.Defined()) {
+        YQL_CLOG(ERROR, FastMapReduce) << "Task failed: " << taskResult.GetRef();
+        return ETaskStatus::Failed;
+    }
+
+    return ETaskStatus::Completed;
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
new file mode 100644
index 0000000000..28280ff0ce
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h>
+#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
+
+#include <atomic>
+#include <memory>
+
+namespace NYql::NFmr {
+
+// Creates the default IFmrJob implementation. The job reads and writes FMR
+// tables through tableDataService, YT tables through ytService, and polls
+// cancelFlag to stop early.
+IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
+
+// Runs the given task synchronously with a freshly created job and returns
+// ETaskStatus::Completed on success or ETaskStatus::Failed on error.
+ETaskStatus RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/interface/ya.make b/yt/yql/providers/yt/fmr/job/interface/ya.make
new file mode 100644
index 0000000000..7d256622f7
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/interface/ya.make
@@ -0,0 +1,9 @@
+# Interface of an FMR job (IFmrJob).
+LIBRARY()
+
+SRCS(
+ yql_yt_job.cpp
+)
+
+# yql_yt_job.h includes the request options header for the task parameter types.
+PEERDIR(
+ yt/yql/providers/yt/fmr/request_options
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.cpp b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.cpp
new file mode 100644
index 0000000000..a80a31a1a9
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.cpp
@@ -0,0 +1 @@
+#include "yql_yt_job.h"
diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
new file mode 100644
index 0000000000..187ed21d3c
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+
+namespace NYql::NFmr {
+
+// Abstract, ref-counted interface of an FMR job: one entry point per task kind.
+//
+// Every method returns TMaybe<TString>. RunJob treats a defined value as the error
+// message of a failed task and an empty value as success.
+//
+// NOTE(review): the clusterConnection default arguments sit on virtual functions, so the
+// default that applies is chosen by the static type of the call expression.
+class IFmrJob: public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<IFmrJob>;
+
+ virtual ~IFmrJob() = default;
+
+ // Download task: params.Input is a YT table reference, params.Output an FMR table reference.
+ virtual TMaybe<TString> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+
+ // Upload task: params.Input is an FMR table reference, params.Output a YT table reference.
+ virtual TMaybe<TString> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+
+ // Merge task described by params; its exact input/output shape is not visible in this header.
+ virtual TMaybe<TString> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+};
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto
index 33990caa28..c76bb50fc3 100644
--- a/yt/yql/providers/yt/fmr/proto/coordinator.proto
+++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto
@@ -22,6 +22,7 @@ message TStartOperationRequest {
string SessionId = 3;
optional string IdempotencyKey = 4;
uint32 NumRetries = 5;
+ TClusterConnection ClusterConnection = 6;
}
message TStartOperationResponse {
diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto
index e37583a0d2..7cbd1eecf4 100644
--- a/yt/yql/providers/yt/fmr/proto/request_options.proto
+++ b/yt/yql/providers/yt/fmr/proto/request_options.proto
@@ -48,7 +48,6 @@ message TStatistics {}
message TYtTableRef {
string Path = 1;
string Cluster = 2;
- string TransactionId = 3;
}
message TFmrTableRef {
@@ -85,12 +84,19 @@ message TTaskParams {
}
}
+// Wire form of the C++ TClusterConnection struct; converted by
+// ClusterConnectionToProto / ClusterConnectionFromProto.
+message TClusterConnection {
+ // Id of the transaction the YT service attaches to.
+ string TransactionId = 1;
+ // Server name used to create the YT client.
+ string YtServerName = 2;
+ // Optional auth token; only written when the C++ side has one set.
+ optional string Token = 3;
+}
+
message TTask {
ETaskType TaskType = 1;
string TaskId = 2;
TTaskParams TaskParams = 3;
string SessionId = 4;
optional uint32 NumRetries = 5;
+ TClusterConnection ClusterConnection = 6;
}
message TTaskState {
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
index 52cc0f3ad6..7a931909ac 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -38,7 +38,6 @@ NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) {
NProto::TYtTableRef protoYtTableRef;
protoYtTableRef.SetPath(ytTableRef.Path);
protoYtTableRef.SetCluster(ytTableRef.Cluster);
- protoYtTableRef.SetTransactionId(ytTableRef.TransactionId);
return protoYtTableRef;
}
@@ -46,7 +45,6 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) {
TYtTableRef ytTableRef;
ytTableRef.Path = protoYtTableRef.GetPath();
ytTableRef.Cluster = protoYtTableRef.GetCluster();
- ytTableRef.TransactionId = protoYtTableRef.GetTransactionId();
return ytTableRef;
}
@@ -170,6 +168,26 @@ TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
return taskParams;
}
+// Serializes a TClusterConnection into its protobuf message.
+// The optional Token field is only set when the source value carries a token.
+NProto::TClusterConnection ClusterConnectionToProto(const TClusterConnection& clusterConnection) {
+    NProto::TClusterConnection result;
+    result.SetTransactionId(clusterConnection.TransactionId);
+    result.SetYtServerName(clusterConnection.YtServerName);
+    if (clusterConnection.Token.Defined()) {
+        result.SetToken(clusterConnection.Token.GetRef());
+    }
+    return result;
+}
+
+// Restores a TClusterConnection from its protobuf message.
+// Token stays empty unless the message explicitly carries one.
+TClusterConnection ClusterConnectionFromProto(const NProto::TClusterConnection& protoClusterConnection) {
+    TClusterConnection result{};
+    if (protoClusterConnection.HasToken()) {
+        result.Token = protoClusterConnection.GetToken();
+    }
+    result.YtServerName = protoClusterConnection.GetYtServerName();
+    result.TransactionId = protoClusterConnection.GetTransactionId();
+    return result;
+}
+
NProto::TTask TaskToProto(const TTask& task) {
NProto::TTask protoTask;
protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType));
@@ -178,6 +196,8 @@ NProto::TTask TaskToProto(const TTask& task) {
protoTask.MutableTaskParams()->Swap(&taskParams);
protoTask.SetSessionId(task.SessionId);
protoTask.SetNumRetries(task.NumRetries);
+ auto clusterConnection = ClusterConnectionToProto(task.ClusterConnection);
+ protoTask.MutableClusterConnection()->Swap(&clusterConnection);
return protoTask;
}
@@ -188,6 +208,7 @@ TTask TaskFromProto(const NProto::TTask& protoTask) {
task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams());
task.SessionId = protoTask.GetSessionId();
task.NumRetries = protoTask.GetNumRetries();
+ task.ClusterConnection = ClusterConnectionFromProto(protoTask.GetClusterConnection());
return task;
}
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
index e886e91901..5108bbb0f3 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
@@ -37,6 +37,10 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams);
TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams);
+NProto::TClusterConnection ClusterConnectionToProto(const TClusterConnection& clusterConnection);
+
+TClusterConnection ClusterConnectionFromProto(const NProto::TClusterConnection& protoClusterConnection);
+
NProto::TTask TaskToProto(const TTask& task);
TTask TaskFromProto(const NProto::TTask& protoTask);
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index c73791bee4..f6c31167c3 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -2,8 +2,8 @@
namespace NYql::NFmr {
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId) {
- return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId);
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection) {
+ return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection);
}
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage) {
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index b423021a71..de7c4eb435 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -54,7 +54,6 @@ struct TError {
struct TYtTableRef {
TString Path;
TString Cluster;
- TString TransactionId;
};
struct TFmrTableRef {
@@ -160,11 +159,19 @@ using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeT
using TTaskParamsNew = std::variant<TUploadTaskParamsNew, TDownloadTaskParamsNew, TMergeTaskParamsNew>;
+// Parameters a task needs to talk to a YT cluster; carried inside TTask and
+// mirrored by the NProto::TClusterConnection message.
+struct TClusterConnection {
+ // Id of the transaction that the YT service attaches to for reads and writes.
+ TString TransactionId;
+ // Server name passed to NYT::CreateClient.
+ TString YtServerName;
+ // Auth token; when empty the client is created without an explicit token.
+ TMaybe<TString> Token;
+};
+
+using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams>;
+
struct TTask: public TThrRefBase {
TTask() = default;
- TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, ui32 numRetries = 1)
- : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), NumRetries(numRetries)
+ TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, ui32 numRetries = 1)
+ : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), NumRetries(numRetries)
{
}
@@ -172,6 +179,7 @@ struct TTask: public TThrRefBase {
TString TaskId;
TTaskParams TaskParams = {};
TString SessionId;
+ TClusterConnection ClusterConnection = {};
ui32 NumRetries; // Not supported yet
using TPtr = TIntrusivePtr<TTask>;
@@ -205,7 +213,7 @@ struct TTaskResult: public TThrRefBase {
using TPtr = TIntrusivePtr<TTaskResult>;
};
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId);
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{});
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing());
diff --git a/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.cpp b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.cpp
new file mode 100644
index 0000000000..7f3e3c5135
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.cpp
@@ -0,0 +1 @@
+#include "table_data_service.h"
diff --git a/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h
new file mode 100644
index 0000000000..98a2ec9ddc
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+
+namespace NYql::NFmr {
+
+// Abstract, ref-counted key/value storage interface for table data.
+// All operations are asynchronous and report completion through a TFuture.
+class ITableDataService: public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<ITableDataService>;
+
+ virtual ~ITableDataService() = default;
+
+ // Stores `data` under `id`. NOTE(review): whether an existing value is overwritten is
+ // left to the implementation - the local implementation keeps the existing value.
+ virtual NThreading::TFuture<void> Put(const TString& id, const TString& data) = 0;
+
+ // Resolves to the value stored under `id`, or to an empty TMaybe when there is none.
+ virtual NThreading::TFuture<TMaybe<TString>> Get(const TString& id) = 0;
+
+ // Removes the value stored under `id`.
+ virtual NThreading::TFuture<void> Delete(const TString& id) = 0;
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/table_data_service/interface/ya.make b/yt/yql/providers/yt/fmr/table_data_service/interface/ya.make
new file mode 100644
index 0000000000..63abe6d78f
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/interface/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ table_data_service.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.cpp b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.cpp
new file mode 100644
index 0000000000..445263ff5e
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.cpp
@@ -0,0 +1,55 @@
+#include "table_data_service.h"
+
+namespace NYql::NFmr {
+
+namespace {
+
+// In-memory ITableDataService. Values are spread over a fixed number of hash
+// partitions; a key lives in partition std::hash<TString>(key) % NumParts_.
+// The class performs no synchronization of its own.
+class TLocalTableDataService: public ITableDataService {
+public:
+    // settings.NumParts is the number of partitions. Zero would turn every partition
+    // lookup into a modulo-by-zero, so it is treated as a single partition.
+    explicit TLocalTableDataService(const TLocalTableDataServiceSettings& settings)
+        : NumParts_(settings.NumParts > 0 ? settings.NumParts : 1)
+    {
+        Data_.resize(NumParts_);
+    }
+
+    // Stores value under key unless the key is already present: emplace() leaves an
+    // existing entry untouched, so the first stored value wins.
+    NThreading::TFuture<void> Put(const TString& key, const TString& value) override {
+        Partition(key).emplace(key, value);
+        return NThreading::MakeFuture();
+    }
+
+    // Resolves to the stored value, or to an empty TMaybe when the key is absent.
+    NThreading::TFuture<TMaybe<TString>> Get(const TString& key) override {
+        TMaybe<TString> value = Nothing();
+        const auto& partition = Partition(key);
+        if (const auto it = partition.find(key); it != partition.end()) {
+            value = it->second;
+        }
+        return NThreading::MakeFuture(value);
+    }
+
+    // Removes the key; erase() by key is already a no-op for a missing key.
+    NThreading::TFuture<void> Delete(const TString& key) override {
+        Partition(key).erase(key);
+        return NThreading::MakeFuture();
+    }
+
+private:
+    // Returns the partition responsible for key.
+    std::unordered_map<TString, TString>& Partition(const TString& key) {
+        return Data_[std::hash<TString>()(key) % NumParts_];
+    }
+
+    std::vector<std::unordered_map<TString, TString>> Data_;
+    const ui32 NumParts_;
+};
+
+} // namespace
+
+// Builds the in-memory table data service configured by `settings` and returns it
+// through the ITableDataService interface pointer.
+ITableDataService::TPtr MakeLocalTableDataService(const TLocalTableDataServiceSettings& settings) {
+    ITableDataService::TPtr service = MakeIntrusive<TLocalTableDataService>(settings);
+    return service;
+}
+
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h
new file mode 100644
index 0000000000..eb4d0cb2f6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h
@@ -0,0 +1,12 @@
+#include <library/cpp/threading/future/future.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h>
+
+namespace NYql::NFmr {
+
+// Settings of the local (in-memory) table data service.
+struct TLocalTableDataServiceSettings {
+ // Number of hash partitions the stored keys are distributed over.
+ // NOTE(review): has no default member initializer, so callers must set it explicitly.
+ ui32 NumParts;
+};
+
+ITableDataService::TPtr MakeLocalTableDataService(const TLocalTableDataServiceSettings& settings);
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/ut/test_table_service.cpp b/yt/yql/providers/yt/fmr/table_data_service/local/ut/test_table_service.cpp
new file mode 100644
index 0000000000..a48df5f3df
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/local/ut/test_table_service.cpp
@@ -0,0 +1,38 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h>
+
+using namespace NYql::NFmr;
+
+// Unit tests for the local table data service; every test uses a fresh service with 3 partitions.
+Y_UNIT_TEST_SUITE(TLocalTableServiceTest)
+{
+ // Get on an empty service yields an empty TMaybe.
+ Y_UNIT_TEST(GetNonexistentKey) {
+ ITableDataService::TPtr tableDataService = MakeLocalTableDataService({3});
+ auto getFuture = tableDataService->Get("key");
+ UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), Nothing());
+ }
+ // A value stored with Put is returned by a later Get.
+ Y_UNIT_TEST(GetExistingKey) {
+ ITableDataService::TPtr tableDataService = MakeLocalTableDataService({3});
+ auto putFuture = tableDataService->Put("key", "1");
+ putFuture.GetValueSync();
+ auto getFuture = tableDataService->Get("key");
+ UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), "1");
+ }
+ // Deleting a key that was never stored leaves other keys intact.
+ Y_UNIT_TEST(DeleteNonexistentKey) {
+ ITableDataService::TPtr tableDataService = MakeLocalTableDataService({3});
+ auto putFuture = tableDataService->Put("key", "1");
+ putFuture.GetValueSync();
+ auto deleteFuture = tableDataService->Delete("other_key");
+ deleteFuture.GetValueSync();
+ auto getFuture = tableDataService->Get("key");
+ UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), "1");
+ }
+ // After Delete the key is no longer visible to Get.
+ Y_UNIT_TEST(DeleteExistingKey) {
+ ITableDataService::TPtr tableDataService = MakeLocalTableDataService({3});
+ auto putFuture = tableDataService->Put("key", "1");
+ putFuture.GetValueSync();
+ auto deleteFuture = tableDataService->Delete("key");
+ deleteFuture.GetValueSync();
+ auto getFuture = tableDataService->Get("key");
+ UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), Nothing());
+ }
+}
diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/ut/ya.make b/yt/yql/providers/yt/fmr/table_data_service/local/ut/ya.make
new file mode 100644
index 0000000000..9b57c059db
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/local/ut/ya.make
@@ -0,0 +1,14 @@
+UNITTEST()
+
+SRCS(
+ test_table_service.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/table_data_service/local
+)
+
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/ya.make b/yt/yql/providers/yt/fmr/table_data_service/local/ya.make
new file mode 100644
index 0000000000..78290fcd2e
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/table_data_service/local/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+SRCS(
+ table_data_service.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+ yt/yql/providers/yt/fmr/table_data_service/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
index 4996c932c4..e16e20c911 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -11,12 +11,17 @@
namespace NYql::NFmr {
TDownloadTaskParams downloadTaskParams{
- .Input = TYtTableRef{"Path","Cluster","TransactionId"},
+ .Input = TYtTableRef{"Path","Cluster"},
.Output = TFmrTableRef{"TableId"}
};
TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) {
- return TStartOperationRequest{.TaskType = taskType, .TaskParams = taskParams, .IdempotencyKey = "IdempotencyKey"};
+ return TStartOperationRequest{
+ .TaskType = taskType,
+ .TaskParams = taskParams,
+ .IdempotencyKey = "IdempotencyKey",
+ .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}
+ };
}
Y_UNIT_TEST_SUITE(FmrWorkerTests) {
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/ya.make b/yt/yql/providers/yt/fmr/yt_service/impl/ya.make
new file mode 100644
index 0000000000..2da71e6b7d
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/impl/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_yt_service_impl.cpp
+)
+
+PEERDIR(
+ library/cpp/yson
+ library/cpp/yson/node
+ yt/cpp/mapreduce/client
+ yt/yql/providers/yt/fmr/yt_service/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
new file mode 100644
index 0000000000..1da2ff6bd3
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
@@ -0,0 +1,186 @@
+#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yson/node/node_visitor.h>
+#include <library/cpp/yson/parser.h>
+#include <library/cpp/yson/writer.h>
+#include <yt/cpp/mapreduce/interface/client.h>
+
+#include <exception>
+
+#include "yql_yt_yt_service_impl.h"
+
+namespace NYql::NFmr {
+
+namespace {
+
+// YSON consumer that turns a list fragment into table rows: every top-level list
+// item is assembled into an NYT::TNode by a TNodeBuilder and passed to the table
+// writer as one row.
+//
+// Level_ counts the currently open lists, maps and attribute blocks, so an
+// OnListItem() seen at level 0 marks the start of the next row and flushes the
+// previous one. The owner must call Flush() after parsing to emit the last row.
+class TYtFmrRowConsumer: public NYson::TYsonConsumerBase {
+public:
+    explicit TYtFmrRowConsumer(NYT::TTableWriterPtr<NYT::TNode> writer): Writer_(writer)
+    {
+        CurNode_ = NYT::TNode();
+        Builder_ = MakeHolder<NYT::TNodeBuilder>(&CurNode_);
+    }
+
+    void OnStringScalar(TStringBuf value) override {
+        Builder_->OnStringScalar(value);
+    }
+
+    void OnInt64Scalar(i64 value) override {
+        Builder_->OnInt64Scalar(value);
+    }
+
+    void OnUint64Scalar(ui64 value) override {
+        Builder_->OnUint64Scalar(value);
+    }
+
+    void OnDoubleScalar(double value) override {
+        Builder_->OnDoubleScalar(value);
+    }
+
+    void OnBooleanScalar(bool value) override {
+        Builder_->OnBooleanScalar(value);
+    }
+
+    void OnEntity() override {
+        Builder_->OnEntity();
+    }
+
+    void OnBeginList() override {
+        ++Level_;
+        Builder_->OnBeginList();
+    }
+
+    // Size-hinted variant. It opens a list just like OnBeginList(), so it has to
+    // raise Level_ as well; otherwise the matching OnEndList() would underflow it.
+    void OnBeginList(ui64 reserveSize) {
+        ++Level_;
+        Builder_->OnBeginList(reserveSize);
+    }
+
+    void OnListItem() override {
+        if (Level_ == 0) {
+            // Top-level item: the previous row is complete.
+            Flush();
+        } else {
+            Builder_->OnListItem();
+        }
+    }
+
+    void OnEndList() override {
+        --Level_;
+        Builder_->OnEndList();
+    }
+
+    void OnBeginMap() override {
+        ++Level_;
+        Builder_->OnBeginMap();
+    }
+
+    // Size-hinted variant; keeps Level_ balanced with OnEndMap() (see OnBeginList(ui64)).
+    void OnBeginMap(ui64 reserveSize) {
+        ++Level_;
+        Builder_->OnBeginMap(reserveSize);
+    }
+
+    void OnKeyedItem(TStringBuf key) override {
+        Builder_->OnKeyedItem(key);
+    }
+
+    void OnEndMap() override {
+        --Level_;
+        Builder_->OnEndMap();
+    }
+
+    void OnBeginAttributes() override {
+        ++Level_;
+        Builder_->OnBeginAttributes();
+    }
+
+    void OnEndAttributes() override {
+        --Level_;
+        Builder_->OnEndAttributes();
+    }
+
+    void OnNode(NYT::TNode node) {
+        Builder_->OnNode(node);
+    }
+
+    ~TYtFmrRowConsumer() {
+        // A pending row at destruction means the owner forgot to call Flush().
+        // The check is skipped while an exception thrown after construction is
+        // unwinding the stack (e.g. the parser or AddRow() failed mid-row):
+        // aborting there would kill the process instead of letting the caller
+        // handle the error.
+        if (std::uncaught_exceptions() == UncaughtExceptionsOnEntry_) {
+            Y_ABORT_UNLESS(!CurNode_.HasValue());
+        }
+    }
+
+    // Writes the row assembled so far (if any) and starts a fresh, empty row.
+    void Flush() {
+        if (CurNode_.HasValue()) {
+            Writer_->AddRow(CurNode_);
+        }
+        CurNode_ = NYT::TNode();
+        Builder_ = MakeHolder<NYT::TNodeBuilder>(&CurNode_);
+    }
+private:
+    NYT::TNode CurNode_;
+    NYT::TTableWriterPtr<NYT::TNode> Writer_;
+    THolder<NYT::TNodeBuilder> Builder_;
+    ui32 Level_ = 0;
+    // Number of in-flight exceptions when the consumer was created; a larger value
+    // in the destructor means the object is being destroyed by stack unwinding.
+    const int UncaughtExceptionsOnEntry_ = std::uncaught_exceptions();
+};
+
+// IYtService implementation backed by the YT C++ client.
+//
+// The first TClusterConnection seen for a cluster name is cached and reused to
+// create clients for that cluster; the transaction id is always taken from the
+// connection passed to the current call.
+// NOTE(review): the cache is never refreshed, so a server name or token that changes
+// for an already known cluster is ignored - confirm this is intended.
+class TFmrYtService: public NYql::NFmr::IYtService {
+public:
+
+    // Reads the YT table "//<ytTable.Path>" inside the given transaction and writes
+    // its rows as a text YSON list fragment into a temporary file.
+    // Returns the file handle on success or a TError carrying the exception message.
+    std::variant<THolder<TTempFileHandle>, TError> Download(const TYtTableRef& ytTable, const TClusterConnection& clusterConnection) override {
+        try {
+            auto client = CreateClient(GetOrCacheConnection(ytTable.Cluster, clusterConnection));
+            auto tmpFile = MakeHolder<TTempFileHandle>();
+            auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId));
+            TFileOutput downloadStream(tmpFile->Name());
+
+            NYson::TYsonWriter writer {&downloadStream, NYT::NYson::EYsonFormat::Text, NYT::NYson::EYsonType::ListFragment};
+            NYT::TNodeVisitor visitor{&writer};
+            auto reader = transaction->CreateTableReader<NYT::TNode>("//" + ytTable.Path);
+            for (; reader->IsValid(); reader->Next()) {
+                auto& row = reader->GetRow();
+                writer.OnListItem();
+                visitor.Visit(row);
+            }
+            downloadStream.Flush();
+            return tmpFile;
+        } catch (...) {
+            return TError{CurrentExceptionMessage()};
+        }
+    }
+
+    // Parses tableContent as a YSON list fragment and appends its rows to the YT
+    // table "//<ytTable.Path>" inside the given transaction, creating the table
+    // (recursively, ignoring an existing one) first.
+    // Returns Nothing() on success or a TError carrying the exception message.
+    TMaybe<TError> Upload(const TYtTableRef& ytTable, IInputStream& tableContent, const TClusterConnection& clusterConnection) override {
+        try {
+            auto client = CreateClient(GetOrCacheConnection(ytTable.Cluster, clusterConnection));
+            auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId));
+            auto options = NYT::TCreateOptions().Recursive(true).IgnoreExisting(true);
+            transaction->Create("//" + ytTable.Path, NYT::NT_TABLE, options);
+            auto path = NYT::TRichYPath("//" + ytTable.Path).Append(true);
+            auto ytWriter = transaction->CreateTableWriter<NYT::TNode>(path);
+
+            TYtFmrRowConsumer builder(ytWriter);
+            NYson::TYsonParser parser(&builder, &tableContent, NYT::NYson::EYsonType::ListFragment);
+            parser.Parse();
+            builder.Flush();
+            // Finish explicitly so that a failure while flushing the remaining rows is
+            // thrown here and reported as TError instead of being left to the writer's destructor.
+            ytWriter->Finish();
+            return Nothing();
+        } catch (...) {
+            return TError{CurrentExceptionMessage()};
+        }
+    }
+
+private:
+    std::unordered_map<TString, TClusterConnection> ClusterConnections_;
+
+    // Returns the connection cached for `cluster`, caching clusterConnection first
+    // when the cluster has not been seen yet (try_emplace keeps an existing entry).
+    const TClusterConnection& GetOrCacheConnection(const TString& cluster, const TClusterConnection& clusterConnection) {
+        return ClusterConnections_.try_emplace(cluster, clusterConnection).first->second;
+    }
+
+    // Creates a YT client for the connection's server, passing the token when present.
+    NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection) {
+        NYT::TCreateClientOptions createOpts;
+        auto token = clusterConnection.Token;
+        if (token) {
+            createOpts.Token(*token);
+        }
+        return NYT::CreateClient(clusterConnection.YtServerName, createOpts);
+    }
+};
+
+} // namespace
+
+// Creates the YT-backed IYtService implementation (TFmrYtService).
+IYtService::TPtr MakeFmrYtSerivce() {
+    IYtService::TPtr service = MakeIntrusive<TFmrYtService>();
+    return service;
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h
new file mode 100644
index 0000000000..90d2a69430
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h
@@ -0,0 +1,10 @@
+#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
+
+namespace NYql::NFmr {
+
+// Settings of the FMR YT service. Currently empty; MakeFmrYtSerivce() takes no settings argument.
+struct TFmrYtSerivceSettings {
+};
+
+IYtService::TPtr MakeFmrYtSerivce();
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/ya.make b/yt/yql/providers/yt/fmr/yt_service/interface/ya.make
new file mode 100644
index 0000000000..6ea8e94c4f
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/interface/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_yt_service.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/request_options
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp
new file mode 100644
index 0000000000..fbcafbc3a2
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp
@@ -0,0 +1 @@
+#include "yql_yt_yt_service.h"
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
new file mode 100644
index 0000000000..7d3e59b523
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <util/system/tempfile.h>
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+
+namespace NYql::NFmr {
+
+// Abstract, ref-counted interface for moving table data between YT and the caller.
+// Both operations report failures through a TError value in the result.
+//
+// NOTE(review): the clusterConnection default arguments sit on virtual functions, so the
+// default that applies is chosen by the static type of the call expression.
+class IYtService: public TThrRefBase {
+public:
+ virtual ~IYtService() = default;
+
+ using TPtr = TIntrusivePtr<IYtService>;
+
+ // Fetches the content of ytTable. The result holds either a temporary file with the
+ // table content or a TError.
+ virtual std::variant<THolder<TTempFileHandle>, TError> Download(const TYtTableRef& ytTable, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+
+ // Stores the content read from tableContent into ytTable. Returns an empty TMaybe on
+ // success or a TError.
+ virtual TMaybe<TError> Upload(const TYtTableRef& ytTable, IInputStream& tableContent, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/ya.make b/yt/yql/providers/yt/fmr/yt_service/mock/ya.make
new file mode 100644
index 0000000000..707812d60c
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/mock/ya.make
@@ -0,0 +1,17 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_yt_service_mock.h
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/yt_service/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h b/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h
new file mode 100644
index 0000000000..7d5c4d1076
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
+
+namespace NYql::NFmr {
+
+namespace {
+
+// Ref-counted in-memory registry of uploaded tables, shared with the YT service mock.
+// Table content is stored under the key "<Cluster>:<Path>".
+class TYtUploadedTablesMock: public TThrRefBase {
+public:
+    using TPtr = TIntrusivePtr<TYtUploadedTablesMock>;
+
+    ~TYtUploadedTablesMock() = default;
+
+    // Stores tableContent for ytTable, replacing content stored earlier for the same table.
+    void AddTable(const TYtTableRef& ytTable, const TString& tableContent) {
+        UploadedTables_.insert_or_assign(GetTableKey(ytTable), tableContent);
+    }
+
+    // True when content has been stored for ytTable.
+    bool Contains(const TYtTableRef& ytTable) const {
+        return UploadedTables_.count(GetTableKey(ytTable)) != 0;
+    }
+
+    // Returns the stored content; at() throws std::out_of_range for an unknown table.
+    const TString& GetTableContent(const TYtTableRef& ytTable) const {
+        return UploadedTables_.at(GetTableKey(ytTable));
+    }
+
+    // Forgets all stored tables.
+    void Clear() {
+        UploadedTables_.clear();
+    }
+
+    bool IsEmpty() const {
+        return UploadedTables_.size() == 0;
+    }
+
+    // Number of distinct tables currently stored.
+    int UploadedTablesNum() const {
+        return static_cast<int>(UploadedTables_.size());
+    }
+
+private:
+    // Builds the map key for a table: cluster and path joined by a colon.
+    TString GetTableKey(const TYtTableRef& ytTable) const {
+        TString tableKey = ytTable.Cluster;
+        tableKey += ":";
+        tableKey += ytTable.Path;
+        return tableKey;
+    }
+
+    std::unordered_map<TString, TString> UploadedTables_;
+};
+
+
+// IYtService mock that keeps "uploaded" tables in a shared TYtUploadedTablesMock
+// instead of talking to YT. The cluster connection argument is ignored.
+class TYtServiceMock: public NYql::NFmr::IYtService {
+public:
+
+    TYtServiceMock(TYtUploadedTablesMock::TPtr ytUploadedTablesMock)
+        : YtUploadedTablesMock_(ytUploadedTablesMock)
+    {
+    }
+
+    // Writes the stored content of ytTable into a fresh temporary file and returns it;
+    // returns TError("Table not found") when nothing was uploaded for the table.
+    std::variant<THolder<TTempFileHandle>, TError> Download(const TYtTableRef& ytTable, const TClusterConnection& /*clusterConnection*/) override {
+        const bool isKnownTable = YtUploadedTablesMock_->Contains(ytTable);
+        if (!isKnownTable) {
+            return TError("Table not found");
+        }
+        auto resultFile = MakeHolder<TTempFileHandle>();
+        const TString& storedContent = YtUploadedTablesMock_->GetTableContent(ytTable);
+        resultFile->Write(storedContent.data(), storedContent.size());
+        return resultFile;
+    }
+
+    // Reads the whole input stream and records it as the content of ytTable.
+    TMaybe<TError> Upload(const TYtTableRef& ytTable, IInputStream& tableContent, const TClusterConnection& /*clusterConnection*/) override {
+        const TString uploadedContent = tableContent.ReadAll();
+        YtUploadedTablesMock_->AddTable(ytTable, uploadedContent);
+        return Nothing();
+    }
+
+private:
+    TYtUploadedTablesMock::TPtr YtUploadedTablesMock_;
+};
+
+} // namespace
+
+// Creates a YT service mock that stores uploads in ytUploadedTablesMock.
+// Marked inline because the function is defined in a header: a header-defined
+// function should not emit a separate non-inline definition per including translation unit.
+inline IYtService::TPtr MakeYtServiceMock(TYtUploadedTablesMock::TPtr ytUploadedTablesMock) {
+    return MakeIntrusive<TYtServiceMock>(ytUploadedTablesMock);
+}
+
+// Creates an empty registry of uploaded tables for use with the YT service mock.
+// Marked inline because the function is defined in a header: a header-defined
+// function should not emit a separate non-inline definition per including translation unit.
+inline TYtUploadedTablesMock::TPtr MakeYtUploadedTablesMock() {
+    return MakeIntrusive<TYtUploadedTablesMock>();
+}
+
+} // namespace NYql::NFmr
+
+// TODO - move this to .cpp file
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
index b0e53f5cb7..3a1e127f3d 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
@@ -1583,6 +1583,10 @@ private:
return res;
}
+ // Not supported by the file gateway: the options are ignored and the call always
+ // throws yexception.
+ TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& /*options*/) override {
+ ythrow yexception() << "GetClusterConnection should not be called for file gateway";
+ }
+
private:
TYtFileServices::TPtr Services_;
diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make
index bfc91df2f5..c424b95d0b 100644
--- a/yt/yql/providers/yt/gateway/fmr/ya.make
+++ b/yt/yql/providers/yt/gateway/fmr/ya.make
@@ -6,8 +6,12 @@ SRCS(
PEERDIR(
yql/essentials/utils/log
+ yt/cpp/mapreduce/client
+ yt/yql/providers/yt/gateway/lib
+ yt/yql/providers/yt/gateway/native
yt/yql/providers/yt/expr_nodes
yt/yql/providers/yt/fmr/coordinator/interface
+ yt/yql/providers/yt/lib/config_clusters
yt/yql/providers/yt/provider
)
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index db00f2ff59..76a907a5c6 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -2,7 +2,10 @@
#include <thread>
+#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
+#include <yt/yql/providers/yt/gateway/lib/yt_helpers.h>
+#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <yt/yql/providers/yt/provider/yql_yt_helpers.h>
#include <yql/essentials/utils/log/log.h>
@@ -40,7 +43,7 @@ public:
with_lock(SessionStates_->Mutex) {
auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses, const TString& sessionId) {
for (auto& [operationId, promise]: operationStatuses) {
- YQL_CLOG(DEBUG, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId;
+ YQL_CLOG(TRACE, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId;
auto getOperationFuture = Coordinator_->GetOperation({operationId});
getOperationFuture.Subscribe([&, operationId, sessionId] (const auto& getFuture) {
@@ -96,10 +99,22 @@ public:
return Slave_->Publish(node, ctx, std::move(options));
}
auto publish = TYtPublish(node);
+ TString sessionId = options.SessionId();
auto cluster = publish.DataSink().Cluster().StringValue();
+ auto token = options.Config()->Auth.Get();
+ TString transformedInputPath;
+ TString userName = GetUsername(sessionId);
+ for (auto out: publish.Input()) {
+ auto outTable = GetOutTable(out).Cast<TYtOutTable>();
+ TStringBuf inputPath = outTable.Name().Value();
+ transformedInputPath = NYql::TransformPath(GetTablesTmpFolder(*options.Config()), inputPath, true, userName);
+ break;
+ }
+
+ // TODO - handle several inputs in Publish, use ColumnGroups, Run Merge
+
auto outputPath = publish.Publish().Name().StringValue();
- auto transactionId = GenerateId();
auto idempotencyKey = GenerateId();
auto fmrTableId = cluster + "." + outputPath;
@@ -107,15 +122,14 @@ public:
TFuture<TDownloadTableToFmrResult> downloadToFmrFuture;
TFuture<void> downloadedSuccessfully;
- TString sessionId = options.SessionId();
-
with_lock(SessionStates_->Mutex) {
auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
if (!tablePresenceStatuses.contains(fmrTableId)) {
- TYtTableRef ytTable{.Path = outputPath, .Cluster = cluster, .TransactionId = transactionId};
+ TYtTableRef ytTable{.Path = transformedInputPath, .Cluster = cluster};
+ TFmrTableRef fmrTable{.TableId = fmrTableId};
tablePresenceStatuses[fmrTableId] = ETablePresenceStatus::Both;
- downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, sessionId);
+ downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, fmrTable, sessionId, options.Config());
downloadedSuccessfully = downloadToFmrFuture.Apply([downloadedSuccessfully] (auto& downloadFuture) {
auto downloadResult = downloadFuture.GetValueSync();
});
@@ -125,15 +139,27 @@ public:
}
downloadedSuccessfully.Wait(); // blocking until download to fmr finishes
- YQL_CLOG(INFO, FastMapReduce) << "Uploading table with cluster " << cluster << " and path " << outputPath << " from fmr to yt";
-
TUploadTaskParams uploadTaskParams{
.Input = TFmrTableRef{fmrTableId},
- .Output = TYtTableRef{outputPath, cluster, transactionId}
+ .Output = TYtTableRef{outputPath, cluster}
};
+ auto clusterConnectionOptions = TClusterConnectionOptions(options.SessionId())
+ .Cluster(cluster).Config(options.Config());
+ auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions));
+ YQL_ENSURE(clusterConnection.Success());
+
TStartOperationRequest uploadRequest{
- .TaskType = ETaskType::Upload, .TaskParams = uploadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1
+ .TaskType = ETaskType::Upload,
+ .TaskParams = uploadTaskParams,
+ .SessionId = sessionId,
+ .IdempotencyKey=idempotencyKey,
+ .NumRetries=1,
+ .ClusterConnection = TClusterConnection{
+ .TransactionId = clusterConnection.TransactionId,
+ .YtServerName = clusterConnection.YtServerName,
+ .Token = clusterConnection.Token
+ }
};
auto promise = NewPromise<TPublishResult>();
@@ -155,19 +181,34 @@ public:
return future;
}
- TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce(const TYtTableRef& ytTableRef, const TString& sessionId) {
+ TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce(
+ const TYtTableRef& ytTableRef, const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config)
+ {
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
- TString fmrTableId = ytTableRef.Cluster + "." + ytTableRef.Path;
+ TString fmrTableId = fmrTableRef.TableId;
TDownloadTaskParams downloadTaskParams{
.Input = ytTableRef,
.Output = {fmrTableId}
};
auto idempotencyKey = GenerateId();
+ auto clusterConnectionOptions = TClusterConnectionOptions(sessionId)
+ .Cluster(ytTableRef.Cluster).Config(config);
+ auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions));
+ YQL_ENSURE(clusterConnection.Success());
TStartOperationRequest downloadRequest{
- .TaskType = ETaskType::Download, .TaskParams = downloadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1
+ .TaskType = ETaskType::Download,
+ .TaskParams = downloadTaskParams,
+ .SessionId = sessionId,
+ .IdempotencyKey = idempotencyKey,
+ .NumRetries=1,
+ .ClusterConnection = TClusterConnection{
+ .TransactionId = clusterConnection.TransactionId,
+ .YtServerName = clusterConnection.YtServerName,
+ .Token = clusterConnection.Token
+ }
};
- YQL_CLOG(DEBUG, FastMapReduce) << "Starting download to from yt table: " << fmrTableId;
+ YQL_CLOG(DEBUG, FastMapReduce) << "Starting download from yt table: " << fmrTableId;
auto promise = NewPromise<TDownloadTableToFmrResult>();
auto future = promise.GetFuture();
@@ -186,22 +227,24 @@ public:
return future;
}
- void OpenSession(TOpenSessionOptions&& options) final {
- Slave_->OpenSession(std::move(options));
+ TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override { // Pure pass-through: this gateway adds nothing and asks the wrapped gateway (Slave_).
+ return Slave_->GetClusterConnection(std::move(options)); // options is const&&, so nothing is moved from; std::move only re-forms the rvalue the callee's const&& parameter requires
+ }
+ void OpenSession(TOpenSessionOptions&& options) final { // Registers the session in this gateway's own state first, then opens it on the wrapped gateway; throws yexception if the id is already registered.
TString sessionId = options.SessionId();
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
with_lock(SessionStates_->Mutex) {
- auto sessions = SessionStates_->Sessions;
+ auto& sessions = SessionStates_->Sessions; // by reference: the insert below must land in the shared map, not in a local copy
if (sessions.contains(sessionId)) {
YQL_LOG_CTX_THROW yexception() << "Session already exists: " << sessionId;
}
- sessions[sessionId] = TSessionInfo();
+ sessions[sessionId] = TSessionInfo{.UserName = options.UserName()}; // keep the user name so GetUsername() can return it later
}
+ Slave_->OpenSession(std::move(options)); // forwarded last, after every read of options above
}
TFuture<void> CloseSession(TCloseSessionOptions&& options) final {
- Slave_->CloseSession(std::move(options)).Wait();
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
with_lock(SessionStates_->Mutex) {
@@ -211,11 +254,11 @@ public:
sessions.erase(it);
}
}
+ Slave_->CloseSession(std::move(options)).Wait();
return MakeFuture();
}
TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final {
- Slave_->CleanupSession(std::move(options)).Wait();
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
TString sessionId = options.SessionId();
@@ -239,7 +282,7 @@ public:
cancelOperationsFunc(operationStates.DownloadOperationStatuses);
cancelOperationsFunc(operationStates.UploadOperationStatuses);
}
-
+ Slave_->CleanupSession(std::move(options)).Wait();
return MakeFuture();
}
@@ -252,6 +295,7 @@ private:
struct TSessionInfo { // Per-session gateway state; stored in SessionStates_->Sessions keyed by session id.
TFmrGatewayOperationsState OperationStates;
std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService
+ TString UserName; // filled from TOpenSessionOptions::UserName() in OpenSession, read back by GetUsername()
};
struct TSession {
@@ -282,6 +326,14 @@ private:
}
promise.SetValue(commonOperationResult);
}
+
+ // Returns the user name stored for `sessionId` when the session was opened.
+ // Takes SessionStates_->Mutex for the duration of the lookup.
+ // Fails the YQL_ENSURE check if no such session is registered.
+ TString GetUsername(const TString& sessionId) {
+     with_lock(SessionStates_->Mutex) {
+         // One find() instead of contains() + operator[]: a single lookup, and no
+         // accessor on this path that could default-insert a session entry.
+         const auto it = SessionStates_->Sessions.find(sessionId);
+         YQL_ENSURE(it != SessionStates_->Sessions.end(), "Unknown session: " << sessionId);
+         return it->second.UserName;
+     }
+ }
};
} // namespace
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
index 769bb24fe5..6d102e57ca 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
@@ -6,17 +6,14 @@
namespace NYql::NFmr {
struct TFmrYtGatewaySettings { // Settings accepted by CreateYtFmrGateway.
- TIntrusivePtr<IRandomProvider> RandomProvider;
- TDuration TimeToSleepBetweenGetOperationRequests;
+ TIntrusivePtr<IRandomProvider> RandomProvider = CreateDefaultRandomProvider(); // in-class default, so CreateYtFmrGateway can default its argument to TFmrYtGatewaySettings{}
+ TDuration TimeToSleepBetweenGetOperationRequests = TDuration::Seconds(1); // presumably the pause between GetOperation polls to the coordinator — TODO confirm at the use site
};
IYtGateway::TPtr CreateYtFmrGateway(
IYtGateway::TPtr slave,
IFmrCoordinator::TPtr coordinator = nullptr,
- const TFmrYtGatewaySettings& settings = TFmrYtGatewaySettings{
- .RandomProvider = CreateDefaultRandomProvider(),
- .TimeToSleepBetweenGetOperationRequests = TDuration::Seconds(1)
- }
+ const TFmrYtGatewaySettings& settings = TFmrYtGatewaySettings{}
);
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 40df6077ef..196ff9a1c3 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -5698,6 +5698,22 @@ private:
return ctx;
}
+ // Builds the connection details (transaction id, server name, token) for the
+ // cluster named in `options`, using the session identified by options.SessionId().
+ // Any exception raised on the way is turned into a result via ResultFromCurrentException.
+ TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override {
+     try {
+         auto sessionPtr = GetSession(options.SessionId(), true);
+         auto serverName = Clusters_->GetServer(options.Cluster());
+         auto txEntry = sessionPtr->TxCache_.GetEntry(serverName);
+
+         TClusterConnectionResult result{};
+         result.TransactionId = GetGuidAsString(txEntry->Tx->GetId());
+         result.YtServerName = serverName;
+         result.Token = options.Config()->Auth.Get();
+         result.SetSuccess();
+         return result;
+     } catch (...) {
+         return ResultFromCurrentException<TClusterConnectionResult>({}, true);
+     }
+ }
+
private:
const TYtNativeServices Services_;
const TConfigClusters::TPtr Clusters_;
diff --git a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
index 5ad80977e9..3a9bd3272b 100644
--- a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
+++ b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
@@ -961,6 +961,10 @@ public:
return Inner_->AddCluster(cluster);
}
+ TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override { // Delegates straight to Inner_; this override does nothing else.
+ return Inner_->GetClusterConnection(std::move(options));
+ }
+
private:
const IYtGateway::TPtr Inner_;
const TQContext QContext_;
diff --git a/yt/yql/providers/yt/provider/yql_yt_gateway.h b/yt/yql/providers/yt/provider/yql_yt_gateway.h
index b331f2142c..a98cd1d877 100644
--- a/yt/yql/providers/yt/provider/yql_yt_gateway.h
+++ b/yt/yql/providers/yt/provider/yql_yt_gateway.h
@@ -613,6 +613,24 @@ public:
struct TUploadTableResult: public NCommon::TOperationResult {
};
+ struct TClusterConnectionOptions: public TCommonOptions { // Arguments for IYtGateway::GetClusterConnection.
+ using TSelf = TClusterConnectionOptions; // presumably what OPTION_FIELD uses as the return type of its chainable setters — confirm against the macro
+
+ TClusterConnectionOptions(const TString& sessionId)
+ : TCommonOptions(sessionId)
+ {
+ }
+
+ OPTION_FIELD(TString, Cluster) // cluster whose connection is requested; the native gateway resolves it with Clusters_->GetServer()
+ OPTION_FIELD(TYtSettings::TConstPtr, Config) // settings; the native gateway reads Auth from it to fill the result's Token
+ };
+
+ struct TClusterConnectionResult: public NCommon::TOperationResult { // Value returned by IYtGateway::GetClusterConnection.
+ TString TransactionId; // native gateway: GetGuidAsString() of the id of the session's cached transaction for the cluster
+ TString YtServerName; // native gateway: the server returned by Clusters_->GetServer(cluster)
+ TMaybe<TString> Token; // native gateway: Config()->Auth.Get(); a TMaybe, so possibly unset
+ };
+
public:
virtual ~IYtGateway() = default;
@@ -673,6 +691,8 @@ public:
virtual TGetTablePartitionsResult GetTablePartitions(TGetTablePartitionsOptions&& options) = 0;
virtual void AddCluster(const TYtClusterConfig& cluster) = 0;
+
+ virtual TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) = 0;
};
}
diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make
index 007272ae8e..8f04604931 100644
--- a/yt/yql/tools/ytrun/lib/ya.make
+++ b/yt/yql/tools/ytrun/lib/ya.make
@@ -8,8 +8,11 @@ PEERDIR(
yt/yql/providers/yt/provider
yt/yql/providers/yt/fmr/coordinator/client
yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/job/impl
yt/yql/providers/yt/fmr/job_factory/impl
+ yt/yql/providers/yt/fmr/table_data_service/local
yt/yql/providers/yt/fmr/worker/impl
+ yt/yql/providers/yt/fmr/yt_service/impl
yt/yql/providers/yt/gateway/native
yt/yql/providers/yt/gateway/fmr
yt/yql/providers/yt/lib/config_clusters
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
index dcd374a2c3..724a8d72e1 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -10,7 +10,10 @@
#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h>
#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h>
+#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
@@ -197,13 +200,12 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
}
if (!DisableLocalFmrWorker_) {
- auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
- while (!cancelFlag->load()) {
- Sleep(TDuration::Seconds(3));
- return NFmr::ETaskStatus::Completed;
- }
- return NFmr::ETaskStatus::Failed;
- }; // TODO - use function which actually calls Downloader/Uploader based on task params
+ auto tableDataService = MakeLocalTableDataService(NFmr::TLocalTableDataServiceSettings(3));
+ auto fmrYtSerivce = NFmr::MakeFmrYtSerivce();
+
+ auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable {
+ return NFmr::RunJob(task, tableDataService, fmrYtSerivce, cancelFlag);
+ };
NFmr::TFmrJobFactorySettings settings{.Function=func};
auto jobFactory = MakeFmrJobFactory(settings);