diff options
author | cdzyura171 <cdzyura171@yandex-team.com> | 2025-02-27 20:07:44 +0300 |
---|---|---|
committer | cdzyura171 <cdzyura171@yandex-team.com> | 2025-02-27 20:37:45 +0300 |
commit | 9ae9b76b16407d250878aafff784174478f1b769 (patch) | |
tree | 762a102ab583bc0ee20a8d13c6a4d9c3acdb1844 /yt | |
parent | b2829f5d95e52ba54c76f0c6358c21ff9e409290 (diff) | |
download | ydb-9ae9b76b16407d250878aafff784174478f1b769.tar.gz |
Create FmrYtService and use it in Gateway
Create FmrYtService and use it in Gateway
commit_hash:8478e1395da6e8937382c791fa834e89db4b610e
Diffstat (limited to 'yt')
44 files changed, 1286 insertions, 49 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp index 36d7020adc..495057d550 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -45,12 +45,17 @@ private: TDownloadTaskParams downloadTaskParams{ - .Input = TYtTableRef{"Path","Cluster","TransactionId"}, + .Input = TYtTableRef{"Path","Cluster"}, .Output = TFmrTableRef{"TableId"} }; TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) { - return TStartOperationRequest{.TaskType = taskType, .TaskParams = taskParams, .SessionId = "SessionId", .IdempotencyKey = "IdempotencyKey"}; + return TStartOperationRequest{ + .TaskType = taskType, + .TaskParams = taskParams, + .IdempotencyKey = "IdempotencyKey", + .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"} + }; } std::vector<TStartOperationRequest> CreateSeveralOperationRequests( @@ -59,7 +64,10 @@ std::vector<TStartOperationRequest> CreateSeveralOperationRequests( std::vector<TStartOperationRequest> startOperationRequests(numRequests); for (int i = 0; i < numRequests; ++i) { startOperationRequests[i] = TStartOperationRequest{ - .TaskType = taskType, .TaskParams = taskParams, .IdempotencyKey = "IdempotencyKey_" + ToString(i) + .TaskType = taskType, + .TaskParams = taskParams, + .IdempotencyKey = "IdempotencyKey_" + ToString(i), + .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn", .Token = "token"} }; } return startOperationRequests; diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index bec951c180..bd73813a43 100644 --- 
a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -57,7 +57,7 @@ public: } TString taskId = GenerateId(); - TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, request.TaskParams, request.SessionId); + TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, request.TaskParams, request.SessionId, request.ClusterConnection); Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId}; diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp index 85f7d5834a..69908b3ac0 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp @@ -68,6 +68,8 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey); } protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries); + auto protoClusterConnection = ClusterConnectionToProto(startOperationRequest.ClusterConnection); + protoStartOperationRequest.MutableClusterConnection()->Swap(&protoClusterConnection); return protoStartOperationRequest; } @@ -80,6 +82,7 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey(); } startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries(); + startOperationRequest.ClusterConnection = ClusterConnectionFromProto(protoStartOperationRequest.GetClusterConnection()); return startOperationRequest; } diff --git 
a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h index 8be5d757c2..ba9592de9a 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h @@ -26,6 +26,7 @@ struct TStartOperationRequest { TString SessionId; TMaybe<TString> IdempotencyKey = Nothing(); ui32 NumRetries = 1; // Not supported yet + TClusterConnection ClusterConnection = {}; }; struct TStartOperationResponse { diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make new file mode 100644 index 0000000000..b2fb4a8853 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make @@ -0,0 +1,16 @@ +UNITTEST() + +SRCS( + yql_yt_job_ut.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/job/impl + yt/yql/providers/yt/fmr/yt_service/mock + yt/yql/providers/yt/fmr/table_data_service/local + yql/essentials/utils/log +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp new file mode 100644 index 0000000000..053a106ea5 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -0,0 +1,318 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> +#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h> +#include <yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h> + +namespace NYql::NFmr { + +Y_UNIT_TEST_SUITE(FmrJobTests) { + Y_UNIT_TEST(DownloadTable) { + TString tableContent = + "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + + ITableDataService::TPtr tableDataServicePtr = 
MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + + TYtTableRef input = TYtTableRef("test_cluster", "test_path"); + TFmrTableRef output = TFmrTableRef("test_table_id"); + TDownloadTaskParams params = TDownloadTaskParams(input, output); + + ytUploadedTablesMock->AddTable(input, tableContent); + + auto err = job->Download(params); + + UNIT_ASSERT_C(!err,err.GetRef()); + UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get("test_table_id").GetValueSync().GetRef(), tableContent); + } + + Y_UNIT_TEST(UploadTable) { + TString ytTableContent = + "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + + TYtTableRef output = TYtTableRef("test_cluster", "test_path"); + TFmrTableRef input = TFmrTableRef("test_table_id"); + TUploadTaskParams params = TUploadTaskParams(input, output); + + tableDataServicePtr->Put(input.TableId, ytTableContent); + + auto err = job->Upload(params); + + UNIT_ASSERT_C(!err,err.GetRef()); + UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent); + } + + 
Y_UNIT_TEST(MergeFmrTables) { + TString TableContent_1 = + "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; + TString TableContent_2 = + "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + TString TableContent_3 = + "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + + TFmrTableRef input_1 = TFmrTableRef("test_table_id_1"); + TFmrTableRef input_2 = TFmrTableRef("test_table_id_2"); + TFmrTableRef input_3 = TFmrTableRef("test_table_id_3"); + TTableRef input_table_ref_1 = {input_1}; + TTableRef input_table_ref_2 = {input_2}; + TTableRef input_table_ref_3 = {input_3}; + TFmrTableRef output = TFmrTableRef("test_table_id_output"); + std::vector<TTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3}; + TMergeTaskParams params = TMergeTaskParams(inputs, output); + + tableDataServicePtr->Put(input_1.TableId, TableContent_1); + tableDataServicePtr->Put(input_2.TableId, TableContent_2); + tableDataServicePtr->Put(input_3.TableId, TableContent_3); + + auto err = job->Merge(params); + + 
UNIT_ASSERT_C(!err,err.GetRef()); + UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get(output.TableId).GetValueSync().GetRef(), TableContent_1 + TableContent_2 + TableContent_3); + } + + Y_UNIT_TEST(MergeMixedTables) { + TString TableContent_1 = + "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; + TString TableContent_2 = + "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + TString TableContent_3 = + "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + + TFmrTableRef input_1 = TFmrTableRef("test_table_id_1"); + TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); + TFmrTableRef input_3 = TFmrTableRef("test_table_id_3"); + TTableRef input_table_ref_1 = {input_1}; + TTableRef input_table_ref_2 = {input_2}; + TTableRef input_table_ref_3 = {input_3}; + TFmrTableRef output = TFmrTableRef("test_table_id_output"); + std::vector<TTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3}; + TMergeTaskParams params = TMergeTaskParams(inputs, output); + + tableDataServicePtr->Put(input_1.TableId, 
TableContent_1); + ytUploadedTablesMock->AddTable(input_2, TableContent_2); + tableDataServicePtr->Put(input_3.TableId, TableContent_3); + + auto err = job->Merge(params); + + UNIT_ASSERT_C(!err,err.GetRef()); + UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get(output.TableId).GetValueSync().GetRef(), TableContent_1 + TableContent_2 + TableContent_3); + } +} + +Y_UNIT_TEST_SUITE(TaskRunTests) { + Y_UNIT_TEST(RunDownloadTask) { + TString ytTableContent = + "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + + TYtTableRef input = TYtTableRef("test_cluster", "test_path"); + TFmrTableRef output = TFmrTableRef("test_table_id"); + + ytUploadedTablesMock->AddTable(input, ytTableContent); + TDownloadTaskParams params = TDownloadTaskParams(input, output); + TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id"); + + + ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag); + + UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); + UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get("test_table_id").GetValueSync().GetRef(), ytTableContent); + } + + Y_UNIT_TEST(RunUploadTask) { + TString ytTableContent = + "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; + + ITableDataService::TPtr tableDataServicePtr = 
MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + + TFmrTableRef input = TFmrTableRef("test_table_id"); + TYtTableRef output = TYtTableRef("test_cluster", "test_path"); + + TUploadTaskParams params = TUploadTaskParams(input, output); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + + tableDataServicePtr->Put(input.TableId, ytTableContent); + + ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag); + + UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); + UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent); + } + + Y_UNIT_TEST(RunUploadTaskWithNoTable) { + TString ytTableContent = + "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + + TFmrTableRef input = TFmrTableRef("test_table_id"); + TYtTableRef output = TYtTableRef("test_cluster", "test_path"); + + TUploadTaskParams params = TUploadTaskParams(input, output); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + + // No table in tableDataServicePtr + // tableDataServicePtr->Put(input.TableId, ytTableContent); + + ETaskStatus status = RunJob(task, tableDataServicePtr, 
ytService, cancelFlag); + + UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed); + UNIT_ASSERT(ytUploadedTablesMock->IsEmpty()); + } + + Y_UNIT_TEST(RunMergeTask) { + TString TableContent_1 = + "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; + TString TableContent_2 = + "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + TString TableContent_3 = + "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + + TFmrTableRef input_1 = TFmrTableRef("test_table_id_1"); + TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); + TFmrTableRef input_3 = TFmrTableRef("test_table_id_3"); + TTableRef input_table_ref_1 = {input_1}; + TTableRef input_table_ref_2 = {input_2}; + TTableRef input_table_ref_3 = {input_3}; + TFmrTableRef output = TFmrTableRef("test_table_id_output"); + std::vector<TTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3}; + TMergeTaskParams params = TMergeTaskParams(inputs, output); + + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + + tableDataServicePtr->Put(input_1.TableId, TableContent_1); + 
ytUploadedTablesMock->AddTable(input_2, TableContent_2); + tableDataServicePtr->Put(input_3.TableId, TableContent_3); + + ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag); + + UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); + UNIT_ASSERT_NO_DIFF(tableDataServicePtr->Get( + output.TableId).GetValueSync().GetRef(), + TableContent_1 + TableContent_2 + TableContent_3 + ); + } + + Y_UNIT_TEST(RunMergeTaskWithNoTable) { + TString TableContent_1 = + "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; + TString TableContent_2 = + "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + TString TableContent_3 = + "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); + NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); + std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); + + TFmrTableRef input_1 = TFmrTableRef("test_table_id_1"); + TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); + TFmrTableRef input_3 = TFmrTableRef("test_table_id_3"); + TTableRef input_table_ref_1 = {input_1}; + TTableRef input_table_ref_2 = {input_2}; + TTableRef input_table_ref_3 = {input_3}; + TFmrTableRef output = TFmrTableRef("test_table_id_output"); + std::vector<TTableRef> inputs = 
{input_table_ref_1, input_table_ref_2, input_table_ref_3}; + TMergeTaskParams params = TMergeTaskParams(inputs, output); + + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + + tableDataServicePtr->Put(input_1.TableId, TableContent_1); + // No table in Yt + // ytUploadedTablesMock->AddTable(input_2, TableContent_2); + tableDataServicePtr->Put(input_3.TableId, TableContent_3); + + ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag); + UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed); + UNIT_ASSERT(!tableDataServicePtr->Get(output.TableId).GetValueSync()); + } +} + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make new file mode 100644 index 0000000000..4a647bd58a --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +SRCS( + yql_yt_job_impl.cpp +) + +PEERDIR( + library/cpp/threading/future + yt/yql/providers/yt/fmr/job/interface + yt/yql/providers/yt/fmr/request_options +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp new file mode 100644 index 0000000000..c672d6d6b9 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -0,0 +1,166 @@ +#include <library/cpp/threading/future/core/future.h> + +#include <util/stream/file.h> + +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> +#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h> +#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> + +#include <yql/essentials/utils/log/log.h> + +namespace NYql::NFmr { + +class TFmrJob: public IFmrJob { +public: + + TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, 
std::shared_ptr<std::atomic<bool>> cancelFlag) + : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag) + { + } + + virtual TMaybe<TString> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection) override { // extract into a private method for reuse + try { + const auto ytTable = params.Input; + const auto cluster = params.Input.Cluster; + const auto path = params.Input.Path; + const auto tableId = params.Output.TableId; + + YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path; + + auto res = GetYtTableContent(ytTable, clusterConnection); + auto err = std::get_if<TError>(&res); + if (err) { + return err->ErrorMessage; + } + auto tableContent = std::get<TString>(res); + TableDataService_->Put(tableId, tableContent).Wait(); + } catch (...) { + return CurrentExceptionMessage(); + } + + return Nothing(); + } + + virtual TMaybe<TString> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override { + const auto ytTable = params.Output; + const auto cluster = params.Output.Cluster; + const auto path = params.Output.Path; + const auto tableId = params.Input.TableId; + + YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' 
<< path; + + TMaybe<TString> getResult = TableDataService_->Get(tableId).GetValueSync(); + + if (!getResult) { + YQL_CLOG(ERROR, FastMapReduce) << "Table " << tableId << " not found"; + return "Table not found"; + } + + TString tableContent = getResult.GetRef(); + TStringInput inputStream(tableContent); + + YtService_->Upload(ytTable, inputStream, clusterConnection); + + return Nothing(); + } + + virtual TMaybe<TString> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override { + const auto inputs = params.Input; + const auto output = params.Output; + + YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; + + TString mergedTableContent = ""; + + for (const auto& inputTableRef : inputs) { + if (CancelFlag_->load()) { + return "Canceled"; + } + auto res = GetTableContent(inputTableRef, clusterConnection); + + auto err = std::get_if<TError>(&res); + if (err) { + return err->ErrorMessage; + } + TString tableContent = std::get<TString>(res); + + mergedTableContent += tableContent; + } + + TableDataService_->Put(output.TableId, mergedTableContent).Wait(); + + return Nothing(); + } +private: + std::variant<TString, TError> GetTableContent(const TTableRef& tableRef, const TClusterConnection& clusterConnection) { + auto ytTable = std::get_if<TYtTableRef>(&tableRef); + auto fmrTable = std::get_if<TFmrTableRef>(&tableRef); + if (ytTable) { + return GetYtTableContent(*ytTable, clusterConnection); + } else if (fmrTable) { + return GetFmrTableContent(*fmrTable); + } else { + ythrow yexception() << "Unsupported table type"; + } + } + + std::variant<TString, TError> GetYtTableContent(const TYtTableRef& ytTable, const TClusterConnection& clusterConnection) { + auto res = YtService_->Download(ytTable, clusterConnection); + auto* err = std::get_if<TError>(&res); + if (err) { + return *err; + } + auto tableFile = std::get_if<THolder<TTempFileHandle>>(&res); + TFileInput inputStream(tableFile->Get()->Name()); + TString 
tableContent = inputStream.ReadAll(); + return tableContent; + } + + std::variant<TString, TError> GetFmrTableContent(const TFmrTableRef& fmrTable) { + auto res = TableDataService_->Get(fmrTable.TableId); + return res.GetValueSync().GetRef(); + } +private: + ITableDataService::TPtr TableDataService_; + IYtService::TPtr YtService_; + std::shared_ptr<std::atomic<bool>> CancelFlag_; +}; + +IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) { + return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag); +} + +ETaskStatus RunJob( + TTask::TPtr task, + ITableDataService::TPtr tableDataService, + IYtService::TPtr ytService, + std::shared_ptr<std::atomic<bool>> cancelFlag +) { + IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag); + + auto processTask = [job, task] (auto&& taskParams) { + using T = std::decay_t<decltype(taskParams)>; + + if constexpr (std::is_same_v<T, TUploadTaskParams>) { + return job->Upload(taskParams, task->ClusterConnection); + } else if constexpr (std::is_same_v<T, TDownloadTaskParams>) { + return job->Download(taskParams, task->ClusterConnection); + } else if constexpr (std::is_same_v<T, TMergeTaskParams>) { + return job->Merge(taskParams, task->ClusterConnection); + } else { + throw std::runtime_error{"Unsupported task type"}; + } + }; + + TMaybe<TString> taskResult = std::visit(processTask, task->TaskParams); + + if (taskResult.Defined()) { + YQL_CLOG(ERROR, FastMapReduce) << "Task failed: " << taskResult.GetRef(); + return ETaskStatus::Failed; + } + + return ETaskStatus::Completed; +}; + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h new file mode 100644 index 0000000000..28280ff0ce --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -0,0 +1,12 @@ +#include <yt/cpp/mapreduce/interface/fwd.h> +#include 
<yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h> +#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h> +#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> + +namespace NYql::NFmr { + +IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); + +ETaskStatus RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/interface/ya.make b/yt/yql/providers/yt/fmr/job/interface/ya.make new file mode 100644 index 0000000000..7d256622f7 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/interface/ya.make @@ -0,0 +1,9 @@ +LIBRARY() + +SRCS( + yql_yt_job.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.cpp b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.cpp new file mode 100644 index 0000000000..a80a31a1a9 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.cpp @@ -0,0 +1 @@ +#include "yql_yt_job.h" diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h new file mode 100644 index 0000000000..187ed21d3c --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h @@ -0,0 +1,20 @@ +#pragma once + +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> + +namespace NYql::NFmr { + +class IFmrJob: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IFmrJob>; + + virtual ~IFmrJob() = default; + + virtual TMaybe<TString> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; + + virtual TMaybe<TString> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; + + virtual TMaybe<TString> Merge(const 
TMergeTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; +}; + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto index 33990caa28..c76bb50fc3 100644 --- a/yt/yql/providers/yt/fmr/proto/coordinator.proto +++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto @@ -22,6 +22,7 @@ message TStartOperationRequest { string SessionId = 3; optional string IdempotencyKey = 4; uint32 NumRetries = 5; + TClusterConnection ClusterConnection = 6; } message TStartOperationResponse { diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto index e37583a0d2..7cbd1eecf4 100644 --- a/yt/yql/providers/yt/fmr/proto/request_options.proto +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -48,7 +48,6 @@ message TStatistics {} message TYtTableRef { string Path = 1; string Cluster = 2; - string TransactionId = 3; } message TFmrTableRef { @@ -85,12 +84,19 @@ message TTaskParams { } } +message TClusterConnection { + string TransactionId = 1; + string YtServerName = 2; + optional string Token = 3; +} + message TTask { ETaskType TaskType = 1; string TaskId = 2; TTaskParams TaskParams = 3; string SessionId = 4; optional uint32 NumRetries = 5; + TClusterConnection ClusterConnection = 6; } message TTaskState { diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index 52cc0f3ad6..7a931909ac 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -38,7 +38,6 @@ NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) { NProto::TYtTableRef protoYtTableRef; protoYtTableRef.SetPath(ytTableRef.Path); 
protoYtTableRef.SetCluster(ytTableRef.Cluster); - protoYtTableRef.SetTransactionId(ytTableRef.TransactionId); return protoYtTableRef; } @@ -46,7 +45,6 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) { TYtTableRef ytTableRef; ytTableRef.Path = protoYtTableRef.GetPath(); ytTableRef.Cluster = protoYtTableRef.GetCluster(); - ytTableRef.TransactionId = protoYtTableRef.GetTransactionId(); return ytTableRef; } @@ -170,6 +168,26 @@ TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { return taskParams; } +NProto::TClusterConnection ClusterConnectionToProto(const TClusterConnection& clusterConnection) { + NProto::TClusterConnection protoClusterConnection; + protoClusterConnection.SetTransactionId(clusterConnection.TransactionId); + protoClusterConnection.SetYtServerName(clusterConnection.YtServerName); + if (clusterConnection.Token) { + protoClusterConnection.SetToken(*clusterConnection.Token); + } + return protoClusterConnection; +} + +TClusterConnection ClusterConnectionFromProto(const NProto::TClusterConnection& protoClusterConnection) { + TClusterConnection clusterConnection{}; + clusterConnection.TransactionId = protoClusterConnection.GetTransactionId(); + clusterConnection.YtServerName = protoClusterConnection.GetYtServerName(); + if (protoClusterConnection.HasToken()) { + clusterConnection.Token = protoClusterConnection.GetToken(); + } + return clusterConnection; +} + NProto::TTask TaskToProto(const TTask& task) { NProto::TTask protoTask; protoTask.SetTaskType(static_cast<NProto::ETaskType>(task.TaskType)); @@ -178,6 +196,8 @@ NProto::TTask TaskToProto(const TTask& task) { protoTask.MutableTaskParams()->Swap(&taskParams); protoTask.SetSessionId(task.SessionId); protoTask.SetNumRetries(task.NumRetries); + auto clusterConnection = ClusterConnectionToProto(task.ClusterConnection); + protoTask.MutableClusterConnection()->Swap(&clusterConnection); return protoTask; } @@ -188,6 +208,7 @@ TTask TaskFromProto(const 
NProto::TTask& protoTask) { task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams()); task.SessionId = protoTask.GetSessionId(); task.NumRetries = protoTask.GetNumRetries(); + task.ClusterConnection = ClusterConnectionFromProto(protoTask.GetClusterConnection()); return task; } diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h index e886e91901..5108bbb0f3 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h @@ -37,6 +37,10 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams); TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams); +NProto::TClusterConnection ClusterConnectionToProto(const TClusterConnection& clusterConnection); + +TClusterConnection ClusterConnectionFromProto(const NProto::TClusterConnection& protoClusterConnection); + NProto::TTask TaskToProto(const TTask& task); TTask TaskFromProto(const NProto::TTask& protoTask); diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index c73791bee4..f6c31167c3 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -2,8 +2,8 @@ namespace NYql::NFmr { -TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId) { - return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId); +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection) { + return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection); } 
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage) { diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index b423021a71..de7c4eb435 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -54,7 +54,6 @@ struct TError { struct TYtTableRef { TString Path; TString Cluster; - TString TransactionId; }; struct TFmrTableRef { @@ -160,11 +159,19 @@ using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeT using TTaskParamsNew = std::variant<TUploadTaskParamsNew, TDownloadTaskParamsNew, TMergeTaskParamsNew>; +struct TClusterConnection { + TString TransactionId; + TString YtServerName; + TMaybe<TString> Token; +}; + +using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams>; + struct TTask: public TThrRefBase { TTask() = default; - TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, ui32 numRetries = 1) - : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), NumRetries(numRetries) + TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, ui32 numRetries = 1) + : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), NumRetries(numRetries) { } @@ -172,6 +179,7 @@ struct TTask: public TThrRefBase { TString TaskId; TTaskParams TaskParams = {}; TString SessionId; + TClusterConnection ClusterConnection = {}; ui32 NumRetries; // Not supported yet using TPtr = TIntrusivePtr<TTask>; @@ -205,7 +213,7 @@ struct TTaskResult: public TThrRefBase { using TPtr = TIntrusivePtr<TTaskResult>; }; -TTask::TPtr MakeTask(ETaskType taskType, const 
TString& taskId, const TTaskParams& taskParams, const TString& sessionId); +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{}); TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing()); diff --git a/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.cpp b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.cpp new file mode 100644 index 0000000000..7f3e3c5135 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.cpp @@ -0,0 +1 @@ +#include "table_data_service.h" diff --git a/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h new file mode 100644 index 0000000000..98a2ec9ddc --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h @@ -0,0 +1,20 @@ +#pragma once + +#include <library/cpp/threading/future/core/future.h> + +namespace NYql::NFmr { + +class ITableDataService: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<ITableDataService>; + + virtual ~ITableDataService() = default; + + virtual NThreading::TFuture<void> Put(const TString& id, const TString& data) = 0; + + virtual NThreading::TFuture<TMaybe<TString>> Get(const TString& id) = 0; + + virtual NThreading::TFuture<void> Delete(const TString& id) = 0; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/interface/ya.make b/yt/yql/providers/yt/fmr/table_data_service/interface/ya.make new file mode 100644 index 0000000000..63abe6d78f --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/interface/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + table_data_service.cpp +) + +PEERDIR( + library/cpp/threading/future +) + 
+YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.cpp b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.cpp new file mode 100644 index 0000000000..445263ff5e --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.cpp @@ -0,0 +1,55 @@ +#include "table_data_service.h" + +namespace NYql::NFmr { + +namespace { + +class TLocalTableDataService: public ITableDataService { +public: + TLocalTableDataService(const TLocalTableDataServiceSettings& settings): NumParts_(settings.NumParts) { + Data_.resize(NumParts_); + } + + NThreading::TFuture<void> Put(const TString& key, const TString& value) { + auto& map = Data_[std::hash<TString>()(key) % NumParts_]; + auto it = map.find(key); + if (it != map.end()) { + return NThreading::MakeFuture(); + } + map.insert({key, value}); + return NThreading::MakeFuture(); + } + + NThreading::TFuture<TMaybe<TString>> Get(const TString& key) { + TMaybe<TString> value = Nothing(); + auto& map = Data_[std::hash<TString>()(key) % NumParts_]; + auto it = map.find(key); + if (it != map.end()) { + value = it->second; + } + return NThreading::MakeFuture(value); + } + + NThreading::TFuture<void> Delete(const TString& key) { + auto& map = Data_[std::hash<TString>()(key) % NumParts_]; + auto it = map.find(key); + if (it == map.end()) { + return NThreading::MakeFuture(); + } + map.erase(key); + return NThreading::MakeFuture(); + } + +private: + std::vector<std::unordered_map<TString, TString>> Data_; + const ui32 NumParts_; +}; + +} // namespace + +ITableDataService::TPtr MakeLocalTableDataService(const TLocalTableDataServiceSettings& settings) { + return MakeIntrusive<TLocalTableDataService>(settings); +} + + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h new file mode 100644 index 0000000000..eb4d0cb2f6 --- 
/dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h @@ -0,0 +1,12 @@ +#include <library/cpp/threading/future/future.h> +#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h> + +namespace NYql::NFmr { + +struct TLocalTableDataServiceSettings { + ui32 NumParts; +}; + +ITableDataService::TPtr MakeLocalTableDataService(const TLocalTableDataServiceSettings& settings); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/ut/test_table_service.cpp b/yt/yql/providers/yt/fmr/table_data_service/local/ut/test_table_service.cpp new file mode 100644 index 0000000000..a48df5f3df --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/local/ut/test_table_service.cpp @@ -0,0 +1,38 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h> + +using namespace NYql::NFmr; + +Y_UNIT_TEST_SUITE(TLocalTableServiceTest) +{ + Y_UNIT_TEST(GetNonexistentKey) { + ITableDataService::TPtr tableDataService = MakeLocalTableDataService({3}); + auto getFuture = tableDataService->Get("key"); + UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), Nothing()); + } + Y_UNIT_TEST(GetExistingKey) { + ITableDataService::TPtr tableDataService = MakeLocalTableDataService({3}); + auto putFuture = tableDataService->Put("key", "1"); + putFuture.GetValueSync(); + auto getFuture = tableDataService->Get("key"); + UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), "1"); + } + Y_UNIT_TEST(DeleteNonexistentKey) { + ITableDataService::TPtr tableDataService = MakeLocalTableDataService({3}); + auto putFuture = tableDataService->Put("key", "1"); + putFuture.GetValueSync(); + auto deleteFuture = tableDataService->Delete("other_key"); + deleteFuture.GetValueSync(); + auto getFuture = tableDataService->Get("key"); + UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), "1"); + } + Y_UNIT_TEST(DeleteExistingKey) { + ITableDataService::TPtr 
tableDataService = MakeLocalTableDataService({3}); + auto putFuture = tableDataService->Put("key", "1"); + putFuture.GetValueSync(); + auto deleteFuture = tableDataService->Delete("key"); + deleteFuture.GetValueSync(); + auto getFuture = tableDataService->Get("key"); + UNIT_ASSERT_VALUES_EQUAL(getFuture.GetValueSync(), Nothing()); + } +} diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/ut/ya.make b/yt/yql/providers/yt/fmr/table_data_service/local/ut/ya.make new file mode 100644 index 0000000000..9b57c059db --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/local/ut/ya.make @@ -0,0 +1,14 @@ +UNITTEST() + +SRCS( + test_table_service.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/table_data_service/local +) + + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/ya.make b/yt/yql/providers/yt/fmr/table_data_service/local/ya.make new file mode 100644 index 0000000000..78290fcd2e --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/local/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + table_data_service.cpp +) + +PEERDIR( + library/cpp/threading/future + yt/yql/providers/yt/fmr/table_data_service/interface +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp index 4996c932c4..e16e20c911 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -11,12 +11,17 @@ namespace NYql::NFmr { TDownloadTaskParams downloadTaskParams{ - .Input = TYtTableRef{"Path","Cluster","TransactionId"}, + .Input = TYtTableRef{"Path","Cluster"}, .Output = TFmrTableRef{"TableId"} }; TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) { - return TStartOperationRequest{.TaskType = taskType, .TaskParams = taskParams, 
.IdempotencyKey = "IdempotencyKey"}; + return TStartOperationRequest{ + .TaskType = taskType, + .TaskParams = taskParams, + .IdempotencyKey = "IdempotencyKey", + .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"} + }; } Y_UNIT_TEST_SUITE(FmrWorkerTests) { diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/ya.make b/yt/yql/providers/yt/fmr/yt_service/impl/ya.make new file mode 100644 index 0000000000..2da71e6b7d --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/impl/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + yql_yt_yt_service_impl.cpp +) + +PEERDIR( + library/cpp/yson + library/cpp/yson/node + yt/cpp/mapreduce/client + yt/yql/providers/yt/fmr/yt_service/interface +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp new file mode 100644 index 0000000000..1da2ff6bd3 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp @@ -0,0 +1,186 @@ +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yson/node/node_visitor.h> +#include <library/cpp/yson/parser.h> +#include <library/cpp/yson/writer.h> +#include <yt/cpp/mapreduce/interface/client.h> + +#include "yql_yt_yt_service_impl.h" + +namespace NYql::NFmr { + +namespace { + +class TYtFmrRowConsumer: public NYson::TYsonConsumerBase { +public: + TYtFmrRowConsumer(NYT::TTableWriterPtr<NYT::TNode> writer): Writer_(writer) + { + CurNode_ = NYT::TNode(); + Builder_ = MakeHolder<NYT::TNodeBuilder>(&CurNode_); + } + + void OnStringScalar(TStringBuf value) override { + Builder_->OnStringScalar(value); + } + + void OnInt64Scalar(i64 value) override { + Builder_->OnInt64Scalar(value); + } + + void OnUint64Scalar(ui64 value) override { + Builder_->OnUint64Scalar(value); + } + + void OnDoubleScalar(double value) override { + Builder_->OnDoubleScalar(value); + } + + void 
OnBooleanScalar(bool value) override { + Builder_->OnBooleanScalar(value); + } + + void OnEntity() override { + Builder_->OnEntity(); + } + + void OnBeginList() override { + ++Level_; + Builder_->OnBeginList(); + } + + void OnBeginList(ui64 reserveSize) { + Builder_->OnBeginList(reserveSize); + } + + void OnListItem() override { + if (Level_ == 0) { + Flush(); + } else { + Builder_->OnListItem(); + } + } + + void OnEndList() override { + --Level_; + Builder_->OnEndList(); + } + + void OnBeginMap() override { + ++Level_; + Builder_->OnBeginMap(); + } + + void OnBeginMap(ui64 reserveSize) { + Builder_->OnBeginMap(reserveSize); + } + + void OnKeyedItem(TStringBuf key) override { + Builder_->OnKeyedItem(key); + } + + void OnEndMap() override { + --Level_; + Builder_->OnEndMap(); + } + + void OnBeginAttributes() override { + ++Level_; + Builder_->OnBeginAttributes(); + } + + void OnEndAttributes() override { + --Level_; + Builder_->OnEndAttributes(); + } + + void OnNode(NYT::TNode node) { + Builder_->OnNode(node); + } + + ~TYtFmrRowConsumer() { + Y_ABORT_UNLESS(!CurNode_.HasValue()); + } + + void Flush() { + if (CurNode_.HasValue()) { + Writer_->AddRow(CurNode_); + } + CurNode_ = NYT::TNode(); + Builder_ = MakeHolder<NYT::TNodeBuilder>(&CurNode_); + } +private: + NYT::TNode CurNode_; + NYT::TTableWriterPtr<NYT::TNode> Writer_; + THolder<NYT::TNodeBuilder> Builder_; + ui32 Level_ = 0; +}; + +class TFmrYtService: public NYql::NFmr::IYtService { +public: + + std::variant<THolder<TTempFileHandle>, TError> Download(const TYtTableRef& ytTable, const TClusterConnection& clusterConnection) override { + try { + if (!ClusterConnections_.contains(ytTable.Cluster)) { + ClusterConnections_[ytTable.Cluster] = clusterConnection; + } + auto client = CreateClient(ClusterConnections_[ytTable.Cluster]); + auto tmpFile = MakeHolder<TTempFileHandle>(); + auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId)); + TFileOutput downloadStream(tmpFile->Name()); + + 
NYson::TYsonWriter writer {&downloadStream, NYT::NYson::EYsonFormat::Text, NYT::NYson::EYsonType::ListFragment}; + NYT::TNodeVisitor visitor{&writer}; + auto reader = transaction->CreateTableReader<NYT::TNode>("//" + ytTable.Path); + for (; reader->IsValid(); reader->Next()) { + auto& row = reader->GetRow(); + writer.OnListItem(); + visitor.Visit(row); + } + downloadStream.Flush(); + return tmpFile; + } catch (...) { + return TError{CurrentExceptionMessage()}; + } + } + + TMaybe<TError> Upload(const TYtTableRef& ytTable, IInputStream& tableContent, const TClusterConnection& clusterConnection) override { + try { + if (!ClusterConnections_.contains(ytTable.Cluster)) { + ClusterConnections_[ytTable.Cluster] = clusterConnection; + } + auto client = CreateClient(ClusterConnections_[ytTable.Cluster]); + auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId)); + auto options = NYT::TCreateOptions().Recursive(true).IgnoreExisting(true); + transaction->Create("//" + ytTable.Path, NYT::NT_TABLE, options); + auto path = NYT::TRichYPath("//" + ytTable.Path).Append(true); + auto ytWriter = transaction->CreateTableWriter<NYT::TNode>(path); + + TYtFmrRowConsumer builder(ytWriter); + NYson::TYsonParser parser(&builder, &tableContent, NYT::NYson::EYsonType::ListFragment); + parser.Parse(); + builder.Flush(); + return Nothing(); + } catch (...) 
{ + return TError{CurrentExceptionMessage()}; + } + } + +private: + std::unordered_map<TString, TClusterConnection> ClusterConnections_; + + NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection) { + NYT::TCreateClientOptions createOpts; + auto token = clusterConnection.Token; + if (token) { + createOpts.Token(*token); + } + return NYT::CreateClient(clusterConnection.YtServerName, createOpts); + } +}; + +} // namespace + +IYtService::TPtr MakeFmrYtSerivce() { + return MakeIntrusive<TFmrYtService>(); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h new file mode 100644 index 0000000000..90d2a69430 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h @@ -0,0 +1,10 @@ +#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> + +namespace NYql::NFmr { + +struct TFmrYtSerivceSettings { +}; + +IYtService::TPtr MakeFmrYtSerivce(); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/ya.make b/yt/yql/providers/yt/fmr/yt_service/interface/ya.make new file mode 100644 index 0000000000..6ea8e94c4f --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/interface/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + yql_yt_yt_service.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/request_options +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp new file mode 100644 index 0000000000..fbcafbc3a2 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp @@ -0,0 +1 @@ +#include "yql_yt_yt_service.h" diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h new file mode 100644 index 0000000000..7d3e59b523 --- 
/dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h @@ -0,0 +1,19 @@ +#pragma once + +#include <util/system/tempfile.h> +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> + +namespace NYql::NFmr { + +class IYtService: public TThrRefBase { +public: + virtual ~IYtService() = default; + + using TPtr = TIntrusivePtr<IYtService>; + + virtual std::variant<THolder<TTempFileHandle>, TError> Download(const TYtTableRef& ytTable, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; + + virtual TMaybe<TError> Upload(const TYtTableRef& ytTable, IInputStream& tableContent, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/ya.make b/yt/yql/providers/yt/fmr/yt_service/mock/ya.make new file mode 100644 index 0000000000..707812d60c --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/mock/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + yql_yt_yt_service_mock.h +) + +PEERDIR( + yt/yql/providers/yt/fmr/yt_service/interface +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h b/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h new file mode 100644 index 0000000000..7d5c4d1076 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h @@ -0,0 +1,84 @@ +#pragma once + +#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> + +namespace NYql::NFmr { + +namespace { + +class TYtUploadedTablesMock: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<TYtUploadedTablesMock>; + + ~TYtUploadedTablesMock() = default; + + void AddTable(const TYtTableRef& ytTable, const TString& tableContent) { + TString key = GetTableKey(ytTable); + UploadedTables_[key] = tableContent; + } + bool Contains(const TYtTableRef& ytTable) const { + TString key = 
GetTableKey(ytTable); + return UploadedTables_.find(key) != UploadedTables_.end(); + } + const TString& GetTableContent(const TYtTableRef& ytTable) const { + TString key = GetTableKey(ytTable); + return UploadedTables_.at(key); + } + void Clear() { + UploadedTables_.clear(); + } + bool IsEmpty() const { + return UploadedTables_.empty(); + } + int UploadedTablesNum() const { + return UploadedTables_.size(); + } + +private: + TString GetTableKey(const TYtTableRef& ytTable) const { + return ytTable.Cluster + ":" + ytTable.Path; + } + std::unordered_map<TString, TString> UploadedTables_; +}; + + +class TYtServiceMock: public NYql::NFmr::IYtService { +public: + + TYtServiceMock(TYtUploadedTablesMock::TPtr ytUploadedTablesMock) + : YtUploadedTablesMock_(ytUploadedTablesMock) + { + } + + std::variant<THolder<TTempFileHandle>, TError> Download(const TYtTableRef& ytTable, const TClusterConnection& /*clusterConnection*/) override { + if (!YtUploadedTablesMock_->Contains(ytTable)) { + return TError("Table not found"); + } + auto tmpFile = MakeHolder<TTempFileHandle>(); + TString tableContent = YtUploadedTablesMock_->GetTableContent(ytTable); + tmpFile->Write(tableContent.data(), tableContent.size()); + return tmpFile; + } + + TMaybe<TError> Upload(const TYtTableRef& ytTable, IInputStream& tableContent, const TClusterConnection& /*clusterConnection*/) override { + YtUploadedTablesMock_->AddTable(ytTable, tableContent.ReadAll()); + return Nothing(); + } + +private: + TYtUploadedTablesMock::TPtr YtUploadedTablesMock_; +}; + +} // namespace + +IYtService::TPtr MakeYtServiceMock(TYtUploadedTablesMock::TPtr ytUploadedTablesMock) { + return MakeIntrusive<TYtServiceMock>(ytUploadedTablesMock); +} + +TYtUploadedTablesMock::TPtr MakeYtUploadedTablesMock() { + return MakeIntrusive<TYtUploadedTablesMock>(); +} + +} // namespace NYql::NFmr + +// TODO - move this to .cpp file diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp 
index b0e53f5cb7..3a1e127f3d 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -1583,6 +1583,10 @@ private: return res; } + TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& /*options*/) override { + ythrow yexception() << "GetClusterConnection should not be called for file gateway"; + } + private: TYtFileServices::TPtr Services_; diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make index bfc91df2f5..c424b95d0b 100644 --- a/yt/yql/providers/yt/gateway/fmr/ya.make +++ b/yt/yql/providers/yt/gateway/fmr/ya.make @@ -6,8 +6,12 @@ SRCS( PEERDIR( yql/essentials/utils/log + yt/cpp/mapreduce/client + yt/yql/providers/yt/gateway/lib + yt/yql/providers/yt/gateway/native yt/yql/providers/yt/expr_nodes yt/yql/providers/yt/fmr/coordinator/interface + yt/yql/providers/yt/lib/config_clusters yt/yql/providers/yt/provider ) diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index db00f2ff59..76a907a5c6 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -2,7 +2,10 @@ #include <thread> +#include <yt/cpp/mapreduce/interface/client.h> #include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> +#include <yt/yql/providers/yt/gateway/lib/yt_helpers.h> +#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h> #include <yt/yql/providers/yt/provider/yql_yt_helpers.h> #include <yql/essentials/utils/log/log.h> @@ -40,7 +43,7 @@ public: with_lock(SessionStates_->Mutex) { auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses, const TString& sessionId) { for (auto& [operationId, promise]: operationStatuses) { - YQL_CLOG(DEBUG, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId; + YQL_CLOG(TRACE, FastMapReduce) << "Sending get 
operation request to coordinator with operationId: " << operationId; auto getOperationFuture = Coordinator_->GetOperation({operationId}); getOperationFuture.Subscribe([&, operationId, sessionId] (const auto& getFuture) { @@ -96,10 +99,22 @@ public: return Slave_->Publish(node, ctx, std::move(options)); } auto publish = TYtPublish(node); + TString sessionId = options.SessionId(); auto cluster = publish.DataSink().Cluster().StringValue(); + auto token = options.Config()->Auth.Get(); + TString transformedInputPath; + TString userName = GetUsername(sessionId); + for (auto out: publish.Input()) { + auto outTable = GetOutTable(out).Cast<TYtOutTable>(); + TStringBuf inputPath = outTable.Name().Value(); + transformedInputPath = NYql::TransformPath(GetTablesTmpFolder(*options.Config()), inputPath, true, userName); + break; + } + + // TODO - handle several inputs in Publish, use ColumnGroups, Run Merge + auto outputPath = publish.Publish().Name().StringValue(); - auto transactionId = GenerateId(); auto idempotencyKey = GenerateId(); auto fmrTableId = cluster + "." 
+ outputPath; @@ -107,15 +122,14 @@ public: TFuture<TDownloadTableToFmrResult> downloadToFmrFuture; TFuture<void> downloadedSuccessfully; - TString sessionId = options.SessionId(); - with_lock(SessionStates_->Mutex) { auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; if (!tablePresenceStatuses.contains(fmrTableId)) { - TYtTableRef ytTable{.Path = outputPath, .Cluster = cluster, .TransactionId = transactionId}; + TYtTableRef ytTable{.Path = transformedInputPath, .Cluster = cluster}; + TFmrTableRef fmrTable{.TableId = fmrTableId}; tablePresenceStatuses[fmrTableId] = ETablePresenceStatus::Both; - downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, sessionId); + downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, fmrTable, sessionId, options.Config()); downloadedSuccessfully = downloadToFmrFuture.Apply([downloadedSuccessfully] (auto& downloadFuture) { auto downloadResult = downloadFuture.GetValueSync(); }); @@ -125,15 +139,27 @@ public: } downloadedSuccessfully.Wait(); // blocking until download to fmr finishes - YQL_CLOG(INFO, FastMapReduce) << "Uploading table with cluster " << cluster << " and path " << outputPath << " from fmr to yt"; - TUploadTaskParams uploadTaskParams{ .Input = TFmrTableRef{fmrTableId}, - .Output = TYtTableRef{outputPath, cluster, transactionId} + .Output = TYtTableRef{outputPath, cluster} }; + auto clusterConnectionOptions = TClusterConnectionOptions(options.SessionId()) + .Cluster(cluster).Config(options.Config()); + auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions)); + YQL_ENSURE(clusterConnection.Success()); + TStartOperationRequest uploadRequest{ - .TaskType = ETaskType::Upload, .TaskParams = uploadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1 + .TaskType = ETaskType::Upload, + .TaskParams = uploadTaskParams, + .SessionId = sessionId, + .IdempotencyKey=idempotencyKey, + .NumRetries=1, + .ClusterConnection = 
TClusterConnection{ + .TransactionId = clusterConnection.TransactionId, + .YtServerName = clusterConnection.YtServerName, + .Token = clusterConnection.Token + } }; auto promise = NewPromise<TPublishResult>(); @@ -155,19 +181,34 @@ public: return future; } - TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce(const TYtTableRef& ytTableRef, const TString& sessionId) { + TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce( + const TYtTableRef& ytTableRef, const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config) + { YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); - TString fmrTableId = ytTableRef.Cluster + "." + ytTableRef.Path; + TString fmrTableId = fmrTableRef.TableId; TDownloadTaskParams downloadTaskParams{ .Input = ytTableRef, .Output = {fmrTableId} }; auto idempotencyKey = GenerateId(); + auto clusterConnectionOptions = TClusterConnectionOptions(sessionId) + .Cluster(ytTableRef.Cluster).Config(config); + auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions)); + YQL_ENSURE(clusterConnection.Success()); TStartOperationRequest downloadRequest{ - .TaskType = ETaskType::Download, .TaskParams = downloadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1 + .TaskType = ETaskType::Download, + .TaskParams = downloadTaskParams, + .SessionId = sessionId, + .IdempotencyKey = idempotencyKey, + .NumRetries=1, + .ClusterConnection = TClusterConnection{ + .TransactionId = clusterConnection.TransactionId, + .YtServerName = clusterConnection.YtServerName, + .Token = clusterConnection.Token + } }; - YQL_CLOG(DEBUG, FastMapReduce) << "Starting download to from yt table: " << fmrTableId; + YQL_CLOG(DEBUG, FastMapReduce) << "Starting download from yt table: " << fmrTableId; auto promise = NewPromise<TDownloadTableToFmrResult>(); auto future = promise.GetFuture(); @@ -186,22 +227,24 @@ public: return future; } - void OpenSession(TOpenSessionOptions&& options) 
final { - Slave_->OpenSession(std::move(options)); + TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override { + return Slave_->GetClusterConnection(std::move(options)); + } + void OpenSession(TOpenSessionOptions&& options) final { TString sessionId = options.SessionId(); YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); with_lock(SessionStates_->Mutex) { - auto sessions = SessionStates_->Sessions; + auto& sessions = SessionStates_->Sessions; if (sessions.contains(sessionId)) { YQL_LOG_CTX_THROW yexception() << "Session already exists: " << sessionId; } - sessions[sessionId] = TSessionInfo(); + sessions[sessionId] = TSessionInfo{.UserName = options.UserName()}; } + Slave_->OpenSession(std::move(options)); } TFuture<void> CloseSession(TCloseSessionOptions&& options) final { - Slave_->CloseSession(std::move(options)).Wait(); YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); with_lock(SessionStates_->Mutex) { @@ -211,11 +254,11 @@ public: sessions.erase(it); } } + Slave_->CloseSession(std::move(options)).Wait(); return MakeFuture(); } TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final { - Slave_->CleanupSession(std::move(options)).Wait(); YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); TString sessionId = options.SessionId(); @@ -239,7 +282,7 @@ public: cancelOperationsFunc(operationStates.DownloadOperationStatuses); cancelOperationsFunc(operationStates.UploadOperationStatuses); } - + Slave_->CleanupSession(std::move(options)).Wait(); return MakeFuture(); } @@ -252,6 +295,7 @@ private: struct TSessionInfo { TFmrGatewayOperationsState OperationStates; std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService + TString UserName; }; struct TSession { @@ -282,6 +326,14 @@ private: } promise.SetValue(commonOperationResult); } + + TString GetUsername(const TString& sessionId) { + with_lock(SessionStates_->Mutex) { + 
YQL_ENSURE(SessionStates_->Sessions.contains(sessionId)); + auto& session = SessionStates_->Sessions[sessionId]; + return session.UserName; + } + } }; } // namespace diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h index 769bb24fe5..6d102e57ca 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h @@ -6,17 +6,14 @@ namespace NYql::NFmr { struct TFmrYtGatewaySettings { - TIntrusivePtr<IRandomProvider> RandomProvider; - TDuration TimeToSleepBetweenGetOperationRequests; + TIntrusivePtr<IRandomProvider> RandomProvider = CreateDefaultRandomProvider(); + TDuration TimeToSleepBetweenGetOperationRequests = TDuration::Seconds(1); }; IYtGateway::TPtr CreateYtFmrGateway( IYtGateway::TPtr slave, IFmrCoordinator::TPtr coordinator = nullptr, - const TFmrYtGatewaySettings& settings = TFmrYtGatewaySettings{ - .RandomProvider = CreateDefaultRandomProvider(), - .TimeToSleepBetweenGetOperationRequests = TDuration::Seconds(1) - } + const TFmrYtGatewaySettings& settings = TFmrYtGatewaySettings{} ); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 40df6077ef..196ff9a1c3 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -5698,6 +5698,22 @@ private: return ctx; } + TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override { + try { + auto session = GetSession(options.SessionId(), true); + auto ytServer = Clusters_->GetServer(options.Cluster()); + auto entry = session->TxCache_.GetEntry(ytServer); + TClusterConnectionResult clusterConnectionResult{}; + clusterConnectionResult.TransactionId = GetGuidAsString(entry->Tx->GetId()); + clusterConnectionResult.YtServerName = ytServer; + clusterConnectionResult.Token = options.Config()->Auth.Get(); + 
clusterConnectionResult.SetSuccess(); + return clusterConnectionResult; + } catch (...) { + return ResultFromCurrentException<TClusterConnectionResult>({}, true); + } + } + private: const TYtNativeServices Services_; const TConfigClusters::TPtr Clusters_; diff --git a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp index 5ad80977e9..3a9bd3272b 100644 --- a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp +++ b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp @@ -961,6 +961,10 @@ public: return Inner_->AddCluster(cluster); } + TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override { + return Inner_->GetClusterConnection(std::move(options)); + } + private: const IYtGateway::TPtr Inner_; const TQContext QContext_; diff --git a/yt/yql/providers/yt/provider/yql_yt_gateway.h b/yt/yql/providers/yt/provider/yql_yt_gateway.h index b331f2142c..a98cd1d877 100644 --- a/yt/yql/providers/yt/provider/yql_yt_gateway.h +++ b/yt/yql/providers/yt/provider/yql_yt_gateway.h @@ -613,6 +613,24 @@ public: struct TUploadTableResult: public NCommon::TOperationResult { }; + struct TClusterConnectionOptions: public TCommonOptions { + using TSelf = TClusterConnectionOptions; + + TClusterConnectionOptions(const TString& sessionId) + : TCommonOptions(sessionId) + { + } + + OPTION_FIELD(TString, Cluster) + OPTION_FIELD(TYtSettings::TConstPtr, Config) + }; + + struct TClusterConnectionResult: public NCommon::TOperationResult { + TString TransactionId; + TString YtServerName; + TMaybe<TString> Token; + }; + public: virtual ~IYtGateway() = default; @@ -673,6 +691,8 @@ public: virtual TGetTablePartitionsResult GetTablePartitions(TGetTablePartitionsOptions&& options) = 0; virtual void AddCluster(const TYtClusterConfig& cluster) = 0; + + virtual TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) = 0; }; } diff --git 
a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make index 007272ae8e..8f04604931 100644 --- a/yt/yql/tools/ytrun/lib/ya.make +++ b/yt/yql/tools/ytrun/lib/ya.make @@ -8,8 +8,11 @@ PEERDIR( yt/yql/providers/yt/provider yt/yql/providers/yt/fmr/coordinator/client yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/job/impl yt/yql/providers/yt/fmr/job_factory/impl + yt/yql/providers/yt/fmr/table_data_service/local yt/yql/providers/yt/fmr/worker/impl + yt/yql/providers/yt/fmr/yt_service/impl yt/yql/providers/yt/gateway/native yt/yql/providers/yt/gateway/fmr yt/yql/providers/yt/lib/config_clusters diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp index dcd374a2c3..724a8d72e1 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -10,7 +10,10 @@ #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h> #include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h> #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> +#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h> +#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h> #include <yql/essentials/providers/common/provider/yql_provider_names.h> #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h> #include <yql/essentials/core/services/yql_transform_pipeline.h> @@ -197,13 +200,12 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() { } if (!DisableLocalFmrWorker_) { - auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { - while (!cancelFlag->load()) { - Sleep(TDuration::Seconds(3)); - return NFmr::ETaskStatus::Completed; - } - return NFmr::ETaskStatus::Failed; - }; // TODO - use function which actually calls Downloader/Uploader 
based on task params + auto tableDataService = MakeLocalTableDataService(NFmr::TLocalTableDataServiceSettings(3)); + auto fmrYtSerivce = NFmr::MakeFmrYtSerivce(); + + auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable { + return NFmr::RunJob(task, tableDataService, fmrYtSerivce, cancelFlag); + }; NFmr::TFmrJobFactorySettings settings{.Function=func}; auto jobFactory = MakeFmrJobFactory(settings); |