diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-29 13:34:22 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-29 13:53:52 +0300 |
commit | fdbc38349df2ee0ddc678fa2bffe84786f9639a3 (patch) | |
tree | bf49a9cbcd326a82b5380c4042b8277863db0ea5 | |
parent | 25151c4698d52226d8b4882d2ef7fb2f04587b7c (diff) | |
download | ydb-fdbc38349df2ee0ddc678fa2bffe84786f9639a3.tar.gz |
Intermediate changes
commit_hash:6dbef13d0dcaf09696934ec231fa4610d7edfec1
81 files changed, 1901 insertions, 570 deletions
@@ -18,6 +18,7 @@ tools_cache_master = true use_atd_revisions_info = true use_jstyle_server = true use_command_file_in_testtool = true +use_universal_fetcher_everywhere = true # ===== opensource only table params ===== diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson index 203b3d1ce66..ef865b7f5f7 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson +++ b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson @@ -23,4 +23,14 @@ "max_row_weight" = 16777216; }; }; -}
\ No newline at end of file + "partition" = { + "yt_table" = { + "max_data_weight_per_part" = 104857600; + "max_parts" = 100; + }; + "fmr_table" = { + "max_data_weight_per_part" = 104857600; + "max_parts" = 100; + }; + } +} diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make index 2f67f6d05ef..179b3e230f0 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make @@ -2,10 +2,12 @@ UNITTEST() SRCS( yql_yt_coordinator_ut.cpp + yql_yt_partitioner_ut.cpp ) PEERDIR( yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file yt/yql/providers/yt/fmr/job_factory/impl yt/yql/providers/yt/fmr/worker/impl ) diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp index 0de9cb8fda0..ae3505fdd7b 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -7,6 +7,7 @@ #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> #include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h> namespace NYql::NFmr { @@ -45,10 +46,12 @@ private: TDownloadOperationParams downloadOperationParams{ - .Input = TYtTableRef{"Path","Cluster"}, + .Input = TYtTableRef{.Path = "Path", .Cluster = "Cluster", .FilePath = "File_path"}, .Output = TFmrTableRef{{"TestCluster", "TestPath"}} }; +// TODO - создать общий файл на все тесты, наполнить его чем-то + TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TOperationParams operationParams = downloadOperationParams) { return TStartOperationRequest{ .TaskType = taskType, @@ -83,13 +86,13 @@ auto defaultTaskFunction = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { Y_UNIT_TEST(StartOperation) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); auto status = startOperationResponse.Status; UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Accepted); } Y_UNIT_TEST(RetryAcceptedOperation) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto downloadRequest = CreateOperationRequest(); auto firstResponse = coordinator->StartOperation(downloadRequest).GetValueSync(); auto firstOperationId = firstResponse.OperationId; @@ -102,13 +105,13 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { } Y_UNIT_TEST(DeleteNonexistentOperation) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto deleteOperationResponse = coordinator->DeleteOperation({"delete_operation_id"}).GetValueSync(); EOperationStatus status = deleteOperationResponse.Status; UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::NotFound); } Y_UNIT_TEST(DeleteOperationBeforeSendToWorker) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); TString operationId = startOperationResponse.OperationId; auto deleteOperationResponse = coordinator->DeleteOperation({operationId}).GetValueSync(); @@ -116,13 +119,13 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Aborted); } Y_UNIT_TEST(GetNonexistentOperation) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto getOperationResponse = coordinator->GetOperation({"get_operation_id"}).GetValueSync(); EOperationStatus status = getOperationResponse.Status; UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::NotFound); } Y_UNIT_TEST(GetAcceptedOperationStatus) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); TString operationId = startOperationResponse.OperationId; auto getOperationResponse = coordinator->GetOperation({operationId}).GetValueSync(); @@ -130,7 +133,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Accepted); } Y_UNIT_TEST(GetRunningOperationStatus) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); TString operationId = startOperationResponse.OperationId; @@ -145,7 +148,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::InProgress); } Y_UNIT_TEST(GetCompletedOperationStatuses) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto startOperationRequests = CreateSeveralOperationRequests(); std::vector<TString> operationIds; for (auto& request: startOperationRequests) { @@ -165,34 +168,27 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { } } Y_UNIT_TEST(GetCompletedAndFailedOperationStatuses) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto downloadOperationRequests = CreateSeveralOperationRequests(); std::vector<TString> downloadOperationIds; for (auto& request: downloadOperationRequests) { auto startOperationResponse = coordinator->StartOperation(request).GetValueSync(); downloadOperationIds.emplace_back(startOperationResponse.OperationId); } - auto uploadOperationRequest = CreateOperationRequest(ETaskType::Upload, TUploadOperationParams{ - {{"Cluster", "Path"}}, - {} + auto badDownloadRequest = CreateOperationRequest(ETaskType::Download, TDownloadOperationParams{ + .Input = TYtTableRef{.Path = "bad_path", .Cluster = "bad_cluster", .FilePath = "bad_file_path"}, + .Output = TFmrTableRef{{"bad_cluster", "bad_path"}} }); - auto uploadOperationResponse = coordinator->StartOperation(uploadOperationRequest).GetValueSync(); - auto uploadOperationId = uploadOperationResponse.OperationId; + auto badDownloadOperationResponse = coordinator->StartOperation(badDownloadRequest).GetValueSync(); + auto badDownloadOperationId = badDownloadOperationResponse.OperationId; auto func = [&] (TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) { while (! cancelFlag->load()) { Sleep(TDuration::Seconds(1)); - ETaskStatus taskStatus = std::visit([] (auto&& taskParams) { - using T = std::decay_t<decltype(taskParams)>; - if constexpr (std::is_same_v<T, TUploadTaskParams>) { - return ETaskStatus::Failed; - } - return ETaskStatus::Completed; - }, task->TaskParams); - if (taskStatus == ETaskStatus::Failed) { + TDownloadTaskParams downloadTaskParams = std::get<TDownloadTaskParams>(task->TaskParams); + if (downloadTaskParams.Output.TableId.Contains("bad_path")) { return TJobResult{.TaskStatus = ETaskStatus::Failed, .Stats = TStatistics()}; } - Sleep(TDuration::Seconds(1)); return TJobResult{.TaskStatus = ETaskStatus::Completed, .Stats = TStatistics()}; } return TJobResult{.TaskStatus = ETaskStatus::Failed, .Stats = TStatistics()}; @@ -210,12 +206,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { EOperationStatus status = getDownloadOperationResponse.Status; UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Completed); } - auto getUploadOperationResponse = coordinator->GetOperation({uploadOperationId}).GetValueSync(); - EOperationStatus uploadStatus = getUploadOperationResponse.Status; - UNIT_ASSERT_VALUES_EQUAL(uploadStatus, EOperationStatus::Failed); + auto getBadDownloadOperationResponse = coordinator->GetOperation({badDownloadOperationId}).GetValueSync(); + EOperationStatus badDownloadStatus = getBadDownloadOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(badDownloadStatus, EOperationStatus::Failed); } Y_UNIT_TEST(RetryRunningOperation) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto downloadRequest = CreateOperationRequest(); auto startOperationResponse = coordinator->StartOperation(downloadRequest).GetValueSync(); TString firstOperationId = startOperationResponse.OperationId; @@ -236,7 +232,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { Y_UNIT_TEST(RetryRunningOperationAfterIdempotencyKeyClear) { auto coordinatorSettings = TFmrCoordinatorSettings(); coordinatorSettings.IdempotencyKeyStoreTime = TDuration::Seconds(1); - auto coordinator = MakeFmrCoordinator(coordinatorSettings); + auto coordinator = MakeFmrCoordinator(coordinatorSettings, MakeFileYtCoordinatorService()); TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; auto factory = MakeFmrJobFactory(settings); @@ -260,7 +256,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { UNIT_ASSERT_VALUES_EQUAL(secondOperationStatus, EOperationStatus::Accepted); } Y_UNIT_TEST(CancelTasksAfterVolatileIdReload) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { int numIterations = 0; while (!cancelFlag->load()) { @@ -292,7 +288,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { UNIT_ASSERT_NO_DIFF(*error.OperationId, operationId); } Y_UNIT_TEST(HandleJobErrors) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); TString operationId = startOperationResponse.OperationId; @@ -323,13 +319,21 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { } Y_UNIT_TEST(GetFmrTableInfo) { - auto coordinator = MakeFmrCoordinator(); - TTableStats tableStats = {.Chunks = 1, .Rows = 2, .DataWeight = 3}; - TString tableId = "test_table"; - TFmrTableOutputRef fmrTableOutputRef{.TableId = tableId, .PartId = "test_part_id"}; - std::unordered_map<TFmrTableOutputRef, TTableStats> outputTables{{fmrTableOutputRef, tableStats}}; - auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); + ui64 totalChunkCount = 10, chunkRowCount = 1, chunkDataWeight = 2; + TString tableId = "TestCluster.TestPath"; // corresponds to CreateOperationRequest() + auto func = [&] (TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) { while (!cancelFlag->load()) { + Sleep(TDuration::Seconds(1)); + TDownloadTaskParams downloadTaskParams = std::get<TDownloadTaskParams>(task->TaskParams); + TString partId = downloadTaskParams.Output.PartId; + TFmrTableOutputRef fmrTableOutputRef{.TableId = tableId, .PartId = partId}; + TTableChunkStats tableChunkStats{ + .PartId = partId, + .PartIdChunkStats = std::vector<TChunkStats>(totalChunkCount, TChunkStats{.Rows = chunkRowCount, .DataWeight = chunkDataWeight}) + }; + std::unordered_map<TFmrTableOutputRef, TTableChunkStats> outputTables{{fmrTableOutputRef, tableChunkStats}}; + return TJobResult{.TaskStatus = ETaskStatus::Completed, .Stats = TStatistics{ .OutputTables = outputTables }}; @@ -346,9 +350,9 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { Sleep(TDuration::Seconds(3)); auto response = coordinator->GetFmrTableInfo({tableId}).GetValueSync(); worker->Stop(); - UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Chunks, tableStats.Chunks); - UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Rows, tableStats.Rows); - UNIT_ASSERT_VALUES_EQUAL(response.TableStats.DataWeight, tableStats.DataWeight); + UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Chunks, totalChunkCount); + UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Rows, totalChunkCount * chunkRowCount); + UNIT_ASSERT_VALUES_EQUAL(response.TableStats.DataWeight, totalChunkCount * chunkDataWeight); } } diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_partitioner_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_partitioner_ut.cpp new file mode 100644 index 00000000000..87d491afd60 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_partitioner_ut.cpp @@ -0,0 +1,220 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h> + +namespace NYql::NFmr { + +const TString FirstPartId = "test_part_id_0", SecondPartId = "test_part_id_1"; + +std::unordered_map<TFmrTableId, std::vector<TString>> GetTestPartIdsForTable(const TFmrTableId& fmrId) { + return std::unordered_map<TFmrTableId, std::vector<TString>>{{fmrId, std::vector<TString>{FirstPartId, SecondPartId}}}; +} + +std::unordered_map<TString, std::vector<TChunkStats>> GetTestPartIdStats() { + const std::vector<TChunkStats> firstPartitionChunkStats{ + TChunkStats{.DataWeight = 30}, + TChunkStats{.DataWeight = 30}, + TChunkStats{.DataWeight = 10}, + TChunkStats{.DataWeight = 20}, + }; + const std::vector<TChunkStats> secondPartitionChunkStats{ + TChunkStats{.DataWeight = 40}, + TChunkStats{.DataWeight = 15}, + }; + + return std::unordered_map<TString, std::vector<TChunkStats>>{ + {FirstPartId, firstPartitionChunkStats}, + {SecondPartId, secondPartitionChunkStats} + }; +} + +const TString FirstTablePartId = "first_table_part_id", SecondTablePartId = "sec_table_part_id", ThirdTablePartId = "third_table_part_id"; + +std::unordered_map<TFmrTableId, std::vector<TString>> GetTestPartIdsForMultipleTables(std::vector<TFmrTableId>& fmrTableIds) { + UNIT_ASSERT_VALUES_EQUAL(fmrTableIds.size(), 3); + return std::unordered_map<TFmrTableId, std::vector<TString>>{ + {fmrTableIds[0], std::vector<TString>{FirstTablePartId}}, + {fmrTableIds[1], std::vector<TString>{SecondTablePartId}}, + {fmrTableIds[2], std::vector<TString>{ThirdTablePartId}} + }; +} + +std::unordered_map<TString, std::vector<TChunkStats>> GetTestPartIdStatsForMultipleTables() { + const std::vector<TChunkStats> firstPartitionChunkStats{ + TChunkStats{.DataWeight = 40}, + TChunkStats{.DataWeight = 20}, + }; + const std::vector<TChunkStats> secondPartitionChunkStats{ + TChunkStats{.DataWeight = 40}, + TChunkStats{.DataWeight = 15}, + TChunkStats{.DataWeight = 5}, + }; + + const std::vector<TChunkStats> thirdPartitionChunkStats{ + TChunkStats{.DataWeight = 20}, + TChunkStats{.DataWeight = 30}, + TChunkStats{.DataWeight = 60}, + }; + + return std::unordered_map<TString, std::vector<TChunkStats>>{ + {FirstTablePartId, firstPartitionChunkStats}, + {SecondTablePartId, secondPartitionChunkStats}, + {ThirdTablePartId, thirdPartitionChunkStats} + }; +} + +std::vector<std::vector<TFmrTableInputRef>> ChangeGottenTasksFormat(const std::vector<TTaskTableInputRef>& inputTasks) { + // needed for testing so we can check resulting vectors for equality. + std::vector<std::vector<TFmrTableInputRef>> resultTasks; + for (auto& task: inputTasks) { + std::vector<TFmrTableInputRef> curTask; + std::transform(task.Inputs.begin(),task.Inputs.end(), std::back_inserter(curTask), [](const TTaskTableRef& tablePart){ + return std::get<TFmrTableInputRef>(tablePart); + }); + resultTasks.emplace_back(curTask); + } + return resultTasks; +} + +Y_UNIT_TEST_SUITE(PartitionerTests) { + Y_UNIT_TEST(PartitionFmrTable) { + auto fmrTableId = TFmrTableId("test_cluster", "test_path"); + TFmrTableRef fmrTable = TFmrTableRef{fmrTableId}; + + auto partIdsForTables = GetTestPartIdsForTable(fmrTableId); + auto partIdStats = GetTestPartIdStats(); + TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 50, .MaxParts = 100}; + TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings); + + auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks({fmrTable}); + UNIT_ASSERT_VALUES_EQUAL(status, true); + + std::vector<std::vector<TFmrTableInputRef>> expectedTasks = { + {TFmrTableInputRef{ + .TableId = fmrTableId.Id, + .TableRanges = { + TTableRange{.PartId = FirstPartId, .MinChunk = 0, .MaxChunk = 1} + } + }}, + {TFmrTableInputRef{ + .TableId = fmrTableId.Id, + .TableRanges = { + TTableRange{.PartId = FirstPartId, .MinChunk = 1, .MaxChunk = 3} + } + }}, + {TFmrTableInputRef{ + .TableId = fmrTableId.Id, + .TableRanges = { + TTableRange{.PartId = SecondPartId, .MinChunk = 0, .MaxChunk = 1} + } + }}, + {TFmrTableInputRef{ + .TableId = fmrTableId.Id, + .TableRanges = { + TTableRange{.PartId = FirstPartId, .MinChunk = 3, .MaxChunk = 4}, + TTableRange{.PartId = SecondPartId, .MinChunk = 1, .MaxChunk = 2}, + } + }}, + }; + UNIT_ASSERT_VALUES_EQUAL(ChangeGottenTasksFormat(gottenTasks), expectedTasks); + } + Y_UNIT_TEST(MaxPartsNumExceeded) { + auto fmrTableId = TFmrTableId("test_cluster", "test_path"); + TFmrTableRef fmrTable = TFmrTableRef{fmrTableId}; + + auto partIdsForTables = GetTestPartIdsForTable(fmrTableId); + auto partIdStats = GetTestPartIdStats(); + TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 50, .MaxParts = 2}; + TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings); + + auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks({fmrTable}); + UNIT_ASSERT_VALUES_EQUAL(status, false); + } + Y_UNIT_TEST(SeveralFullPartitionsInTask) { + auto fmrTableId = TFmrTableId("test_cluster", "test_path"); + TFmrTableRef fmrTable = TFmrTableRef{fmrTableId}; + + auto partIdsForTables = GetTestPartIdsForTable(fmrTableId); + auto partIdStats = GetTestPartIdStats(); + TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 1000000, .MaxParts = 1}; + TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings); + + auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks({fmrTable}); + UNIT_ASSERT_VALUES_EQUAL(status, true); + + std::vector<std::vector<TFmrTableInputRef>> expectedTasks = { + { + TFmrTableInputRef{ + .TableId = fmrTableId.Id, + .TableRanges = { + TTableRange{.PartId = FirstPartId, .MinChunk = 0, .MaxChunk = 4}, + TTableRange{.PartId = SecondPartId, .MinChunk = 0, .MaxChunk = 2}, + } + } + } + }; + UNIT_ASSERT_VALUES_EQUAL(ChangeGottenTasksFormat(gottenTasks), expectedTasks); + } + Y_UNIT_TEST(SeveralInputTables) { + std::vector<TFmrTableId> inputFmrTableIds{ + TFmrTableId("test_cluster_1", "test_path_1"), + TFmrTableId("test_cluster_2", "test_path_2"), + TFmrTableId("test_cluster_3", "test_path_3"), + }; + std::vector<TFmrTableRef> inputTables; + for (auto& id: inputFmrTableIds) { + inputTables.emplace_back(TFmrTableRef{.FmrTableId = id}); + } + + auto partIdsForTables = GetTestPartIdsForMultipleTables(inputFmrTableIds); + auto partIdStats = GetTestPartIdStatsForMultipleTables(); + TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 50, .MaxParts = 1000}; + TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings); + auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks(inputTables); + UNIT_ASSERT_VALUES_EQUAL(status, true); + + std::vector<std::vector<TFmrTableInputRef>> expectedTasks = { + {TFmrTableInputRef{ + .TableId = TFmrTableId("test_cluster_1", "test_path_1").Id, + .TableRanges = { + TTableRange{.PartId = FirstTablePartId, .MinChunk = 0, .MaxChunk = 1} + } + }}, + {TFmrTableInputRef{ + .TableId = TFmrTableId("test_cluster_2", "test_path_2").Id, + .TableRanges = { + TTableRange{.PartId = SecondTablePartId, .MinChunk = 0, .MaxChunk = 1} + } + }}, + + {TFmrTableInputRef{ + .TableId = TFmrTableId("test_cluster_3", "test_path_3").Id, + .TableRanges = { + TTableRange{.PartId = ThirdTablePartId, .MinChunk = 0, .MaxChunk = 2} + } + }}, + {TFmrTableInputRef{ + .TableId = TFmrTableId("test_cluster_3", "test_path_3").Id, + .TableRanges = { + TTableRange{.PartId = ThirdTablePartId, .MinChunk = 2, .MaxChunk = 3} + } + }}, + { + TFmrTableInputRef{ + .TableId = TFmrTableId("test_cluster_1", "test_path_1").Id, + .TableRanges = { + TTableRange{.PartId = FirstTablePartId, .MinChunk = 1, .MaxChunk = 2} + } + }, + TFmrTableInputRef{ + .TableId = TFmrTableId("test_cluster_2", "test_path_2").Id, + .TableRanges = { + TTableRange{.PartId = SecondTablePartId, .MinChunk = 1, .MaxChunk = 3} + } + } + } + }; + UNIT_ASSERT_VALUES_EQUAL(ChangeGottenTasksFormat(gottenTasks), expectedTasks); + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make index 20abd2bbc84..5d0057b51f8 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( yql_yt_coordinator_impl.cpp + yql_yt_partitioner.cpp ) PEERDIR( @@ -11,8 +12,10 @@ PEERDIR( library/cpp/yson/node yt/cpp/mapreduce/common yt/yql/providers/yt/fmr/coordinator/interface - yql/essentials/utils/log + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl yql/essentials/utils + yql/essentials/utils/log ) RESOURCE( diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index 10ce8469243..22d9ebc6da2 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -1,6 +1,7 @@ #include <thread> #include <library/cpp/resource/resource.h> #include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h> #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/yql_panic.h> #include "yql_yt_coordinator_impl.h" @@ -17,39 +18,16 @@ TFmrCoordinatorSettings::TFmrCoordinatorSettings() { namespace { -struct TCoordinatorTaskInfo { - TTask::TPtr Task; - ETaskStatus TaskStatus; - TString OperationId; -}; - -struct TOperationInfo { - std::unordered_set<TString> TaskIds; // for now each operation consists only of one task, until paritioner is implemented - EOperationStatus OperationStatus; - std::vector<TFmrError> ErrorMessages; - TString SessionId; - std::vector<TString> OutputTableIds = {}; -}; - -struct TIdempotencyKeyInfo { - TString OperationId; - TInstant OperationCreationTime; -}; - -struct TCoordinatorFmrTableStats { - TTableStats Stats; - TString PartId; // only one PartId for now -}; - class TFmrCoordinator: public IFmrCoordinator { public: - TFmrCoordinator(const TFmrCoordinatorSettings& settings) + TFmrCoordinator(const TFmrCoordinatorSettings& settings, IYtCoordinatorService::TPtr ytCoordinatorService) : WorkersNum_(settings.WorkersNum), RandomProvider_(settings.RandomProvider), StopCoordinator_(false), TimeToSleepBetweenClearKeyRequests_(settings.TimeToSleepBetweenClearKeyRequests), IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime), - DefaultFmrOperationSpec_(settings.DefaultFmrOperationSpec) + DefaultFmrOperationSpec_(settings.DefaultFmrOperationSpec), + YtCoordinatorService_(ytCoordinatorService) { StartClearingIdempotencyKeys(); } @@ -73,13 +51,20 @@ public: IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.OperationId = operationId, .OperationCreationTime=TInstant::Now()}; } - TString taskId = GenerateId(); - auto taskParams = MakeDefaultTaskParamsFromOperation(request.OperationParams); + auto fmrOperationSpec = GetMergedFmrOperationSpec(request.FmrOperationSpec); + auto taskParams = PartitionOperationIntoSeveralTasks(request.OperationParams, fmrOperationSpec, request.ClusterConnections); - TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnections, GetJobSettings(request.FmrOperationSpec)); - Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId}; + std::unordered_set<TString> taskIds; + + for (auto& currentTaskParams: taskParams) { + TString taskId = GenerateId(); + TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, currentTaskParams, request.SessionId, request.ClusterConnections, fmrOperationSpec); + Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId}; + TasksToRun_.emplace(createdTask, taskId); + taskIds.emplace(taskId); + } - Operations_[operationId] = {.TaskIds = {taskId}, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId}; + Operations_[operationId] = {.TaskIds = taskIds, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId}; YQL_CLOG(DEBUG, FastMapReduce) << "Starting operation with id " << operationId; return NThreading::MakeFuture(TStartOperationResponse(EOperationStatus::Accepted, operationId)); } @@ -96,8 +81,11 @@ public: auto operationStatus = operationInfo.OperationStatus; auto errorMessages = operationInfo.ErrorMessages; std::vector<TTableStats> outputTablesStats; - for (auto& tableId : operationInfo.OutputTableIds) { - outputTablesStats.emplace_back(FmrTableStatistics_[tableId].Stats); + if (operationStatus == EOperationStatus::Completed) { + // Calculating output table stats only in case of successful completion of opereation + for (auto& tableId : operationInfo.OutputTableIds) { + outputTablesStats.emplace_back(CalculateTableStats(tableId)); + } } return NThreading::MakeFuture(TGetOperationResponse(operationStatus, errorMessages, outputTablesStats)); } @@ -111,15 +99,14 @@ public: YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId); YQL_CLOG(DEBUG, FastMapReduce) << "Deleting operation with id " << operationId; auto taskIds = Operations_[operationId].TaskIds; - YQL_ENSURE(taskIds.size() == 1); - auto taskId = *taskIds.begin(); - YQL_ENSURE(Tasks_.contains(taskId)); - - auto taskStatus = Tasks_[taskId].TaskStatus; - if (taskStatus == ETaskStatus::InProgress) { - TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel - } else { - ClearTask(taskId); // Task either hasn't begun running or finished, remove info + for (auto& taskId: taskIds){ + YQL_ENSURE(Tasks_.contains(taskId)); + auto taskStatus = Tasks_[taskId].TaskStatus; + if (taskStatus == ETaskStatus::InProgress) { + TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel + } else { + ClearTask(taskId); // Task either hasn't begun running or finished, remove info + } } return NThreading::MakeFuture(TDeleteOperationResponse(EOperationStatus::Aborted)); @@ -148,9 +135,9 @@ public: for (auto& requestTaskState: request.TaskStates) { auto taskId = requestTaskState->TaskId; + YQL_ENSURE(Tasks_.contains(taskId)); auto operationId = Tasks_[taskId].OperationId; YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId); - YQL_ENSURE(Tasks_.contains(taskId)); auto taskStatus = requestTaskState->TaskStatus; YQL_ENSURE(taskStatus != ETaskStatus::Accepted); SetUnfinishedTaskStatus(taskId, taskStatus, requestTaskState->TaskErrorMessage); @@ -159,48 +146,52 @@ public: } auto statistics = requestTaskState->Stats; + YQL_CLOG(TRACE, FastMapReduce) << " Task with id " << taskId << " has current status " << taskStatus << Endl; + bool isOperationCompleted = (GetOperationStatus(operationId) == EOperationStatus::Completed); for (auto& [fmrTableId, tableStats]: statistics.OutputTables) { - if (FmrTableStatistics_.contains(fmrTableId.TableId)) { - auto curTableStats = FmrTableStatistics_[fmrTableId.TableId]; - YQL_ENSURE( - tableStats.Chunks >= curTableStats.Stats.Chunks && - tableStats.DataWeight >= curTableStats.Stats.DataWeight && - tableStats.Rows >= curTableStats.Stats.Rows - ); - YQL_ENSURE(fmrTableId.PartId == curTableStats.PartId); - if (taskStatus == ETaskStatus::Completed) { - YQL_CLOG(DEBUG, FastMapReduce) << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats; - } + Operations_[operationId].OutputTableIds.emplace(fmrTableId.TableId); + PartIdStats_[fmrTableId.PartId] = tableStats.PartIdChunkStats; + if (isOperationCompleted) { + YQL_CLOG(INFO, FastMapReduce) << "Operation with id " << operationId << " has finished successfully"; + CalculateTableStats(fmrTableId.TableId, true); } - Operations_[operationId].OutputTableIds.emplace_back(fmrTableId.TableId); - FmrTableStatistics_[fmrTableId.TableId] = TCoordinatorFmrTableStats{ - .Stats = tableStats, - .PartId = fmrTableId.PartId - }; + // TODO - проверка на валидность возвращаемой воркером статистики? } } - std::vector<TTask::TPtr> tasksToRun; - for (auto& taskToRunInfo: Tasks_) { - if (taskToRunInfo.second.TaskStatus == ETaskStatus::Accepted) { - SetUnfinishedTaskStatus(taskToRunInfo.first, ETaskStatus::InProgress); - tasksToRun.emplace_back(taskToRunInfo.second.Task); + std::vector<TTask::TPtr> currentTasksToRun; + ui64 filledSlots = 0; + while (filledSlots < request.AvailableSlots) { + if (TasksToRun_.empty()) { + break; } + auto [task, taskId] = TasksToRun_.front(); + TasksToRun_.pop(); + if (!Tasks_.contains(taskId)) { + continue; + } + auto& taskInfo = Tasks_[taskId]; + YQL_ENSURE(taskInfo.TaskStatus == ETaskStatus::Accepted); + SetUnfinishedTaskStatus(taskId, ETaskStatus::InProgress); + SetPartIdsForTask(task); + currentTasksToRun.emplace_back(task); + ++filledSlots; } - return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_}); + + return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = currentTasksToRun, .TaskToDeleteIds = TaskToDeleteIds_}); } NThreading::TFuture<TGetFmrTableInfoResponse> GetFmrTableInfo(const TGetFmrTableInfoRequest& request) override { TGuard<TMutex> guard(Mutex_); TGetFmrTableInfoResponse response; auto tableId = request.TableId; - if (!FmrTableStatistics_.contains(tableId)) { + if (!PartIdsForTables_.contains(tableId)) { response.ErrorMessages = {TFmrError{ .Component = EFmrComponent::Coordinator, .ErrorMessage = "Fmr table id " + tableId + " was not found" }}; return NThreading::MakeFuture(response); } - response.TableStats = FmrTableStatistics_[tableId].Stats; + response.TableStats = CalculateTableStats(tableId); return NThreading::MakeFuture(response); } @@ -219,11 +210,11 @@ private: if (Operations_.contains(operationId)) { auto& operationInfo = Operations_[operationId]; auto operationStatus = operationInfo.OperationStatus; - auto& taskIds = operationInfo.TaskIds; - YQL_ENSURE(taskIds.size() == 1); - auto taskId = *operationInfo.TaskIds.begin(); if (operationStatus != EOperationStatus::Accepted && operationStatus != EOperationStatus::InProgress) { - ClearTask(taskId); + auto& taskIds = operationInfo.TaskIds; + for (auto& taskId: taskIds) { + ClearTask(taskId); + } } } } else { @@ -245,7 +236,14 @@ private: YQL_ENSURE(Tasks_.contains(taskId)); auto& taskInfo = Tasks_[taskId]; TaskToDeleteIds_.erase(taskId); - Operations_.erase(taskInfo.OperationId); + + YQL_ENSURE(Operations_.contains(taskInfo.OperationId)); + auto& currentTaskIdsForOperation = Operations_[taskInfo.OperationId]; + currentTaskIdsForOperation.TaskIds.erase(taskId); + if (currentTaskIdsForOperation.TaskIds.empty()) { + // All task for operation are cleared, can clear it + Operations_.erase(taskInfo.OperationId); + } Tasks_.erase(taskId); } @@ -256,6 +254,7 @@ private: if (taskInfo.TaskStatus != ETaskStatus::Accepted && taskInfo.TaskStatus != ETaskStatus::InProgress) { return; } + YQL_CLOG(TRACE, FastMapReduce) << "Setting task status for task id" << taskId << " from " << taskInfo.TaskStatus << " to new Task status " << newTaskStatus << "\n"; taskInfo.TaskStatus = newTaskStatus; operationInfo.OperationStatus = GetOperationStatus(taskInfo.OperationId); if (taskErrorMessage) { @@ -265,102 +264,89 @@ private: } EOperationStatus GetOperationStatus(const TString& operationId) { - if (! Operations_.contains(operationId)) { + if (!Operations_.contains(operationId)) { return EOperationStatus::NotFound; } std::unordered_set<TString> taskIds = Operations_[operationId].TaskIds; - YQL_ENSURE(taskIds.size() == 1); + std::unordered_set<ETaskStatus> taskStatuses; + + for (auto& taskId: taskIds) { + taskStatuses.emplace(Tasks_[taskId].TaskStatus); + } + YQL_ENSURE(!taskStatuses.contains(ETaskStatus::Unknown)); - auto taskId = *taskIds.begin(); - ETaskStatus taskStatus = Tasks_[taskId].TaskStatus; - return static_cast<EOperationStatus>(taskStatus); + if (taskStatuses.contains(ETaskStatus::Failed)) { + return EOperationStatus::Failed; + } + if (taskStatuses.contains(ETaskStatus::InProgress)) { + return EOperationStatus::InProgress; + } + if (taskStatuses.contains(ETaskStatus::InProgress)) { + return EOperationStatus::InProgress; + } + if (taskStatuses.contains(ETaskStatus::Accepted)) { + return EOperationStatus::Accepted; + } + return EOperationStatus::Completed; } - TTableRange GetTableRangeFromId(const TString& tableId) { - if (!FmrTableStatistics_.contains(tableId)) { - TString partId = GenerateId(); - FmrTableStatistics_[tableId] = TCoordinatorFmrTableStats{.Stats=TTableStats{}, .PartId=partId}; - return TTableRange{.PartId = partId}; - } - auto fmrTableStats = FmrTableStatistics_[tableId]; - return TTableRange{ - .PartId = fmrTableStats.PartId, - .MinChunk = 0, - .MaxChunk = fmrTableStats.Stats.Chunks - }; + TFmrPartitionerSettings GetFmrPartitionerSettings(const NYT::TNode& fmrOperationSpec) { + TFmrPartitionerSettings settings; + auto& fmrPartitionSettings = fmrOperationSpec["partition"]["fmr_table"]; + settings.MaxDataWeightPerPart = fmrPartitionSettings["max_data_weight_per_part"].AsInt64(); + settings.MaxParts = fmrPartitionSettings["max_parts"].AsInt64(); + return settings; } - std::vector<TTaskTableRef> TaskInputTablesFromOperationInputTables(const std::vector<TOperationTableRef>& operationTables) { - std::vector<TTaskTableRef> taskInputTables; - for (auto& elem: operationTables) { - if (const TYtTableRef* ytTableRef = std::get_if<TYtTableRef>(&elem)) { - taskInputTables.emplace_back(*ytTableRef); - } else { - TFmrTableRef fmrTableRef = std::get<TFmrTableRef>(elem); - TString inputTableId = fmrTableRef.FmrTableId.Id; - TFmrTableInputRef tableInput{ - .TableId = inputTableId, - .TableRanges = {GetTableRangeFromId(inputTableId)} - }; - taskInputTables.emplace_back(tableInput); - } + TYtPartitionerSettings GetYtPartitionerSettings(const NYT::TNode& fmrOperationSpec) { + TYtPartitionerSettings settings; + auto& ytPartitionSettings = fmrOperationSpec["partition"]["yt_table"]; + settings.MaxDataWeightPerPart = ytPartitionSettings["max_data_weight_per_part"].AsInt64(); + settings.MaxParts = ytPartitionSettings["max_parts"].AsInt64(); + return settings; + } + + std::vector<TTaskParams> PartitionOperationIntoSeveralTasks(const TOperationParams& operationParams, const NYT::TNode& fmrOperationSpec, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) { + auto fmrPartitionerSettings = GetFmrPartitionerSettings(fmrOperationSpec); + auto ytPartitionerSettings = GetYtPartitionerSettings(fmrOperationSpec); + auto fmrPartitioner = TFmrPartitioner(PartIdsForTables_,PartIdStats_, fmrPartitionerSettings); // TODO - fix this + + std::vector<TYtTableRef> ytInputTables; + std::vector<TFmrTableRef> fmrInputTables; + GetOperationInputTables(ytInputTables, fmrInputTables, operationParams); + + TPartitionResult partitionResult = PartitionInputTablesIntoTasks(ytInputTables, fmrInputTables, fmrPartitioner, YtCoordinatorService_, clusterConnections, ytPartitionerSettings); + if (!partitionResult.PartitionStatus) { + ythrow yexception() << "Failed to partition input tables into tasks"; + // TODO - return FAILED_PARTITIONING status instead. } - return taskInputTables; + return GetOutputTaskParams(partitionResult, operationParams); } - std::vector<TFmrTableOutputRef> TaskOutputTablesFromOperationOutputTables(const std::vector<TFmrTableRef>& operationTables) { - std::vector<TFmrTableOutputRef> taskOutputTables; - for (auto& fmrTableRef: operationTables) { - TString outputTableId = fmrTableRef.FmrTableId.Id; - TFmrTableOutputRef tableOutput{ - .TableId = outputTableId, - .PartId = GetTableRangeFromId(outputTableId).PartId - }; - taskOutputTables.emplace_back(tableOutput); + void GetOperationInputTables(std::vector<TYtTableRef>& ytInputTables, std::vector<TFmrTableRef>& fmrInputTables, const TOperationParams& operationParams) { + TOperationInputTablesGetter tablesGetter{}; + std::visit(tablesGetter, operationParams); + + auto& inputTables = tablesGetter.OperationTableRef; + for (auto& table: inputTables) { + auto ytTable = std::get_if<TYtTableRef>(&table); + auto fmrTable = std::get_if<TFmrTableRef>(&table); + if (ytTable) { + ytInputTables.emplace_back(*ytTable); + } else { + fmrInputTables.emplace_back(*fmrTable); } - return taskOutputTables; + } } - TTaskParams MakeDefaultTaskParamsFromOperation(const TOperationParams& operationParams) { - if (const TUploadOperationParams* uploadOperationParams = std::get_if<TUploadOperationParams>(&operationParams)) { - TUploadTaskParams uploadTaskParams{}; - uploadTaskParams.Output = uploadOperationParams->Output; - TString inputTableId = uploadOperationParams->Input.FmrTableId.Id; - TFmrTableInputRef fmrTableInput{ - .TableId = inputTableId, - .TableRanges = {GetTableRangeFromId(inputTableId)} - }; - uploadTaskParams.Input = fmrTableInput; - return uploadTaskParams; - } else if (const TDownloadOperationParams* downloadOperationParams = std::get_if<TDownloadOperationParams>(&operationParams)) { - TDownloadTaskParams downloadTaskParams{}; - downloadTaskParams.Input = downloadOperationParams->Input; - TString outputTableId = downloadOperationParams->Output.FmrTableId.Id; - TFmrTableOutputRef fmrTableOutput{ - .TableId = outputTableId, - .PartId = GetTableRangeFromId(outputTableId).PartId - }; - downloadTaskParams.Output = fmrTableOutput; - return downloadTaskParams; - } else if (const TMergeOperationParams* mergeOperationParams = std::get_if<TMergeOperationParams>(&operationParams)) { - TMergeTaskParams mergeTaskParams; - mergeTaskParams.Input = TaskInputTablesFromOperationInputTables(mergeOperationParams->Input); - TFmrTableOutputRef outputTable; - mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams->Output.FmrTableId.Id}; - return mergeTaskParams; - } else if (const TMapOperationParams* mapOperationParams = std::get_if<TMapOperationParams>(&operationParams)) { - TMapTaskParams mapTaskParams; - mapTaskParams.Input = TaskInputTablesFromOperationInputTables(mapOperationParams->Input); - mapTaskParams.Output = TaskOutputTablesFromOperationOutputTables(mapOperationParams->Output); - mapTaskParams.Executable = mapOperationParams->Executable; - return mapTaskParams; - } else { - ythrow yexception() << "Unknown operation params"; - } + std::vector<TTaskParams> GetOutputTaskParams(const TPartitionResult& partitionResult, const TOperationParams& operationParams) { + TOutputTaskParamsGetter taskGetter{.PartitionResult = partitionResult}; + std::visit(taskGetter, operationParams); + return taskGetter.TaskParams; } - NYT::TNode GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) { - // For now fmr operation spec only consists of job settings + NYT::TNode GetMergedFmrOperationSpec(const TMaybe<NYT::TNode>& currentFmrOperationSpec) { + // just pass whole merged operation spec for simplicity here if (!currentFmrOperationSpec) { return DefaultFmrOperationSpec_; } @@ -369,7 +355,76 @@ private: return resultFmrOperationSpec; } + void SetPartIdsForTask(TTask::TPtr task) { + // TODO - add failover, clearing previous partId if exists + TString partId = GenerateId(); + + auto* downloadTaskParams = std::get_if<TDownloadTaskParams>(&task->TaskParams); + auto* mergeTaskParams = std::get_if<TMergeTaskParams>(&task->TaskParams); + auto* mapTaskParams = std::get_if<TMapTaskParams>(&task->TaskParams); + if (downloadTaskParams) { + TString tableId = downloadTaskParams->Output.TableId; + downloadTaskParams->Output.PartId = partId; + PartIdsForTables_[tableId].emplace_back(partId); + } else if (mergeTaskParams) { + TString tableId = mergeTaskParams->Output.TableId; + mergeTaskParams->Output.PartId = partId; + PartIdsForTables_[tableId].emplace_back(partId); + } else if (mapTaskParams) { + for (auto& fmrTableOutputRef: mapTaskParams->Output) { + TString tableId = fmrTableOutputRef.TableId; + fmrTableOutputRef.PartId = partId; + PartIdsForTables_[tableId].emplace_back(partId); + } + } + } + + TTableStats CalculateTableStats(const TString& tableId, bool isOperationFinished = false) { + if (OperationTableStats_.contains(tableId)) { + return OperationTableStats_[tableId]; + } + TTableStats tableStats{}; + auto& partIds = PartIdsForTables_.at(tableId); + YQL_CLOG(DEBUG, FastMapReduce) << "Calculating table stats for table with id " << tableId << " with " << partIds.size() << " part ids"; + for (auto& part: partIds) { + auto& partStats = PartIdStats_[part]; + tableStats.Chunks += partStats.size(); + YQL_CLOG(DEBUG, FastMapReduce) << " Gotten " << partStats.size() << " chunks for part id " << part; + for (auto& chunkStats: PartIdStats_[part]) { + tableStats.DataWeight += chunkStats.DataWeight; + tableStats.Rows += chunkStats.Rows; + } + } + if (isOperationFinished) { + // Stats for table won't change, inserting into map for caching + OperationTableStats_[tableId] = tableStats; + } + return tableStats; + } + + ////////////////////////////////////////////////////////////////////////////////////////////////////////// + + struct TCoordinatorTaskInfo { + TTask::TPtr Task; + ETaskStatus TaskStatus; + TString OperationId; + }; + + struct TOperationInfo { + std::unordered_set<TString> TaskIds; + EOperationStatus OperationStatus; + std::vector<TFmrError> ErrorMessages; + TString SessionId; + std::unordered_set<TString> OutputTableIds = {}; + }; + + struct TIdempotencyKeyInfo { + TString OperationId; + TInstant OperationCreationTime; + }; + std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it + std::queue<std::pair<TTask::TPtr, TString>> TasksToRun_; // Task, and TaskId std::unordered_set<TString> TaskToDeleteIds_; // TaskIds we want to pass to worker for deletion std::unordered_map<TString, TOperationInfo> Operations_; // OperationId -> current info about it std::unordered_map<TString, TIdempotencyKeyInfo> IdempotencyKeys_; // IdempotencyKey -> current info about it @@ -382,14 +437,90 @@ private: std::atomic<bool> StopCoordinator_; TDuration TimeToSleepBetweenClearKeyRequests_; TDuration IdempotencyKeyStoreTime_; - std::unordered_map<TFmrTableId, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics + + std::unordered_map<TFmrTableId, std::vector<TString>> PartIdsForTables_; // TableId -> List of all corresponding partIds + std::unordered_map<TString, std::vector<TChunkStats>> PartIdStats_; // PartId -> Detailed statistic for each chunk + std::unordered_map<TString, TTableStats> OperationTableStats_; // TableId -> Statistic for fmr table, filled when operation completes + + NYT::TNode DefaultFmrOperationSpec_; + IYtCoordinatorService::TPtr YtCoordinatorService_; // Needed for partitioning of yt tables + + ////////////////////////////////////////////////////////////////////////////////////////////////////////// + + // Helper structs for partitioning operation into tasks + + struct TOperationInputTablesGetter { + std::vector<TOperationTableRef> OperationTableRef; // will be filled when std::visit is called + + void operator () (const TUploadOperationParams& uploadOperationParams) { + OperationTableRef.emplace_back(uploadOperationParams.Input); + } + void operator () (const TDownloadOperationParams& downloadOperationParams) { + OperationTableRef.emplace_back(downloadOperationParams.Input); + } + void operator () (const TMergeOperationParams& mergeOperationParams) { + OperationTableRef = mergeOperationParams.Input; + } + void operator () (const TMapOperationParams& mapOperationParams) { + OperationTableRef = mapOperationParams.Input; + } + }; + + struct TOutputTaskParamsGetter { + std::vector<TTaskParams> TaskParams; // Will be filled when std::visit is called + TPartitionResult PartitionResult; + + void operator () (const TUploadOperationParams& uploadOperationParams) { + for (auto& task: PartitionResult.TaskInputs) { + TUploadTaskParams uploadTaskParams; + YQL_ENSURE(task.Inputs.size() == 1, "Upload task should have exactly one fmr table partition input"); + auto& fmrTablePart = task.Inputs[0]; + uploadTaskParams.Input = std::get<TFmrTableInputRef>(fmrTablePart); + uploadTaskParams.Output = uploadOperationParams.Output; + TaskParams.emplace_back(uploadTaskParams); + } + } + void operator () (const TDownloadOperationParams& downloadOperationParams) { + for (auto& task: PartitionResult.TaskInputs) { + TDownloadTaskParams downloadTaskParams; + YQL_ENSURE(task.Inputs.size() == 1, "Download task should have exactly one yt table partition input"); + auto& ytTablePart = task.Inputs[0]; + downloadTaskParams.Input = std::get<TYtTableTaskRef>(ytTablePart); + downloadTaskParams.Output = TFmrTableOutputRef{.TableId = downloadOperationParams.Output.FmrTableId.Id}; + // PartId for tasks which write to table data service will be set later + TaskParams.emplace_back(downloadTaskParams); + } + } + void operator () (const TMergeOperationParams& mergeOperationParams) { + for (auto& task: PartitionResult.TaskInputs) { + TMergeTaskParams mergeTaskParams; + mergeTaskParams.Input = task; + mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams.Output.FmrTableId.Id}; + TaskParams.emplace_back(mergeTaskParams); + } + } + void operator () (const TMapOperationParams& mapOperationParams) { + for (auto& task: PartitionResult.TaskInputs) { + TMapTaskParams mapTaskParams; + mapTaskParams.Input = task; + std::vector<TFmrTableOutputRef> fmrTableOutputRefs; + std::transform(mapOperationParams.Output.begin(), mapOperationParams.Output.end(), std::back_inserter(fmrTableOutputRefs), [] (const TFmrTableRef& fmrTableRef) { + return TFmrTableOutputRef{.TableId = fmrTableRef.FmrTableId.Id}; + }); + + mapTaskParams.Output = fmrTableOutputRefs; + mapTaskParams.Executable = mapOperationParams.Executable; // TODO - change Executable to mapper + TaskParams.emplace_back(mapTaskParams); + } + } + }; }; } // namespace -IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings) { - return MakeIntrusive<TFmrCoordinator>(settings); +IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings, IYtCoordinatorService::TPtr ytCoordinatorService) { + return MakeIntrusive<TFmrCoordinator>(settings, ytCoordinatorService); } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h index 563752dfc7b..ceac2a7b8cc 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h @@ -6,6 +6,8 @@ #include <util/system/guard.h> #include <util/generic/queue.h> #include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h> namespace NYql::NFmr { @@ -19,6 +21,9 @@ struct TFmrCoordinatorSettings { TFmrCoordinatorSettings(); }; -IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = TFmrCoordinatorSettings()); +IFmrCoordinator::TPtr MakeFmrCoordinator( + const TFmrCoordinatorSettings& settings = TFmrCoordinatorSettings(), + IYtCoordinatorService::TPtr ytCoordinatorService = MakeYtCoordinatorService() +); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.cpp new file mode 100644 index 00000000000..9f3f305130a --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.cpp @@ -0,0 +1,168 @@ +#include "yql_yt_partitioner.h" +#include <library/cpp/iterator/enumerate.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/yql_panic.h> +#include <yt/cpp/mapreduce/common/helpers.h> + +namespace NYql::NFmr { + +TFmrPartitioner::TFmrPartitioner( + const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables, + const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats, + const TFmrPartitionerSettings& settings +) + : PartIdsForTables_(partIdsForTables), PartIdStats_(partIdStats), Settings_(settings) +{ +} + +std::pair<std::vector<TTaskTableInputRef>, bool> TFmrPartitioner::PartitionFmrTablesIntoTasks(const std::vector<TFmrTableRef>& fmrTables) { + // TODO - return matrix with ranges for all tables, in order to support table_index correctly. + if (fmrTables.empty()) { + return {{}, true}; + } + const ui64 maxDataWeightPerPart = Settings_.MaxDataWeightPerPart; + std::vector<TTaskTableInputRef> currentFmrTasks; + std::vector<TLeftoverRange> leftoverRanges; + // First try to create tasks in which all chunks have the same partId, then handle leftovers (end of chunks for each partId) + for (const auto& fmrTable: fmrTables) { + YQL_ENSURE(PartIdsForTables_.contains(fmrTable.FmrTableId)); + auto partIds = PartIdsForTables_.at(fmrTable.FmrTableId); + for (auto& partId: partIds) { + std::vector<TChunkStats> stats = PartIdStats_.at(partId); + HandleFmrPartition(fmrTable.FmrTableId, partId, stats, maxDataWeightPerPart, currentFmrTasks, leftoverRanges); + if (!CheckMaxTasksSize(currentFmrTasks)) { + return {{}, false}; + } + } + } + HandleFmrLeftoverRanges(maxDataWeightPerPart, currentFmrTasks, leftoverRanges); + if (!CheckMaxTasksSize(currentFmrTasks)) { + return {{}, false}; + } + return {currentFmrTasks, true}; +} + +void TFmrPartitioner::HandleFmrPartition( + const TFmrTableId& fmrTable, + const TString& partId, + const std::vector<TChunkStats> stats, + ui64 maxDataWeightPerPart, + std::vector<TTaskTableInputRef>& currentFmrTasks, + std::vector<TLeftoverRange>& leftoverRanges +) { + ui64 curDataWeight = 0; + i64 curMinChunk = -1; + + for (ui64 i = 0; i < stats.size();) { + if (curDataWeight + stats[i].DataWeight <= maxDataWeightPerPart) { + // check if we can add this chunk to current task, or have to split + curDataWeight += stats[i].DataWeight; + if (curMinChunk == -1) { + curMinChunk = i; + } + ++i; + } else { + if (curMinChunk != -1) { + std::vector<TTableRange> tableRange{TTableRange{.PartId = partId, .MinChunk = static_cast<ui64>(curMinChunk), .MaxChunk = i}}; + TFmrTableInputRef fmrTableInput{.TableId = fmrTable.Id, .TableRanges = tableRange}; + currentFmrTasks.emplace_back(TTaskTableInputRef{.Inputs = {fmrTableInput}}); + } + curMinChunk = -1; + curDataWeight = 0; + ui64 j = i; + while (j < stats.size()) { + // iterate to create separate tasks for all chunks which are larger then maxDataWeight + if (stats[j].DataWeight < maxDataWeightPerPart) { + break; + } + std::vector<TTableRange> tableRange{TTableRange{.PartId = partId, .MinChunk = j, .MaxChunk = j + 1}}; + TFmrTableInputRef fmrTableInput{.TableId = fmrTable.Id, .TableRanges = tableRange}; + currentFmrTasks.emplace_back(TTaskTableInputRef{.Inputs = {fmrTableInput}}); + ++j; + } + i = j; + } + } + + if (curMinChunk != -1) { + TTableRange leftoverTableRange{.PartId = partId, .MinChunk = static_cast<ui64>(curMinChunk), .MaxChunk = stats.size()}; + leftoverRanges.emplace_back(TLeftoverRange{.TableId = fmrTable.Id, .TableRange = leftoverTableRange, .DataWeight = curDataWeight}); + } +} + +void TFmrPartitioner::HandleFmrLeftoverRanges( + ui64 maxDataWeightPerPart, + std::vector<TTaskTableInputRef>& fmrTasks, + std::vector<TLeftoverRange>& leftoverRanges +) { + TTaskTableInputRef currentTask{}; + ui64 curDataWeight = 0; + TFmrTableInputRef curFmrTable; + TString curTableId; + for (auto& range: leftoverRanges) { + if (curDataWeight + range.DataWeight > maxDataWeightPerPart) { + if (curFmrTable != TFmrTableInputRef()) { + currentTask.Inputs.emplace_back(curFmrTable); + curFmrTable = TFmrTableInputRef(); + curTableId = range.TableId; + } + fmrTasks.emplace_back(currentTask); + currentTask = TTaskTableInputRef(); + curDataWeight = 0; + } + if (range.TableId != curTableId && curFmrTable != TFmrTableInputRef()) { + currentTask.Inputs.emplace_back(curFmrTable); + curFmrTable = TFmrTableInputRef(); + } + curTableId = range.TableId; + curFmrTable.TableId = curTableId; + curFmrTable.TableRanges.emplace_back(range.TableRange); + curDataWeight += range.DataWeight; + } + + currentTask.Inputs.emplace_back(curFmrTable); + fmrTasks.emplace_back(currentTask); +} + +bool TFmrPartitioner::CheckMaxTasksSize(const std::vector<TTaskTableInputRef>& currentFmrTasks) { + return currentFmrTasks.size() <= Settings_.MaxParts; +} + +TPartitionResult PartitionInputTablesIntoTasks( + const std::vector<TYtTableRef>& ytInputTables, + const std::vector<TFmrTableRef> fmrInputTables, + TFmrPartitioner& partitioner, + IYtCoordinatorService::TPtr ytCoordinatorService, + const std::unordered_map<TFmrTableId, TClusterConnection> &clusterConnections, + const TYtPartitionerSettings& ytPartitionSettings +) { + + std::vector<TTaskTableRef> tasks; + std::vector<TTaskTableInputRef> currentTasks; + + auto [gottenFmrTasks, fmrPartitionStatus] = partitioner.PartitionFmrTablesIntoTasks(fmrInputTables); + if (!fmrPartitionStatus) { + return TPartitionResult{.PartitionStatus = false}; + } + YQL_CLOG(INFO, FastMapReduce) << "Successfully partitioned input fmr tables into " << gottenFmrTasks.size() << " tasks"; + for (auto& fmrTask: gottenFmrTasks) { + YQL_CLOG(DEBUG, FastMapReduce) << fmrTask; + currentTasks.emplace_back(fmrTask); + } + if (ytInputTables.empty()) { + return TPartitionResult{.TaskInputs = currentTasks, .PartitionStatus = true}; + } + auto settings = ytPartitionSettings; + if (settings.MaxParts <= gottenFmrTasks.size()) { + return TPartitionResult{.PartitionStatus = false}; + } + settings.MaxParts = ytPartitionSettings.MaxParts - gottenFmrTasks.size(); + auto [gottenYtTasks, ytPartitionStatus] = ytCoordinatorService->PartitionYtTables(ytInputTables, clusterConnections, settings); + for (auto& ytTask: gottenYtTasks) { + currentTasks.emplace_back(TTaskTableInputRef{.Inputs = {ytTask}}); + } + YQL_CLOG(INFO, FastMapReduce) << "Gotten " << currentTasks.size() << " yt and fmr tasks to run from operation input tables"; + return TPartitionResult{.TaskInputs = currentTasks, .PartitionStatus = ytPartitionStatus}; +} + +} diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h new file mode 100644 index 00000000000..d053ecbcfdd --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h @@ -0,0 +1,66 @@ +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h> + +namespace NYql::NFmr { + +struct TPartitionResult { + std::vector<TTaskTableInputRef> TaskInputs; + bool PartitionStatus = false; +}; + +struct TFmrPartitionerSettings { + ui64 MaxDataWeightPerPart = 0; + ui64 MaxParts = 0; +}; + +class TFmrPartitioner { +public: + TFmrPartitioner( + const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables, + const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats, + const TFmrPartitionerSettings& settings + ); + + std::pair<std::vector<TTaskTableInputRef>, bool> PartitionFmrTablesIntoTasks(const std::vector<TFmrTableRef>& fmrTables); + +private: + struct TLeftoverRange { + TString TableId; + TTableRange TableRange; + ui64 DataWeight; + }; + + void HandleFmrPartition( + const TFmrTableId& fmrTable, + const TString& partId, + const std::vector<TChunkStats> stats, + ui64 maxDataWeightPerPart, + std::vector<TTaskTableInputRef>& currentFmrTasks, + std::vector<TLeftoverRange>& leftoverRanges + ); + + void HandleFmrLeftoverRanges( + ui64 maxDataWeightPerPart, + std::vector<TTaskTableInputRef>& fmrTasks, + std::vector<TLeftoverRange>& leftoverRanges + ); + + bool CheckMaxTasksSize(const std::vector<TTaskTableInputRef>& currentFmrTasks); + +private: + const std::unordered_map<TFmrTableId, std::vector<TString>> PartIdsForTables_; // TableId -> all corresponding part ids. + const std::unordered_map<TString, std::vector<TChunkStats>> PartIdStats_; // PartId -> statistics for all existing chunks in it. + const TFmrPartitionerSettings Settings_; +}; + +TPartitionResult PartitionInputTablesIntoTasks( + const std::vector<TYtTableRef>& ytInputTables, + const std::vector<TFmrTableRef> fmrInputTables, + TFmrPartitioner& partitioner, + IYtCoordinatorService::TPtr ytCoordinatorService, + const std::unordered_map<TFmrTableId, TClusterConnection> &clusterConnections, + const TYtPartitionerSettings& ytPartitionSettings +); + +} // namespace NYql::NFmr + diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp index 1e2407b1ea5..d237adab447 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp @@ -13,6 +13,7 @@ NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heart protoHeartbeatRequest.AddTaskStates(); protoHeartbeatRequest.MutableTaskStates(i)->Swap(&protoTaskState); } + protoHeartbeatRequest.SetAvailableSlots(heartbeatRequest.AvailableSlots); return protoHeartbeatRequest; } @@ -26,6 +27,7 @@ THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest prot taskStates.emplace_back(TIntrusivePtr<TTaskState>(new TTaskState(curTaskState))); } heartbeatRequest.TaskStates = taskStates; + heartbeatRequest.AvailableSlots = protoHeartbeatRequest.GetAvailableSlots(); return heartbeatRequest; } @@ -70,7 +72,7 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey); } protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries); - auto clusterConnections = *protoStartOperationRequest.MutableClusterConnections(); + auto& clusterConnections = *protoStartOperationRequest.MutableClusterConnections(); for (auto& [tableName, conn]: startOperationRequest.ClusterConnections) { clusterConnections[tableName.Id] = ClusterConnectionToProto(conn); } @@ -122,6 +124,11 @@ NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationRes auto protoError = FmrErrorToProto(errorMessage); curError->Swap(&protoError); } + for (auto& tableStats: getOperationResponse.OutputTablesStats) { + auto* curTableStats = protoGetOperationResponse.AddTableStats(); + auto protoTableStats = TableStatsToProto(tableStats); + curTableStats->Swap(&protoTableStats); + } return protoGetOperationResponse; } @@ -129,11 +136,17 @@ TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationR TGetOperationResponse getOperationResponse; getOperationResponse.Status = static_cast<EOperationStatus>(protoGetOperationReponse.GetStatus()); std::vector<TFmrError> errorMessages; + std::vector<TTableStats> outputTableStats; for (size_t i = 0; i < protoGetOperationReponse.ErrorMessagesSize(); ++i) { TFmrError errorMessage = FmrErrorFromProto(protoGetOperationReponse.GetErrorMessages(i)); errorMessages.emplace_back(errorMessage); } + for (size_t i = 0; i < protoGetOperationReponse.TableStatsSize(); ++i) { + TTableStats tableStats = TableStatsFromProto(protoGetOperationReponse.GetTableStats(i)); + outputTableStats.emplace_back(tableStats); + } getOperationResponse.ErrorMessages = errorMessages; + getOperationResponse.OutputTablesStats = outputTableStats; return getOperationResponse; } diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h index a42cb0d35a6..be016c230b2 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h @@ -11,6 +11,7 @@ struct THeartbeatRequest { ui32 WorkerId; TString VolatileId; std::vector<TTaskState::TPtr> TaskStates; + ui64 AvailableSlots = 0; }; // Worker sends requests in loop or long polling @@ -57,7 +58,7 @@ struct TGetFmrTableInfoRequest { }; struct TGetFmrTableInfoResponse { - TTableStats TableStats; // for only one PartId + TTableStats TableStats; std::vector<TFmrError> ErrorMessages = {}; }; diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make new file mode 100644 index 00000000000..0f2075ad1b3 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make @@ -0,0 +1,13 @@ +UNITTEST() + +SRCS( + yql_yt_coordinator_service_ut.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/yql_yt_coordinator_service_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/yql_yt_coordinator_service_ut.cpp new file mode 100644 index 00000000000..75ade8a9e05 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/yql_yt_coordinator_service_ut.cpp @@ -0,0 +1,43 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <util/stream/file.h> +#include <util/system/tempfile.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h> + +namespace NYql::NFmr { + +Y_UNIT_TEST_SUITE(FileYtCoordinatorServiceTests) { + Y_UNIT_TEST(PartitionFiles) { + const i64 FileNums = 5; + + std::vector<THolder<TTempFileHandle>> fileHandles(FileNums); + std::vector<TYtTableRef> ytTables(FileNums, TYtTableRef()); + std::vector<i64> fileLengths = {30, 10, 20, 5, 40}; + + for (int i = 0; i < FileNums; ++i) { + fileHandles[i] = MakeHolder<TTempFileHandle>(); + auto curFileName= fileHandles[i]->Name(); + ytTables[i].FilePath = curFileName; + TFileOutput writer(curFileName); + writer.Write(TString("1") * fileLengths[i]); + writer.Flush(); + } + + auto fileService = MakeFileYtCoordinatorService(); + auto settings = TYtPartitionerSettings{.MaxDataWeightPerPart = 50, .MaxParts = 100}; + auto [gottenPartitions, status] = fileService->PartitionYtTables(ytTables, {}, settings); + UNIT_ASSERT_VALUES_EQUAL(status, true); + + std::vector<std::vector<TString>> expectedFilePartitions = { + {fileHandles[0]->Name(), fileHandles[1]->Name()}, + {fileHandles[2]->Name(), fileHandles[3]->Name()}, + {fileHandles[4]->Name()} + }; + std::vector<std::vector<TString>> gottenFileParititons; + for (auto& part: gottenPartitions) { + gottenFileParititons.emplace_back(part.FilePaths); + } + UNIT_ASSERT_VALUES_EQUAL(gottenFileParititons, expectedFilePartitions); + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ya.make new file mode 100644 index 00000000000..b307ab11fae --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + yql_yt_file_coordinator_service.cpp +) + +PEERDIR( + yt/cpp/mapreduce/common + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS(ut) diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.cpp new file mode 100644 index 00000000000..1d5dfb2d019 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.cpp @@ -0,0 +1,59 @@ +#include "yql_yt_file_coordinator_service.h" + +#include <library/cpp/yson/parser.h> +#include <util/stream/file.h> +#include <util/system/fstat.h> +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> +#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h> +#include <yql/essentials/utils/yql_panic.h> + +namespace NYql::NFmr { + +namespace { + +class TFileYtCoordinatorService: public IYtCoordinatorService { +public: + + std::pair<std::vector<TYtTableTaskRef>, bool> PartitionYtTables( + const std::vector<TYtTableRef>& ytTables, + const std::unordered_map<TFmrTableId, TClusterConnection>& /*clusterConnections*/, + const TYtPartitionerSettings& settings + ) override { + const i64 maxDataWeightPerPart = settings.MaxDataWeightPerPart; + std::vector<TYtTableTaskRef> ytPartitions; + TYtTableTaskRef curYtTableTaskRef{}; + i64 curFileLength = 0; + for (auto& ytTable: ytTables) { + YQL_ENSURE(ytTable.FilePath); + auto fileLength = GetFileLength(*ytTable.FilePath); + if (fileLength + curFileLength > maxDataWeightPerPart) { + ytPartitions.emplace_back(curYtTableTaskRef); + if (ytPartitions.size() > settings.MaxParts) { + return {{}, false}; + } + curYtTableTaskRef = TYtTableTaskRef{}; + curFileLength = 0; + } + TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//"); + auto richPath = NYT::TRichYPath(ytPath).Append(true); + // append RichPath just in case, TODO - figure out if we actually need to use it somewhere + curYtTableTaskRef.RichPaths.emplace_back(richPath); + curYtTableTaskRef.FilePaths.emplace_back(*ytTable.FilePath); + curFileLength += fileLength; + } + ytPartitions.emplace_back(curYtTableTaskRef); + if (ytPartitions.size() > settings.MaxParts) { + return {{}, false}; + } + return {ytPartitions, true}; + } +}; + +} // namespace + +IYtCoordinatorService::TPtr MakeFileYtCoordinatorService() { + return MakeIntrusive<TFileYtCoordinatorService>(); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h new file mode 100644 index 00000000000..7d5ca59e547 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h @@ -0,0 +1,7 @@ +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h> + +namespace NYql::NFmr { + +IYtCoordinatorService::TPtr MakeFileYtCoordinatorService(); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/ya.make new file mode 100644 index 00000000000..bea26d3843a --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + yql_yt_coordinator_service_impl.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.cpp new file mode 100644 index 00000000000..bb556251ed9 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.cpp @@ -0,0 +1,96 @@ +#include "yql_yt_coordinator_service_impl.h" + +#include <library/cpp/yt/error/error.h> +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/yql/providers/yt/fmr/utils/yql_yt_client.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/yql_panic.h> + +namespace NYql::NFmr { + +namespace { + +class TYtCoordinatorService: public IYtCoordinatorService { +public: + + std::pair<std::vector<TYtTableTaskRef>, bool> PartitionYtTables( + const std::vector<TYtTableRef>& ytTables, + const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, + const TYtPartitionerSettings& settings + ) override { + auto getTablePartitionsOptions = NYT::TGetTablePartitionsOptions() + .PartitionMode(NYT::ETablePartitionMode::Unordered) + .DataWeightPerPartition(settings.MaxDataWeightPerPart) + .MaxPartitionCount(settings.MaxParts) + .AdjustDataWeightPerPartition(false); // TODO - add adjust data weight into partitioner settings + + std::vector<TYtTableTaskRef> ytPartitions; + auto groupedYtTables = GroupYtTables(ytTables, clusterConnections); + for (auto& [ytTables, clusterConnection]: groupedYtTables) { + auto client = CreateClient(clusterConnection); + auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId)); + TVector<NYT::TRichYPath> richPaths; + for (auto& ytTable: ytTables ) { + TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//"); + richPaths.emplace_back(NYT::TRichYPath(ytPath).Cluster(ytTable.Cluster)); + } + try { + NYT::TMultiTablePartitions partitions = transaction->GetTablePartitions(richPaths, getTablePartitionsOptions); + + for (const auto& partition : partitions.Partitions) { + TYtTableTaskRef ytTableTaskRef{}; + for (const auto& richPath : partition.TableRanges) { + ytTableTaskRef.RichPaths.emplace_back(richPath); + } + ytPartitions.emplace_back(ytTableTaskRef); + } + } catch (NYT::TErrorException& ex) { + YQL_CLOG(INFO, FastMapReduce) << "Failed to partition yt tables with message: " << CurrentExceptionMessage(); + return {{}, false}; + } + } + YQL_CLOG(INFO, FastMapReduce) << "partitioned input yt tables into " << ytPartitions.size() << " tasks"; + for (auto& task: ytPartitions) { + YQL_CLOG(DEBUG, FastMapReduce) << task; + } + return {ytPartitions, true}; + } + +private: + struct TGroupedYtTablesByCluster { + std::vector<TYtTableRef> YtTables; + TClusterConnection ClusterConnection; + }; + + std::vector<TGroupedYtTablesByCluster> GroupYtTables( + const std::vector<TYtTableRef>& ytTables, + const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections + ) { + std::vector<TGroupedYtTablesByCluster> tableGroups; + std::unordered_map<TString, ui64> ytServerToGroups; + for (auto& ytTable: ytTables) { + auto fmrTableId = TFmrTableId(ytTable.Cluster, ytTable.Path); + auto clusterConnection = clusterConnections.at(fmrTableId); + auto ytServerName = clusterConnection.YtServerName; + if (!ytServerToGroups.contains(ytServerName)) { + tableGroups.emplace_back(TGroupedYtTablesByCluster{ + .YtTables = {ytTable}, + .ClusterConnection = clusterConnection + }); + ytServerToGroups[ytServerName] = tableGroups.size() - 1; + } else { + auto index = ytServerToGroups[ytServerName]; + tableGroups[index].YtTables.emplace_back(ytTable); + } + } + return tableGroups; + } +}; + +} // namespace + +IYtCoordinatorService::TPtr MakeYtCoordinatorService() { + return MakeIntrusive<TYtCoordinatorService>(); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h new file mode 100644 index 00000000000..44a95233140 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h @@ -0,0 +1,7 @@ +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h> + +namespace NYql::NFmr { + +IYtCoordinatorService::TPtr MakeYtCoordinatorService(); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/ya.make new file mode 100644 index 00000000000..612f56f02d0 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + yql_yt_coordinator_service_interface.cpp +) + +PEERDIR( + yt/cpp/mapreduce/interface + yt/yql/providers/yt/fmr/request_options +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.cpp new file mode 100644 index 00000000000..450c685eced --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.cpp @@ -0,0 +1 @@ +#include "yql_yt_coordinator_service_interface.h" diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h new file mode 100644 index 00000000000..13c01f5d8a8 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h @@ -0,0 +1,25 @@ +#pragma once + +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> + +namespace NYql::NFmr { + +struct TYtPartitionerSettings { + ui64 MaxDataWeightPerPart = 0; + ui64 MaxParts = 0; +}; + +class IYtCoordinatorService: public TThrRefBase { +public: + virtual ~IYtCoordinatorService() = default; + + using TPtr = TIntrusivePtr<IYtCoordinatorService>; + + virtual std::pair<std::vector<TYtTableTaskRef>, bool> PartitionYtTables( + const std::vector<TYtTableRef>& ytTables, + const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, + const TYtPartitionerSettings& settings + ) = 0; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make index b0c5e5cdf4f..dbada26432e 100644 --- a/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make @@ -8,12 +8,14 @@ PEERDIR( yt/yql/providers/yt/gateway/fmr yt/yql/providers/yt/fmr/coordinator/client yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl yt/yql/providers/yt/fmr/job/impl yt/yql/providers/yt/fmr/job_factory/impl yt/yql/providers/yt/fmr/table_data_service/local yt/yql/providers/yt/fmr/worker/impl - yt/yql/providers/yt/fmr/yt_service/file - yt/yql/providers/yt/fmr/yt_service/impl + yt/yql/providers/yt/fmr/yt_job_service/file + yt/yql/providers/yt/fmr/yt_job_service/impl ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp index 1d6bc504a5e..7849baa2517 100644 --- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp @@ -11,7 +11,7 @@ std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::T coordinatorSettings.DefaultFmrOperationSpec = fmrOperationSpec; } - auto coordinator = MakeFmrCoordinator(coordinatorSettings); + auto coordinator = isFileGateway ? MakeFmrCoordinator(coordinatorSettings, MakeFileYtCoordinatorService()) : MakeFmrCoordinator(coordinatorSettings, MakeYtCoordinatorService()); if (!coordinatorServerUrl.empty()) { TFmrCoordinatorClientSettings coordinatorClientSettings; THttpURL parsedUrl; @@ -26,10 +26,10 @@ std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::T IFmrWorker::TPtr worker = nullptr; if (!disableLocalFmrWorker) { auto tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(3)); - auto fmrYtSerivce = isFileGateway ? MakeFileYtSerivce() : MakeFmrYtSerivce(); + auto fmrYtJobSerivce = isFileGateway ? MakeFileYtJobSerivce() : MakeYtJobSerivce(); - auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable { - return RunJob(task, tableDataService, fmrYtSerivce, cancelFlag); + auto func = [tableDataService, fmrYtJobSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable { + return RunJob(task, tableDataService, fmrYtJobSerivce, cancelFlag); }; NFmr::TFmrJobFactorySettings settings{.Function=func}; diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h index 24f17928e17..743f856eb8c 100644 --- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h @@ -4,11 +4,14 @@ #include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> #include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h> #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> #include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h> -#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h> -#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h> +#include <yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h> +#include <yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h> + namespace NYql::NFmr { diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make index a997d1c1983..6b372d758b3 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make @@ -7,8 +7,9 @@ SRCS( ) PEERDIR( + yt/cpp/mapreduce/common yt/yql/providers/yt/fmr/job/impl - yt/yql/providers/yt/fmr/yt_service/mock + yt/yql/providers/yt/fmr/yt_job_service/mock yt/yql/providers/yt/fmr/table_data_service/local yql/essentials/utils/log yql/essentials/parser/pg_wrapper diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index 6b9ce068652..614fe30a44d 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -1,8 +1,9 @@ #include <library/cpp/testing/unittest/registar.h> +#include <yt/cpp/mapreduce/common/helpers.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> #include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h> #include <yt/yql/providers/yt/fmr/utils/yql_yt_table_data_service_key.h> -#include <yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h> +#include <yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h> namespace NYql::NFmr { @@ -36,12 +37,13 @@ TString GetTextYson(const TString& binaryYsonContent) { Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST(DownloadTable) { ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - TYtTableRef input = TYtTableRef("test_cluster", "test_path"); - std::unordered_map<TYtTableRef, TString> inputTables{{input, TableContent_1}}; + auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster"); + TYtTableTaskRef input = TYtTableTaskRef{.RichPaths = {richPath}}; + std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_1}}; std::unordered_map<TYtTableRef, TString> outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); + NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytJobService); TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); TDownloadTaskParams params = TDownloadTaskParams(input, output); @@ -53,7 +55,10 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto statistics = std::get_if<TStatistics>(&res); UNIT_ASSERT_C(!err, err->ErrorMessage); - UNIT_ASSERT_EQUAL(statistics->OutputTables.at(output).Rows, 4); + auto detailedChunkStats = statistics->OutputTables.at(output).PartIdChunkStats; + UNIT_ASSERT_VALUES_EQUAL(detailedChunkStats.size(), 1); // coordinator settings taken from file with default values, so large chunk size + UNIT_ASSERT_VALUES_EQUAL(detailedChunkStats[0].Rows, 4); + auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); UNIT_ASSERT_C(resultTableContent, "Result table content is empty"); UNIT_ASSERT_NO_DIFF(*resultTableContent, GetBinaryYson(TableContent_1)); @@ -61,11 +66,12 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST(UploadTable) { ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - std::unordered_map<TYtTableRef, TString> inputTables, outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); + std::unordered_map<TString, TString> inputTables; + std::unordered_map<TYtTableRef, TString> outputTables; + NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytJobService); TYtTableRef output = TYtTableRef("test_cluster", "test_path"); std::vector<TTableRange> ranges = {{"test_part_id"}}; @@ -89,20 +95,21 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { std::vector<TTableRange> ranges = {{"test_part_id"}}; TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; - TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); + auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster"); + TYtTableTaskRef input_2 = TYtTableTaskRef{.RichPaths = {richPath}}; TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; - std::unordered_map<TYtTableRef, TString> inputTables{{input_2, TableContent_2}}; + std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_2}}; std::unordered_map<TYtTableRef, TString> outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); + NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables); auto cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytJobService); TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id_output", "test_part_id"); std::vector<TTaskTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3}; - auto params = TMergeTaskParams(inputs, output); + auto params = TMergeTaskParams{.Input = TTaskTableInputRef{.Inputs = inputs}, .Output = output}; auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); @@ -130,17 +137,19 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST_SUITE(TaskRunTests) { Y_UNIT_TEST(RunDownloadTask) { ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - TYtTableRef input = TYtTableRef("test_cluster", "test_path"); - std::unordered_map<TYtTableRef, TString> inputTables{{input, TableContent_1}}; + auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster"); + TYtTableTaskRef input = TYtTableTaskRef{.RichPaths = {richPath}}; + TFmrTableId inputFmrId("test_cluster", "test_path"); + std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_1}}; std::unordered_map<TYtTableRef, TString> outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); + NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); TDownloadTaskParams params = TDownloadTaskParams(input, output); TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); - ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; + ETaskStatus status = RunJob(task, tableDataServicePtr, ytJobService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); @@ -151,8 +160,9 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { Y_UNIT_TEST(RunUploadTask) { ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - std::unordered_map<TYtTableRef, TString> inputTables, outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); + std::unordered_map<TString, TString> inputTables; + std::unordered_map<TYtTableRef, TString> outputTables; + NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); std::vector<TTableRange> ranges = {{"test_part_id"}}; @@ -163,7 +173,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1)); - ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; + ETaskStatus status = RunJob(task, tableDataServicePtr, ytJobService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); UNIT_ASSERT(outputTables.contains(output)); @@ -172,8 +182,9 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { Y_UNIT_TEST(RunUploadTaskWithNoTable) { ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - std::unordered_map<TYtTableRef, TString> inputTables, outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); + std::unordered_map<TString, TString> inputTables; + std::unordered_map<TYtTableRef, TString> outputTables; + NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); std::vector<TTableRange> ranges = {{"test_part_id"}}; @@ -185,7 +196,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { // No tables in tableDataService UNIT_ASSERT_EXCEPTION_CONTAINS( - RunJob(task, tableDataServicePtr, ytService, cancelFlag), + RunJob(task, tableDataServicePtr, ytJobService, cancelFlag), yexception, "No data for chunk:test_table_id:test_part_id" ); @@ -195,11 +206,13 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); std::vector<TTableRange> ranges = {{"test_part_id"}}; TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; - TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); + auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster"); + TYtTableTaskRef input_2 = TYtTableTaskRef{.RichPaths = {richPath}}; + TFmrTableId inputFmrId_2("test_cluster", "test_path"); TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; - std::unordered_map<TYtTableRef, TString> inputTables{{input_2, TableContent_2}}; + std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_2}}; std::unordered_map<TYtTableRef, TString> outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); + NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); TTaskTableRef input_table_ref_1 = {input_1}; @@ -207,7 +220,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TTaskTableRef input_table_ref_3 = {input_3}; TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id_output", "test_part_id"); std::vector<TTaskTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3}; - auto params = TMergeTaskParams(inputs, output); + auto params = TMergeTaskParams{.Input = TTaskTableInputRef{.Inputs = inputs}, .Output = output}; auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); @@ -217,7 +230,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1)); tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3)); - ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; + ETaskStatus status = RunJob(task, tableDataServicePtr, ytJobService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); auto resultTableContentMaybe = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp index 925cdfff125..4d1a9860e48 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp @@ -12,7 +12,7 @@ const std::vector<TString> TableYsonRows = { "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};" }; -TTableStats WriteDataToTableDataSerice( +TTableChunkStats WriteDataToTableDataSerice( ITableDataService::TPtr tableDataService, const std::vector<TString>& tableYsonRows, ui64 chunkSize, @@ -34,18 +34,28 @@ TTableStats WriteDataToTableDataSerice( Y_UNIT_TEST_SUITE(FmrWriterTests) { Y_UNIT_TEST(WriteYsonRows) { - ui64 totalSize = 0; - for (auto& row: TableYsonRows) { + ui64 totalSize = 0, firstPartSize = 0, secPartSize = 0; + for (ui64 i = 0; i < TableYsonRows.size(); ++i) { + auto& row = TableYsonRows[i]; totalSize += row.size(); + if (i < 2) { + firstPartSize += row.size(); + } else { + secPartSize += row.size(); + } } ui64 chunkSize = totalSize / 2; ITableDataService::TPtr tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + auto stats = WriteDataToTableDataSerice(tableDataService, TableYsonRows, chunkSize); - auto realChunks = stats.Chunks; - auto realDataWeight =stats.DataWeight; - UNIT_ASSERT_VALUES_EQUAL(realChunks, 2); - UNIT_ASSERT_VALUES_EQUAL(realDataWeight, totalSize); + UNIT_ASSERT_VALUES_EQUAL(stats.PartId, "partId"); + std::vector<TChunkStats> gottenPartIdChunkStats = stats.PartIdChunkStats; + std::vector<TChunkStats> expectedChunkStats = { + TChunkStats{.Rows = 2, .DataWeight = firstPartSize}, + TChunkStats{.Rows = 2, .DataWeight = secPartSize} + }; + UNIT_ASSERT(gottenPartIdChunkStats == expectedChunkStats); TString expectedFirstChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin(), TableYsonRows.begin() + 2); TString expectedSecondChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin() + 2, TableYsonRows.end()); diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make index 02b5058984a..dc29cebd265 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ya.make @@ -11,10 +11,10 @@ PEERDIR( library/cpp/yson/node yt/cpp/mapreduce/interface yt/yql/providers/yt/fmr/job/interface - yt/yql/providers/yt/fmr/request_options yt/yql/providers/yt/fmr/utils yt/yql/providers/yt/fmr/table_data_service/interface yql/essentials/utils + yql/essentials/utils/log ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index e3ceec83dc7..24926bb415a 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -3,12 +3,13 @@ #include <util/stream/file.h> +#include <yt/cpp/mapreduce/common/helpers.h> // Для логов, потом мб убрать #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> #include <yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h> -#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h> +#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h> #include <yql/essentials/utils/log/log.h> @@ -17,8 +18,8 @@ namespace NYql::NFmr { class TFmrJob: public IFmrJob { public: - TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings) - : TableDataService_(tableDataService), YtService_(ytService), Settings_(settings) + TFmrJob(ITableDataService::TPtr tableDataService, IYtJobService::TPtr ytJobService, const TFmrJobSettings& settings) + : TableDataService_(tableDataService), YtJobService_(ytJobService), Settings_(settings) { } @@ -28,25 +29,26 @@ public: std::shared_ptr<std::atomic<bool>> cancelFlag ) override { try { - const auto ytTable = params.Input; - const auto cluster = params.Input.Cluster; - const auto path = params.Input.Path; + const auto ytTableTaskRef = params.Input; const auto output = params.Output; const auto tableId = output.TableId; const auto partId = output.PartId; - YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path; YQL_ENSURE(clusterConnections.size() == 1); - auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second, Settings_.YtReaderSettings); + + std::vector<NYT::TRawTableReaderPtr> ytTableReaders = GetYtTableReaders(ytTableTaskRef, clusterConnections); auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(tableId, partId, TableDataService_, Settings_.FmrWriterSettings); - ParseRecords(ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.DonwloadReadBlockCount, Settings_.ParseRecordSettings.DonwloadReadBlockSize, cancelFlag); + for (auto& ytTableReader: ytTableReaders) { + ParseRecords(ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.DonwloadReadBlockCount, Settings_.ParseRecordSettings.DonwloadReadBlockSize, cancelFlag); + } tableDataServiceWriter->Flush(); - TTableStats stats = tableDataServiceWriter->GetStats(); + TTableChunkStats stats = tableDataServiceWriter->GetStats(); auto statistics = TStatistics({{output, stats}}); return statistics; } catch (...) { + YQL_CLOG(ERROR, FastMapReduce) << "Gotten error inside download: " << CurrentExceptionMessage(); return TError(CurrentExceptionMessage()); } } @@ -63,16 +65,15 @@ public: const auto tableId = params.Input.TableId; const auto tableRanges = params.Input.TableRanges; - YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path; - auto tableDataServiceReader = MakeIntrusive<TFmrTableDataServiceReader>(tableId, tableRanges, TableDataService_, Settings_.FmrReaderSettings); YQL_ENSURE(clusterConnections.size() == 1); - auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second, Settings_.YtWriterSettings); + auto ytTableWriter = YtJobService_->MakeWriter(ytTable, clusterConnections.begin()->second, Settings_.YtWriterSettings); ParseRecords(tableDataServiceReader, ytTableWriter, Settings_.ParseRecordSettings.UploadReadBlockCount, Settings_.ParseRecordSettings.UploadReadBlockSize, cancelFlag); ytTableWriter->Flush(); return TStatistics(); } catch (...) { + YQL_CLOG(ERROR, FastMapReduce) << "Gotten error inside upload: " << CurrentExceptionMessage(); return TError(CurrentExceptionMessage()); } } @@ -83,19 +84,25 @@ public: std::shared_ptr<std::atomic<bool>> cancelFlag ) override { try { - const auto inputs = params.Input; + const auto taskTableInputRef = params.Input; const auto output = params.Output; - YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; auto& parseRecordSettings = Settings_.ParseRecordSettings; auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(output.TableId, output.PartId, TableDataService_, Settings_.FmrWriterSettings); auto threadPool = CreateThreadPool(parseRecordSettings.MergeNumThreads); TMaybe<TMutex> mutex = TMutex(); - for (const auto& inputTableRef : inputs) { - auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnections); - threadPool->SafeAddFunc([&, inputTableReader] { - ParseRecords(inputTableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex); + for (const auto& inputTableRef : taskTableInputRef.Inputs) { + threadPool->SafeAddFunc([&, tableDataServiceWriter] { + try { + auto inputTableReaders = GetTableInputStreams(inputTableRef, clusterConnections); + for (auto& tableReader: inputTableReaders) { + ParseRecords(tableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex); + } + } catch (...) { + YQL_CLOG(ERROR, FastMapReduce) << CurrentExceptionMessage(); + throw yexception() << CurrentExceptionMessage(); + } }); } threadPool->Stop(); @@ -103,6 +110,7 @@ public: tableDataServiceWriter->Flush(); return TStatistics({{output, tableDataServiceWriter->GetStats()}}); } catch (...) { + YQL_CLOG(ERROR, FastMapReduce) << "Gotten error inside merge: " << CurrentExceptionMessage(); return TError(CurrentExceptionMessage()); } } @@ -112,48 +120,68 @@ public: const std::unordered_map<TFmrTableId, TClusterConnection>& /* clusterConnections */, std::shared_ptr<std::atomic<bool>> /* cancelFlag */ ) override { - Cerr << "MAP NOT IMPLEMENTED" << Endl; YQL_CLOG(ERROR, FastMapReduce) << "MAP NOT IMPLEMENTED"; ythrow yexception() << "Not implemented"; } private: - NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const { - auto ytTable = std::get_if<TYtTableRef>(&tableRef); + std::vector<NYT::TRawTableReaderPtr> GetTableInputStreams(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const { + auto ytTableTaskRef = std::get_if<TYtTableTaskRef>(&tableRef); auto fmrTable = std::get_if<TFmrTableInputRef>(&tableRef); - if (ytTable) { - TFmrTableId tableId = {ytTable->Cluster, ytTable->Path}; - auto clusterConnection = clusterConnections.at(tableId); - return YtService_->MakeReader(*ytTable, clusterConnection, Settings_.YtReaderSettings); + if (ytTableTaskRef) { + return GetYtTableReaders(*ytTableTaskRef, clusterConnections); } else if (fmrTable) { - return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings); + return {MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings)}; } else { ythrow yexception() << "Unsupported table type"; } } + std::vector<NYT::TRawTableReaderPtr> GetYtTableReaders(const TYtTableTaskRef& ytTableTaskRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const { + std::vector<NYT::TRawTableReaderPtr> ytTableReaders; + if (!ytTableTaskRef.FilePaths.empty()) { + // underlying gateway is file, so create readers from filepaths. + for (auto& filePath: ytTableTaskRef.FilePaths) { + ytTableReaders.emplace_back(YtJobService_->MakeReader(filePath)); + YQL_CLOG(DEBUG, FastMapReduce) << "Creating reader for file path " << filePath; + } + } else { + for (auto& richPath: ytTableTaskRef.RichPaths) { + YQL_ENSURE(richPath.Cluster_); + + // TODO - вместо этого написать нормальные хелперы из RichPath в структуры и назад + TStringBuf choppedPath; + YQL_ENSURE(TStringBuf(richPath.Path_).AfterPrefix("//", choppedPath)); + auto fmrTableId = TFmrTableId(*richPath.Cluster_, TString(choppedPath)); + auto clusterConnection = clusterConnections.at(fmrTableId); + ytTableReaders.emplace_back(YtJobService_->MakeReader(richPath, clusterConnection, Settings_.YtReaderSettings)); + } + } + return ytTableReaders; + } + private: ITableDataService::TPtr TableDataService_; - IYtService::TPtr YtService_; + IYtJobService::TPtr YtJobService_; TFmrJobSettings Settings_; }; IFmrJob::TPtr MakeFmrJob( ITableDataService::TPtr tableDataService, - IYtService::TPtr ytService, + IYtJobService::TPtr ytJobService, const TFmrJobSettings& settings ) { - return MakeIntrusive<TFmrJob>(tableDataService, ytService, settings); + return MakeIntrusive<TFmrJob>(tableDataService, ytJobService, settings); } TJobResult RunJob( TTask::TPtr task, ITableDataService::TPtr tableDataService, - IYtService::TPtr ytService, + IYtJobService::TPtr ytJobService, std::shared_ptr<std::atomic<bool>> cancelFlag ) { TFmrJobSettings jobSettings = GetJobSettingsFromTask(task); - IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, jobSettings); + IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytJobService, jobSettings); auto processTask = [job, task, cancelFlag] (auto&& taskParams) { using T = std::decay_t<decltype(taskParams)>; diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h index 09b17090427..93affcaf4ed 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -4,7 +4,7 @@ #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h> #include <yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h> -#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> +#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> namespace NYql::NFmr { @@ -28,9 +28,9 @@ struct TFmrJobSettings { ui64 NumThreads = 0; }; -IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings = {}); +IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtJobService::TPtr ytJobService, const TFmrJobSettings& settings = {}); -TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); +TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtJobService::TPtr ytJobService, std::shared_ptr<std::atomic<bool>> cancelFlag); TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task); diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp index 9b2caf44aa4..48476ef9d91 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp @@ -16,6 +16,7 @@ TFmrTableDataServiceReader::TFmrTableDataServiceReader( TableDataService_(tableDataService), ReadAheadChunks_(settings.ReadAheadChunks) { + SetMinChunkInNewRange(); ReadAhead(); } @@ -40,12 +41,12 @@ size_t TFmrTableDataServiceReader::DoRead(void* buf, size_t len) { try { data = chunk.Data.GetValueSync(); } catch (...) { - ythrow yexception() << "Error reading chunk: " << chunk.Meta << "Error: " << CurrentExceptionMessage(); + ythrow yexception() << "Error reading chunk: " << chunk.Meta.ToString() << "Error: " << CurrentExceptionMessage(); } if (data) { DataBuffer_.Assign(data->data(), data->size()); } else { - ythrow yexception() << "No data for chunk:" << chunk.Meta; + ythrow yexception() << "No data for chunk:" << chunk.Meta.ToString(); } PendingChunks_.pop(); @@ -71,10 +72,17 @@ void TFmrTableDataServiceReader::ReadAhead() { CurrentChunk_++; } else { CurrentRange_++; + SetMinChunkInNewRange(); } } } +void TFmrTableDataServiceReader::SetMinChunkInNewRange() { + if (CurrentRange_ < TableRanges_.size()) { + CurrentChunk_ = TableRanges_[0].MinChunk; + } +} + bool TFmrTableDataServiceReader::Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) { return false; } @@ -85,4 +93,8 @@ bool TFmrTableDataServiceReader::HasRangeIndices() const { return false; } +TString TFmrTableDataServiceReader::TFmrChunkMeta::ToString() const { + return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk); +} + } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h index d56bb0eb0d7..822865b58d7 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h @@ -13,11 +13,6 @@ struct TFmrReaderSettings { ui64 ReadAheadChunks = 1; }; -struct TPendingFmrChunk { - NThreading::TFuture<TMaybe<TString>> Data; - TFmrChunkMeta Meta; -}; - class TFmrTableDataServiceReader: public NYT::TRawTableReader { public: TFmrTableDataServiceReader( @@ -34,9 +29,24 @@ public: bool HasRangeIndices() const override; private: + struct TFmrChunkMeta { + TString TableId; + TString PartId; + ui64 Chunk = 0; + + TString ToString() const; + }; + + struct TPendingFmrChunk { + NThreading::TFuture<TMaybe<TString>> Data; + TFmrChunkMeta Meta; + }; + size_t DoRead(void* buf, size_t len) override; void ReadAhead(); + void SetMinChunkInNewRange(); + const TString TableId_; std::vector<TTableRange> TableRanges_; ITableDataService::TPtr TableDataService_; diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp index 139e57368cf..9063c48da33 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp @@ -1,6 +1,7 @@ #include "yql_yt_table_data_service_writer.h" #include <library/cpp/threading/future/wait/wait.h> #include <util/string/join.h> +#include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/yql_panic.h> @@ -27,7 +28,7 @@ void TFmrTableDataServiceWriter::DoWrite(const void* buf, size_t len) { } void TFmrTableDataServiceWriter::NotifyRowEnd() { - ++Rows_; + ++CurrentChunkRows_; if (TableContent_.size() >= MaxRowWeight_) { ythrow yexception() << "Current row size: " << TableContent_.size() << " is larger than max row weight: " << MaxRowWeight_; } @@ -74,17 +75,16 @@ void TFmrTableDataServiceWriter::PutRows() { } } ); - ++ChunkCount_; DataWeight_ += TableContent_.Size(); + PartIdChunkStats_.emplace_back(TChunkStats{.Rows = CurrentChunkRows_, .DataWeight = TableContent_.Size()}); + CurrentChunkRows_ = 0; + ++ChunkCount_; TableContent_.Clear(); } -TTableStats TFmrTableDataServiceWriter::GetStats() { - return TTableStats{ - .Chunks = ChunkCount_, - .Rows = Rows_, - .DataWeight = DataWeight_, - }; +TTableChunkStats TFmrTableDataServiceWriter::GetStats() { + YQL_CLOG(DEBUG, FastMapReduce) << " Finished writing to table data service for table Id: " << TableId_ << " and part Id " << PartId_ ; + return TTableChunkStats{.PartId = PartId_, .PartIdChunkStats = PartIdChunkStats_}; } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h index 544b9fc3bd1..ca42498b276 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h @@ -10,8 +10,6 @@ namespace NYql::NFmr { - - struct TFmrWriterSettings { ui64 ChunkSize = 1024 * 1024; ui64 MaxInflightChunks = 1; @@ -27,7 +25,7 @@ public: const TFmrWriterSettings& settings = TFmrWriterSettings() ); - TTableStats GetStats(); + TTableChunkStats GetStats(); void NotifyRowEnd() override; @@ -44,7 +42,7 @@ private: const TString PartId_; ITableDataService::TPtr TableDataService_; ui64 DataWeight_ = 0; - ui64 Rows_ = 0; + ui64 CurrentChunkRows_ = 0; TBuffer TableContent_; const ui64 ChunkSize_; // size at which we push to table data service @@ -52,6 +50,7 @@ private: const ui64 MaxRowWeight_; ui64 ChunkCount_ = 0; + std::vector<TChunkStats> PartIdChunkStats_; struct TFmrWriterState { ui64 CurInflightChunks = 0; diff --git a/yt/yql/providers/yt/fmr/job/interface/ya.make b/yt/yql/providers/yt/fmr/job/interface/ya.make index 7d256622f7e..c2439772033 100644 --- a/yt/yql/providers/yt/fmr/job/interface/ya.make +++ b/yt/yql/providers/yt/fmr/job/interface/ya.make @@ -4,6 +4,10 @@ SRCS( yql_yt_job.cpp ) +PEERDIR( + yt/yql/providers/yt/fmr/request_options +) + YQL_LAST_ABI_VERSION() END() diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp index bcf702709c7..5977af62134 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp @@ -10,7 +10,10 @@ namespace NYql::NFmr { TTask::TPtr CreateTestTask() { - auto input = TYtTableRef("test_cluster", "test_path"); + auto input = TYtTableTaskRef{ + .RichPaths = {NYT::TRichYPath("test_path").Cluster("test_cluster")}, + .FilePaths = {"test_file_path"} + }; auto output = TFmrTableOutputRef("test_table_id"); auto params = TDownloadTaskParams(input, output); return MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id"); diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp index 9c6c0b5b054..610d4f3b7cb 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp +++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp @@ -42,6 +42,10 @@ public: return future; } + ui64 GetMaxParallelJobCount() override { + return NumThreads_; + } + void Start() override { ThreadPool_ = CreateThreadPool(NumThreads_); } @@ -52,7 +56,7 @@ public: private: THolder<IThreadPool> ThreadPool_; - i32 NumThreads_; + ui64 NumThreads_; std::function<TJobResult(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function_; const TIntrusivePtr<IRandomProvider> RandomProvider_; }; diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h index 68ef1a5c379..77d41106e30 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h +++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h @@ -13,7 +13,7 @@ struct TJobResult { }; struct TFmrJobFactorySettings { - ui32 NumThreads = 3; + ui64 NumThreads = 3; std::function<TJobResult(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function; TIntrusivePtr<IRandomProvider> RandomProvider = CreateDefaultRandomProvider(); }; diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make index 32ecd264ae5..bef9aae0b2d 100644 --- a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make +++ b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make @@ -7,6 +7,7 @@ SRCS( PEERDIR( library/cpp/threading/future yql/essentials/utils + yt/yql/providers/yt/fmr/request_options ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h index 27393ff3399..cfb2370d320 100644 --- a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h +++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h @@ -13,6 +13,8 @@ public: virtual ~IFmrJobFactory() = default; virtual NThreading::TFuture<TTaskState::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) = 0; + + virtual ui64 GetMaxParallelJobCount() = 0; }; } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto index 34e16ed26b5..d3a9307dcd1 100644 --- a/yt/yql/providers/yt/fmr/proto/coordinator.proto +++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto @@ -8,6 +8,7 @@ message THeartbeatRequest { uint32 WorkerId = 1; string VolatileId = 2; repeated TTaskState TaskStates = 3; + uint64 AvailableSlots = 4; } message THeartbeatResponse { @@ -33,6 +34,7 @@ message TStartOperationResponse { message TGetOperationResponse { EOperationStatus Status = 1; repeated TFmrError ErrorMessages = 2; + repeated TTableStats TableStats = 3; } message TDeleteOperationResponse { diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto index 60c8471846f..92db8d0b0e7 100644 --- a/yt/yql/providers/yt/fmr/proto/request_options.proto +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -18,7 +18,6 @@ enum ETaskStatus { TASK_IN_PROGRESS = 2; TASK_FAILED = 3; TASK_COMPLETED = 4; - TASK_ABORTED = 5; } enum ETaskType { @@ -26,6 +25,7 @@ enum ETaskType { TASK_TYPE_DOWNLOAD = 1; TASK_TYPE_UPLOAD = 2; TASK_TYPE_MERGE = 3; + TASK_TYPE_MAP = 4; } enum EFmrComponent { @@ -47,6 +47,7 @@ message TFmrError { optional uint32 WorkerId = 4; optional string TaskId = 5; optional string OperationId = 6; + optional string JobId = 7; } message TYtTableRef { @@ -55,6 +56,11 @@ message TYtTableRef { optional string FilePath = 3; } +message TYtTableTaskRef { + repeated string RichPath = 1; + repeated string FilePath = 2; +} + message TFmrTableId { string Id = 1; } @@ -85,13 +91,23 @@ message TTableStats { uint64 DataWeight = 3; } -message TFmrStatisticsObject { - TFmrTableOutputRef Table = 1; - TTableStats Statistic = 2; +message TChunkStats { + uint64 Rows = 1; + uint64 DataWeight = 2; +} + +message TTableChunkStats { + string PartId = 1; + repeated TChunkStats PartIdChunkStats = 2; +} + +message TStatisticsObject { + TFmrTableOutputRef FmrTableOutputRef = 1; + TTableChunkStats TableChunkStats = 2; } message TStatistics { - repeated TFmrStatisticsObject OutputTables = 1; + repeated TStatisticsObject OutputTables = 1; } message TOperationTableRef { @@ -103,11 +119,15 @@ message TOperationTableRef { message TTaskTableRef { oneof TaskTableRef { - TYtTableRef YtTableRef = 1; + TYtTableTaskRef YtTableTaskRef = 1; TFmrTableInputRef FmrTableInputRef = 2; } } +message TTaskTableInputRef { + repeated TTaskTableRef Inputs = 1; +} + message TUploadOperationParams { TFmrTableRef Input = 1; TYtTableRef Output = 2; @@ -124,7 +144,7 @@ message TDownloadOperationParams { } message TDownloadTaskParams { - TYtTableRef Input = 1; + TYtTableTaskRef Input = 1; TFmrTableOutputRef Output = 2; } @@ -134,15 +154,28 @@ message TMergeOperationParams { } message TMergeTaskParams { - repeated TTaskTableRef Input = 1; + TTaskTableInputRef Input = 1; TFmrTableOutputRef Output = 2; } +message TMapOperationParams { + repeated TOperationTableRef Input = 1; + repeated TFmrTableRef Output = 2; + string Executable = 3; +} + +message TMapTaskParams { + TTaskTableInputRef Input = 1; + repeated TFmrTableOutputRef Output = 2; + string Executable = 3; +} + message TOperationParams { oneof TOperationParams { TUploadOperationParams UploadOperationParams = 1; TDownloadOperationParams DownloadOperationParams = 2; TMergeOperationParams MergeOperationParams = 3; + TMapOperationParams MapOperationParams = 4; } } @@ -151,6 +184,7 @@ message TTaskParams { TUploadTaskParams UploadTaskParams = 1; TDownloadTaskParams DownloadTaskParams = 2; TMergeTaskParams MergeTaskParams = 3; + TMapTaskParams MapTaskParams = 4; } } diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make index 36f1b3ebcaf..5e1ec3d0437 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make @@ -5,6 +5,7 @@ SRCS( ) PEERDIR( + yt/cpp/mapreduce/common yt/yql/providers/yt/fmr/proto ) diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index b0c368008dc..34f01f4884d 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -1,5 +1,7 @@ #include "yql_yt_request_proto_helpers.h" #include <library/cpp/yson/node/node_io.h> +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/interface/serialize.h> namespace NYql::NFmr { @@ -17,6 +19,7 @@ NProto::TFmrError FmrErrorToProto(const TFmrError& error) { if (error.OperationId) { protoError.SetOperationId(*error.OperationId); } + protoError.SetJobId(*error.JobId); return protoError; } @@ -34,6 +37,7 @@ TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) { if (protoError.HasOperationId()) { fmrError.OperationId = protoError.GetOperationId(); } + fmrError.JobId = protoError.GetJobId(); return fmrError; } @@ -57,6 +61,32 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) { return ytTableRef; } +NProto::TYtTableTaskRef YtTableTaskRefToProto(const TYtTableTaskRef& ytTableTaskRef) { + NProto::TYtTableTaskRef protoYtTableTaskRef; + for (auto& richPath: ytTableTaskRef.RichPaths) { + TString serializedRichPath = NYT::NodeToYsonString(NYT::PathToNode(richPath)); + protoYtTableTaskRef.AddRichPath(serializedRichPath); + } + for (auto& filePath: ytTableTaskRef.FilePaths) { + protoYtTableTaskRef.AddFilePath(filePath); + } + return protoYtTableTaskRef; +} + +TYtTableTaskRef YtTableTaskRefFromProto(const NProto::TYtTableTaskRef protoYtTableTaskRef) { + TYtTableTaskRef ytTableTaskRef; + for (auto& serializedPath: protoYtTableTaskRef.GetRichPath()) { + auto node = NYT::NodeFromYsonString(serializedPath); + NYT::TRichYPath richPath; + NYT::Deserialize(richPath, node); + ytTableTaskRef.RichPaths.emplace_back(richPath); + } + for (auto& filePath: protoYtTableTaskRef.GetFilePath()) { + ytTableTaskRef.FilePaths.emplace_back(filePath); + } + return ytTableTaskRef; +} + NProto::TFmrTableId FmrTableIdToProto(const TFmrTableId& fmrTableId) { NProto::TFmrTableId protoFmrTableId; protoFmrTableId.SetId(fmrTableId.Id); @@ -148,14 +178,47 @@ TTableStats TableStatsFromProto(const NProto::TTableStats& protoTableStats) { }; } +NProto::TChunkStats ChunkStatsToProto(const TChunkStats& chunkStats) { + NProto::TChunkStats protoChunkStats; + protoChunkStats.SetRows(chunkStats.Rows); + protoChunkStats.SetDataWeight(chunkStats.DataWeight); + return protoChunkStats; +} + +TChunkStats ChunkStatsFromProto(const NProto::TChunkStats& protoChunkStats) { + return TChunkStats{.Rows = protoChunkStats.GetRows(), .DataWeight = protoChunkStats.GetDataWeight()}; +} + +NProto::TTableChunkStats TableChunkStatsToProto(const TTableChunkStats& tableChunkStats) { + NProto::TTableChunkStats protoTableChunkStats; + protoTableChunkStats.SetPartId(tableChunkStats.PartId); + for (auto& chunkStats: tableChunkStats.PartIdChunkStats) { + NProto::TChunkStats protoChunkStats = ChunkStatsToProto(chunkStats); + auto* curPartIdChunkStats = protoTableChunkStats.AddPartIdChunkStats(); + curPartIdChunkStats->Swap(&protoChunkStats); + } + return protoTableChunkStats; +} + +TTableChunkStats TableChunkStatsFromProto(const NProto::TTableChunkStats& protoTableChunkStats) { + TTableChunkStats tableChunkStats; + tableChunkStats.PartId = protoTableChunkStats.GetPartId(); + std::vector<TChunkStats> partIdChunkStats; + for (auto& stat: protoTableChunkStats.GetPartIdChunkStats()) { + partIdChunkStats.emplace_back(ChunkStatsFromProto(stat)); + } + tableChunkStats.PartIdChunkStats = partIdChunkStats; + return tableChunkStats; +} + NProto::TStatistics StatisticsToProto(const TStatistics& stats) { NProto::TStatistics protoStatistics; - for (auto& [fmrTableOutputRef, tableStat]: stats.OutputTables) { - NProto::TFmrTableOutputRef protoFmrTableOutputref = FmrTableOutputRefToProto(fmrTableOutputRef); - NProto::TTableStats protoStats = TableStatsToProto(tableStat); - NProto::TFmrStatisticsObject statTableObject; - statTableObject.MutableTable()->Swap(&protoFmrTableOutputref); - statTableObject.MutableStatistic()->Swap(&protoStats); + for (auto& [fmrTableOutputRef, tableChunkStats]: stats.OutputTables) { + NProto::TFmrTableOutputRef protoFmrTableOutputRef = FmrTableOutputRefToProto(fmrTableOutputRef); + NProto::TTableChunkStats protoTableChunkStats = TableChunkStatsToProto(tableChunkStats); + NProto::TStatisticsObject statTableObject; + statTableObject.MutableFmrTableOutputRef()->Swap(&protoFmrTableOutputRef); + statTableObject.MutableTableChunkStats()->Swap(&protoTableChunkStats); auto* curOutputTable = protoStatistics.AddOutputTables(); curOutputTable->Swap(&statTableObject); } @@ -163,12 +226,12 @@ NProto::TStatistics StatisticsToProto(const TStatistics& stats) { } TStatistics StatisticsFromProto(const NProto::TStatistics& protoStats) { - std::unordered_map<TFmrTableOutputRef, TTableStats> outputTables; + std::unordered_map<TFmrTableOutputRef, TTableChunkStats> outputTables; for (size_t i = 0; i < protoStats.OutputTablesSize(); ++i) { - NProto::TFmrStatisticsObject protoStatTableObject = protoStats.GetOutputTables(i); - TFmrTableOutputRef fmrTableOutputRef = FmrTableOutputRefFromProto(protoStatTableObject.GetTable()); - TTableStats tableStats = TableStatsFromProto(protoStatTableObject.GetStatistic()); - outputTables[fmrTableOutputRef] = tableStats; + NProto::TStatisticsObject protoStatTableObject = protoStats.GetOutputTables(i); + TFmrTableOutputRef fmrTableOutputRef = FmrTableOutputRefFromProto(protoStatTableObject.GetFmrTableOutputRef()); + TTableChunkStats tableChunkStats = TableChunkStatsFromProto(protoStatTableObject.GetTableChunkStats()); + outputTables[fmrTableOutputRef] = tableChunkStats; } return TStatistics{.OutputTables = outputTables}; } @@ -193,14 +256,14 @@ TOperationTableRef OperationTableRefFromProto(const NProto::TOperationTableRef& } else { tableRef = FmrTableRefFromProto(protoOperationTableRef.GetFmrTableRef()); } - return {tableRef}; + return tableRef; } NProto::TTaskTableRef TaskTableRefToProto(const TTaskTableRef& taskTableRef) { NProto::TTaskTableRef protoTaskTableRef; - if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&taskTableRef)) { - NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr); - protoTaskTableRef.MutableYtTableRef()->Swap(&protoYtTableRef); + if (auto* ytTableTaskRefPtr = std::get_if<TYtTableTaskRef>(&taskTableRef)) { + NProto::TYtTableTaskRef protoYtTableTaskRef = YtTableTaskRefToProto(*ytTableTaskRefPtr); + protoTaskTableRef.MutableYtTableTaskRef()->Swap(&protoYtTableTaskRef); } else { auto* fmrTableInputRefPtr = std::get_if<TFmrTableInputRef>(&taskTableRef); NProto::TFmrTableInputRef protoFmrTableInputRef = FmrTableInputRefToProto(*fmrTableInputRefPtr); @@ -211,13 +274,31 @@ NProto::TTaskTableRef TaskTableRefToProto(const TTaskTableRef& taskTableRef) { } TTaskTableRef TaskTableRefFromProto(const NProto::TTaskTableRef& protoTaskTableRef) { - std::variant<TYtTableRef, TFmrTableInputRef> tableRef; - if (protoTaskTableRef.HasYtTableRef()) { - tableRef = YtTableRefFromProto(protoTaskTableRef.GetYtTableRef()); + std::variant<TYtTableTaskRef, TFmrTableInputRef> tableRef; + if (protoTaskTableRef.HasYtTableTaskRef()) { + tableRef = YtTableTaskRefFromProto(protoTaskTableRef.GetYtTableTaskRef()); } else { tableRef = FmrTableInputRefFromProto(protoTaskTableRef.GetFmrTableInputRef()); } - return {tableRef}; + return tableRef; +} + +NProto::TTaskTableInputRef TaskTableInputRefToProto(const TTaskTableInputRef& taskTableInputRef) { + NProto::TTaskTableInputRef protoTaskTableInputRef; + for (auto& taskTableRef: taskTableInputRef.Inputs) { + auto protoTaskTableRef = TaskTableRefToProto(taskTableRef); + auto* curInput = protoTaskTableInputRef.AddInputs(); + curInput->Swap(&protoTaskTableRef); + } + return protoTaskTableInputRef; +} + +TTaskTableInputRef TaskTableInputRefFromProto(const NProto::TTaskTableInputRef& protoTaskTableInputRef) { + std::vector<TTaskTableRef> inputs; + for (auto& protoTaskTableRef: protoTaskTableInputRef.GetInputs()) { + inputs.emplace_back(TaskTableRefFromProto(protoTaskTableRef)); + } + return TTaskTableInputRef{.Inputs = inputs}; } NProto::TUploadOperationParams UploadOperationParamsToProto(const TUploadOperationParams& uploadOperationParams) { @@ -229,6 +310,13 @@ NProto::TUploadOperationParams UploadOperationParamsToProto(const TUploadOperati return protoUploadOperationParams; } +TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams) { + return TUploadOperationParams( + FmrTableRefFromProto(protoUploadOperationParams.GetInput()), + YtTableRefFromProto(protoUploadOperationParams.GetOutput()) + ); +} + NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) { NProto::TUploadTaskParams protoUploadTaskParams; auto input = FmrTableInputRefToProto(uploadTaskParams.Input); @@ -238,13 +326,6 @@ NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploa return protoUploadTaskParams; } -TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams) { - return TUploadOperationParams( - FmrTableRefFromProto(protoUploadOperationParams.GetInput()), - YtTableRefFromProto(protoUploadOperationParams.GetOutput()) - ); -} - TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) { TUploadTaskParams uploadTaskParams; uploadTaskParams.Input = FmrTableInputRefFromProto(protoUploadTaskParams.GetInput()); @@ -261,25 +342,25 @@ NProto::TDownloadOperationParams DownloadOperationParamsToProto(const TDownloadO return protoDownloadOperationParams; } +TDownloadOperationParams DownloadOperationParamsFromProto(const NProto::TDownloadOperationParams& protoDownloadOperationParams) { + return TDownloadOperationParams( + YtTableRefFromProto(protoDownloadOperationParams.GetInput()), + FmrTableRefFromProto(protoDownloadOperationParams.GetOutput()) + ); +} + NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) { NProto::TDownloadTaskParams protoDownloadTaskParams; - auto input = YtTableRefToProto(downloadTaskParams.Input); + auto input = YtTableTaskRefToProto(downloadTaskParams.Input); auto output = FmrTableOutputRefToProto(downloadTaskParams.Output); protoDownloadTaskParams.MutableInput()->Swap(&input); protoDownloadTaskParams.MutableOutput()->Swap(&output); return protoDownloadTaskParams; } -TDownloadOperationParams DownloadOperationParamsFromProto(const NProto::TDownloadOperationParams& protoDownloadOperationParams) { - return TDownloadOperationParams( - YtTableRefFromProto(protoDownloadOperationParams.GetInput()), - FmrTableRefFromProto(protoDownloadOperationParams.GetOutput()) - ); -} - TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) { TDownloadTaskParams downloadTaskParams; - downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput()); + downloadTaskParams.Input = YtTableTaskRefFromProto(protoDownloadTaskParams.GetInput()); downloadTaskParams.Output = FmrTableOutputRefFromProto(protoDownloadTaskParams.GetOutput()); return downloadTaskParams; } @@ -296,18 +377,6 @@ NProto::TMergeOperationParams MergeOperationParamsToProto(const TMergeOperationP return protoMergeOperationParams; } -NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) { - NProto::TMergeTaskParams protoMergeTaskParams; - for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) { - auto inputTable = TaskTableRefToProto(mergeTaskParams.Input[i]); - auto* curInput = protoMergeTaskParams.AddInput(); - curInput->Swap(&inputTable); - } - auto outputTable = FmrTableOutputRefToProto(mergeTaskParams.Output); - protoMergeTaskParams.MutableOutput()->Swap(&outputTable); - return protoMergeTaskParams; -} - TMergeOperationParams MergeOperationParamsFromProto(const NProto::TMergeOperationParams& protoMergeOperationParams) { TMergeOperationParams mergeOperationParams( {}, @@ -320,18 +389,72 @@ TMergeOperationParams MergeOperationParamsFromProto(const NProto::TMergeOperatio return mergeOperationParams; } +NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) { + NProto::TMergeTaskParams protoMergeTaskParams; + auto inputTables = TaskTableInputRefToProto(mergeTaskParams.Input); + protoMergeTaskParams.MutableInput()->Swap(&inputTables); + auto outputTable = FmrTableOutputRefToProto(mergeTaskParams.Output); + protoMergeTaskParams.MutableOutput()->Swap(&outputTable); + return protoMergeTaskParams; +} + TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) { TMergeTaskParams mergeTaskParams; - std::vector<TTaskTableRef> input; - for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) { - TTaskTableRef inputTable = TaskTableRefFromProto(protoMergeTaskParams.GetInput(i)); - input.emplace_back(inputTable); - } - mergeTaskParams.Input = input; + mergeTaskParams.Input = TaskTableInputRefFromProto(protoMergeTaskParams.GetInput()); mergeTaskParams.Output = FmrTableOutputRefFromProto(protoMergeTaskParams.GetOutput()); return mergeTaskParams; } +NProto::TMapOperationParams MapOperationParamsToProto(const TMapOperationParams& mapOperationParams) { + NProto::TMapOperationParams protoMapOperationParams; + for (auto& operationTableRef: mapOperationParams.Input) { + auto protoOperationTableRef = OperationTableRefToProto(operationTableRef); + protoMapOperationParams.AddInput()->Swap(&protoOperationTableRef); + } + for (auto& fmrTableRef: mapOperationParams.Output) { + auto protoFmrTableRef = FmrTableRefToProto(fmrTableRef); + protoMapOperationParams.AddOutput()->Swap(&protoFmrTableRef); + } + protoMapOperationParams.SetExecutable(mapOperationParams.Executable); + return protoMapOperationParams; +} + +TMapOperationParams MapOperationParamsFromProto(const NProto::TMapOperationParams& protoMapOperationParams) { + std::vector<TOperationTableRef> inputTables; + std::vector<TFmrTableRef> outputTables; + for (auto& protoOperationTableRef: protoMapOperationParams.GetInput()) { + inputTables.emplace_back(OperationTableRefFromProto(protoOperationTableRef)); + } + for (auto& protoFmrTableRef: protoMapOperationParams.GetOutput()) { + outputTables.emplace_back(FmrTableRefFromProto(protoFmrTableRef)); + } + return TMapOperationParams{.Input = inputTables, .Output = outputTables, .Executable = protoMapOperationParams.GetExecutable()}; +} + +NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) { + NProto::TMapTaskParams protoMapTaskParams; + auto protoTaskTableInputRef = TaskTableInputRefToProto(mapTaskParams.Input); + protoMapTaskParams.MutableInput()->Swap(&protoTaskTableInputRef); + for (auto& fmrTableOutputRef: mapTaskParams.Output) { + auto protoFmrTableOutputRef = FmrTableOutputRefToProto(fmrTableOutputRef); + protoMapTaskParams.AddOutput()->Swap(&protoFmrTableOutputRef); + } + protoMapTaskParams.SetExecutable(mapTaskParams.Executable); + return protoMapTaskParams; +} + +TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTaskParams) { + TMapTaskParams mapTaskParams; + mapTaskParams.Input = TaskTableInputRefFromProto(protoMapTaskParams.GetInput()); + std::vector<TFmrTableOutputRef> outputTables; + for (auto& protoFmrTableOutputRef: protoMapTaskParams.GetOutput()) { + outputTables.emplace_back(FmrTableOutputRefFromProto(protoFmrTableOutputRef)); + } + mapTaskParams.Output = outputTables; + mapTaskParams.Executable = protoMapTaskParams.GetExecutable(); + return mapTaskParams; +} + NProto::TOperationParams OperationParamsToProto(const TOperationParams& operationParams) { NProto::TOperationParams protoOperationParams; if (auto* uploadOperationParamsPtr = std::get_if<TUploadOperationParams>(&operationParams)) { @@ -340,14 +463,29 @@ NProto::TOperationParams OperationParamsToProto(const TOperationParams& operatio } else if (auto* downloadOperationParamsPtr = std::get_if<TDownloadOperationParams>(&operationParams)) { NProto::TDownloadOperationParams protoDownloadOperationParams = DownloadOperationParamsToProto(*downloadOperationParamsPtr); protoOperationParams.MutableDownloadOperationParams()->Swap(&protoDownloadOperationParams); - } else { - auto* mergeOperationParamsPtr = std::get_if<TMergeOperationParams>(&operationParams); + } else if (auto* mergeOperationParamsPtr = std::get_if<TMergeOperationParams>(&operationParams)) { NProto::TMergeOperationParams protoMergeOperationParams = MergeOperationParamsToProto(*mergeOperationParamsPtr); protoOperationParams.MutableMergeOperationParams()->Swap(&protoMergeOperationParams); + } else { + auto* mapOperationParamsPtr = std::get_if<TMapOperationParams>(&operationParams); + NProto::TMapOperationParams protoMapOperationParams = MapOperationParamsToProto(*mapOperationParamsPtr); + protoOperationParams.MutableMapOperationParams()->Swap(&protoMapOperationParams); } return protoOperationParams; } +TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams) { + if (protoOperationParams.HasDownloadOperationParams()) { + return DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams()); + } else if (protoOperationParams.HasUploadOperationParams()) { + return UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams()); + } else if (protoOperationParams.HasMergeOperationParams()) { + return MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams()); + } else { + return MapOperationParamsFromProto(protoOperationParams.GetMapOperationParams()); + } +} + NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) { NProto::TTaskParams protoTaskParams; if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) { @@ -356,22 +494,15 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) { } else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) { NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr); protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams); - } else { - auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams); + } else if (auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams)) { NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr); protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams); - } - return protoTaskParams; -} - -TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams) { - if (protoOperationParams.HasDownloadOperationParams()) { - return DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams()); - } else if (protoOperationParams.HasUploadOperationParams()) { - return UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams()); } else { - return MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams()); + auto* mapTaskParamsPtr = std::get_if<TMapTaskParams>(&taskParams); + NProto::TMapTaskParams protoMapTaskParams = MapTaskParamsToProto(*mapTaskParamsPtr); + protoTaskParams.MutableMapTaskParams()->Swap(&protoMapTaskParams); } + return protoTaskParams; } TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { @@ -380,8 +511,10 @@ TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams()); } else if (protoTaskParams.HasUploadTaskParams()) { taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams()); - } else { + } else if (protoTaskParams.HasMergeTaskParams()) { taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams()); + } else { + taskParams = MapTaskParamsFromProto(protoTaskParams.GetMapTaskParams()); } return taskParams; } @@ -414,7 +547,7 @@ NProto::TTask TaskToProto(const TTask& task) { protoTask.MutableTaskParams()->Swap(&taskParams); protoTask.SetSessionId(task.SessionId); protoTask.SetNumRetries(task.NumRetries); - auto clusterConnections = *protoTask.MutableClusterConnections(); + auto& clusterConnections = *protoTask.MutableClusterConnections(); for (auto& [tableName, conn]: task.ClusterConnections) { clusterConnections[tableName.Id] = ClusterConnectionToProto(conn); } diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h index 3f7ee93725b..cfeb5a424e9 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h @@ -13,6 +13,14 @@ NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef); TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef); +NProto::TYtTableTaskRef YtTableTaskRefToProto(const TYtTableTaskRef& ytTableTaskRef); + +TYtTableTaskRef YtTableTaskRefFromProto(const NProto::TYtTableTaskRef protoYtTableTaskRef); + +NProto::TFmrTableId FmrTableIdToProto(const TFmrTableId& fmrTableId); + +TFmrTableId FmrTableIdFromProto(const NProto::TFmrTableId& protoFmrTableId); + NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef); TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef); @@ -33,6 +41,14 @@ NProto::TTableStats TableStatsToProto(const TTableStats& tableStats); TTableStats TableStatsFromProto(const NProto::TTableStats& protoTableStats); +NProto::TChunkStats ChunkStatsToProto(const TChunkStats& chunkStats); + +TChunkStats ChunkStatsFromProto(const NProto::TChunkStats& protoChunkStats); + +NProto::TTableChunkStats TableChunkStatsToProto(const TTableChunkStats& tableChunkStats); + +TTableChunkStats TableChunkStatsFromProto(const NProto::TTableChunkStats& protoTableChunkStats); + NProto::TStatistics StatisticsToProto(const TStatistics& stats); TStatistics StatisticsFromProto(const NProto::TStatistics& protoStats); @@ -45,6 +61,10 @@ NProto::TTaskTableRef TaskTableRefToProto(const TTaskTableRef& taskTableRef); TTaskTableRef TaskTableRefFromProto(const NProto::TTaskTableRef& protoTaskTableRef); +NProto::TTaskTableInputRef TaskTableInputRefToProto(const TTaskTableInputRef& taskTableInputRef); + +TTaskTableInputRef TaskTableInputRefFromProto(const NProto::TTaskTableInputRef& protoTaskTableInputRef); + NProto::TUploadOperationParams UploadOperationParamsToProto(const TUploadOperationParams& uploadOperationParams); TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams); @@ -69,6 +89,14 @@ NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTas TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams); +NProto::TMapOperationParams MapOperationParamsToProto(const TMapOperationParams& mapOperationParams); + +TMapOperationParams MapOperationParamsFromProto(const NProto::TMapOperationParams& protoMapOperationParams); + +NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams); + +TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTaskParams); + NProto::TOperationParams OperationParamsToProto(const TOperationParams& operationParams); TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams); diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make index 4e74eb8b185..ec57848ad29 100644 --- a/yt/yql/providers/yt/fmr/request_options/ya.make +++ b/yt/yql/providers/yt/fmr/request_options/ya.make @@ -7,6 +7,8 @@ SRCS( PEERDIR( library/cpp/yson/node library/cpp/threading/future + yt/cpp/mapreduce/common + yt/cpp/mapreduce/interface yql/essentials/public/issue ) diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index 6da844918da..a013f67329a 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -1,4 +1,5 @@ #include "yql_yt_request_options.h" +#include <yt/cpp/mapreduce/common/helpers.h> namespace NYql::NFmr { @@ -18,10 +19,6 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage, stats); } -TString TFmrChunkMeta::ToString() const { - return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk); -} - } // namespace NYql::NFmr template<> @@ -41,11 +38,56 @@ void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError& } template<> -void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) { - out << meta.ToString(); +void Out<NYql::NFmr::TTableStats>(IOutputStream& out, const NYql::NFmr::TTableStats& tableStats) { + out << tableStats.Chunks << " chunks, " << tableStats.Rows << " rows, " << tableStats.DataWeight << " data weight"; } template<> -void Out<NYql::NFmr::TTableStats>(IOutputStream& out, const NYql::NFmr::TTableStats& tableStats) { - out << tableStats.Chunks << " chunks, " << tableStats.Rows << " rows, " << tableStats.DataWeight << " data weight"; +void Out<NYql::NFmr::TTableRange>(IOutputStream& out, const NYql::NFmr::TTableRange& range) { + out << "TableRange with part id: " << range.PartId << " , min chunk: " << range.MinChunk << " , max chunk: " << range.MaxChunk << "\n"; +} + +template<> +void Out<NYql::NFmr::TFmrTableInputRef>(IOutputStream& out, const NYql::NFmr::TFmrTableInputRef& inputRef) { + out << "FmrTableInputRef consisting of " << inputRef.TableRanges.size() << " table ranges:\n"; + out << "TableId: " << inputRef.TableId << "\n"; + for (auto& range: inputRef.TableRanges) { + out << range; + } +} + +template<> +void Out<NYql::NFmr::TYtTableTaskRef>(IOutputStream& out, const NYql::NFmr::TYtTableTaskRef& ytTableTaskRef) { + if (!ytTableTaskRef.FilePaths.empty()) { + out << "YtTableTaskRef consisting of " << ytTableTaskRef.FilePaths.size() << " file paths:\n"; + for (auto& filePath: ytTableTaskRef.FilePaths) { + out << filePath << " "; + } + } else { + out << "YtTableTaskRef consisting of " << ytTableTaskRef.RichPaths.size() << " rich yt paths:\n"; + for (auto& richPath: ytTableTaskRef.RichPaths) { + out << NodeToYsonString(NYT::PathToNode(richPath)) << "\n"; + } + } +} + +template<> +void Out<NYql::NFmr::TTaskTableRef>(IOutputStream& out, const NYql::NFmr::TTaskTableRef& taskTableRef) { + if (auto* ytTableTaskRef = std::get_if<NYql::NFmr::TYtTableTaskRef>(&taskTableRef)) { + out << *ytTableTaskRef; + } else { + out << std::get<NYql::NFmr::TFmrTableInputRef>(taskTableRef); + } +} + +template<> +void Out<NYql::NFmr::TTaskTableInputRef>(IOutputStream& out, const NYql::NFmr::TTaskTableInputRef& taskTableInputRef) { + for (auto& taskTableRef: taskTableInputRef.Inputs) { + out << taskTableRef; + } +} + +template<> +void Out<NYql::NFmr::TChunkStats>(IOutputStream& out, const NYql::NFmr::TChunkStats& chunkStats) { + out << chunkStats.Rows << " rows " << chunkStats.DataWeight << " dataWeight\n"; } diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index 743c20ea339..53bf1d3d4b5 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -7,6 +7,8 @@ #include <util/string/builder.h> #include <vector> +#include <yt/cpp/mapreduce/interface/common.h> + namespace NYql::NFmr { enum class EOperationStatus { @@ -44,7 +46,8 @@ enum class EFmrComponent { enum class EFmrErrorReason { ReasonUnknown, - UserError // TODO Add more reasons + UserError + // TODO - return FallbackQuery or FallbackOperation instead of UserError, pass info to gateway. }; struct TFmrError { @@ -66,9 +69,18 @@ struct TYtTableRef { TString Cluster; TMaybe<TString> FilePath = Nothing(); + // TODO - maybe just use TRichYPath here also instead of Path and Cluster? + bool operator == (const TYtTableRef&) const = default; }; +struct TYtTableTaskRef { + std::vector<NYT::TRichYPath> RichPaths; + std::vector<TString> FilePaths; + + bool operator == (const TYtTableTaskRef&) const = default; +}; // corresponds to a partition of several yt input tables. + struct TFmrTableId { TString Id; @@ -83,26 +95,21 @@ struct TFmrTableId { struct TFmrTableRef { TFmrTableId FmrTableId; + bool operator == (const TFmrTableRef&) const = default; }; struct TTableRange { TString PartId; ui64 MinChunk = 0; ui64 MaxChunk = 1; -}; - -struct TFmrChunkMeta { - TString TableId; - TString PartId; - ui64 Chunk = 0; - - TString ToString() const; -}; + bool operator == (const TTableRange&) const = default; +}; // Corresnponds to range [MinChunk, MaxChunk) struct TFmrTableInputRef { TString TableId; std::vector<TTableRange> TableRanges; -}; + bool operator == (const TFmrTableInputRef&) const = default; +}; // Corresponds to part of table with fixed TableId but several PartIds, Empty TablesRanges means that this table is not present in task. struct TFmrTableOutputRef { TString TableId; @@ -118,6 +125,18 @@ struct TTableStats { bool operator == (const TTableStats&) const = default; }; +struct TChunkStats { + ui64 Rows = 0; + ui64 DataWeight = 0; + bool operator == (const TChunkStats&) const = default; +}; + +struct TTableChunkStats { + TString PartId; + std::vector<TChunkStats> PartIdChunkStats; + bool operator == (const TTableChunkStats&) const = default; +}; // detailed statistics for all chunks in partition + } // namespace NYql::NFmr namespace std { @@ -147,12 +166,12 @@ namespace std { namespace NYql::NFmr { struct TStatistics { - std::unordered_map<TFmrTableOutputRef, TTableStats> OutputTables; + std::unordered_map<TFmrTableOutputRef, TTableChunkStats> OutputTables; }; using TOperationTableRef = std::variant<TYtTableRef, TFmrTableRef>; -using TTaskTableRef = std::variant<TYtTableRef, TFmrTableInputRef>; +using TTaskTableRef = std::variant<TYtTableTaskRef, TFmrTableInputRef>; struct TUploadOperationParams { TFmrTableRef Input; @@ -170,7 +189,7 @@ struct TDownloadOperationParams { }; struct TDownloadTaskParams { - TYtTableRef Input; + TYtTableTaskRef Input; TFmrTableOutputRef Output; }; @@ -179,8 +198,12 @@ struct TMergeOperationParams { TFmrTableRef Output; }; +struct TTaskTableInputRef { + std::vector<TTaskTableRef> Inputs; +}; // Corresponds to task input tables, which can consist parts of either fmr or yt input tables. + struct TMergeTaskParams { - std::vector<TTaskTableRef> Input; + TTaskTableInputRef Input; TFmrTableOutputRef Output; }; @@ -191,7 +214,7 @@ struct TMapOperationParams { }; struct TMapTaskParams { - std::vector<TTaskTableRef> Input; + TTaskTableInputRef Input; std::vector<TFmrTableOutputRef> Output; TString Executable; }; diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp b/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp index cd1e1a21699..b901fd3de89 100644 --- a/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp +++ b/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp @@ -1,5 +1,7 @@ #include "yql_yt_table_data_service_local.h" +#include <util/system/mutex.h> #include <yt/yql/providers/yt/fmr/utils/yql_yt_table_data_service_key.h> +#include <yql/essentials/utils/log/log.h> namespace NYql::NFmr { @@ -12,6 +14,8 @@ public: } NThreading::TFuture<void> Put(const TString& key, const TString& value) { + TGuard<TMutex> guard(Mutex_); + YQL_CLOG(TRACE, FastMapReduce) << "Putting key " << key << " to local table data service"; auto& map = Data_[std::hash<TString>()(key) % NumParts_]; auto it = map.find(key); if (it != map.end()) { @@ -22,6 +26,8 @@ public: } NThreading::TFuture<TMaybe<TString>> Get(const TString& key) { + TGuard<TMutex> guard(Mutex_); + YQL_CLOG(TRACE, FastMapReduce) << "Getting key " << key << " from local table data service"; TMaybe<TString> value = Nothing(); auto& map = Data_[std::hash<TString>()(key) % NumParts_]; auto it = map.find(key); @@ -32,6 +38,8 @@ public: } NThreading::TFuture<void> Delete(const TString& key) { + TGuard<TMutex> guard(Mutex_); + YQL_CLOG(TRACE, FastMapReduce) << "Deleting key " << key << " from local table data service"; auto& map = Data_[std::hash<TString>()(key) % NumParts_]; auto it = map.find(key); if (it == map.end()) { @@ -44,6 +52,7 @@ public: private: std::vector<std::unordered_map<TString, TString>> Data_; const ui32 NumParts_; + TMutex Mutex_ = TMutex(); }; } // namespace diff --git a/yt/yql/providers/yt/fmr/utils/ut/ya.make b/yt/yql/providers/yt/fmr/utils/ut/ya.make index 2e7c9b94a65..b2c94c2502a 100644 --- a/yt/yql/providers/yt/fmr/utils/ut/ya.make +++ b/yt/yql/providers/yt/fmr/utils/ut/ya.make @@ -7,7 +7,7 @@ SRCS( PEERDIR( yt/yql/providers/yt/fmr/utils yt/yql/providers/yt/fmr/request_options - yt/yql/providers/yt/fmr/yt_service/mock + yt/yql/providers/yt/fmr/yt_job_service/mock yql/essentials/parser/pg_wrapper yql/essentials/parser/pg_wrapper/interface yql/essentials/public/udf diff --git a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp index a0736aa96f3..d642ca7a6f8 100644 --- a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp +++ b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp @@ -1,9 +1,9 @@ #include <library/cpp/testing/unittest/registar.h> +#include <yt/cpp/mapreduce/common/helpers.h> #include <yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h> -#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h> +#include <yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> -#include <yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h> using namespace NYql::NFmr; @@ -12,13 +12,15 @@ Y_UNIT_TEST_SUITE(ParseRecordTests) { TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n" "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n"; TYtTableRef testYtTable = TYtTableRef{.Path = "test_path", .Cluster = "hahn"}; - std::unordered_map<TYtTableRef, TString> inputTables{{testYtTable, inputYsonContent}}; + auto richPath = NYT::TRichYPath("test_path").Cluster("test_cluster"); + std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), inputYsonContent}}; + std::unordered_map<TYtTableRef, TString> outputTables; - auto ytService = MakeMockYtService(inputTables, outputTables); + auto ytJobService = MakeMockYtJobService(inputTables, outputTables); - auto reader = ytService->MakeReader(testYtTable, TClusterConnection()); - auto writer = ytService->MakeWriter(testYtTable, TClusterConnection()); + auto reader = ytJobService->MakeReader(richPath); + auto writer = ytJobService->MakeWriter(testYtTable, TClusterConnection()); auto cancelFlag = std::make_shared<std::atomic<bool>>(false); ParseRecords(reader, writer, 1, 10, cancelFlag); writer->Flush(); diff --git a/yt/yql/providers/yt/fmr/utils/ya.make b/yt/yql/providers/yt/fmr/utils/ya.make index 0d683128179..8bd317b26d5 100644 --- a/yt/yql/providers/yt/fmr/utils/ya.make +++ b/yt/yql/providers/yt/fmr/utils/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + yql_yt_client.cpp yql_yt_log_context.cpp yql_yt_parse_records.cpp yql_yt_table_data_service_key.cpp @@ -8,8 +9,9 @@ SRCS( PEERDIR( library/cpp/http/io + yt/cpp/mapreduce/client yt/cpp/mapreduce/interface - yt/yql/providers/yt/fmr/yt_service/impl + yt/yql/providers/yt/fmr/request_options yt/yql/providers/yt/codec yql/essentials/utils ) diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_client.cpp b/yt/yql/providers/yt/fmr/utils/yql_yt_client.cpp new file mode 100644 index 00000000000..3eaeba98d9f --- /dev/null +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_client.cpp @@ -0,0 +1,14 @@ +#include "yql_yt_client.h" + +namespace NYql::NFmr { + +NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection) { + NYT::TCreateClientOptions createOpts; + auto token = clusterConnection.Token; + if (token) { + createOpts.Token(*token); + } + return NYT::CreateClient(clusterConnection.YtServerName, createOpts); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_client.h b/yt/yql/providers/yt/fmr/utils/yql_yt_client.h new file mode 100644 index 00000000000..3d4f2f07224 --- /dev/null +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_client.h @@ -0,0 +1,10 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/client.h> +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> + +namespace NYql::NFmr { + +NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make index 790f845c3e8..7bde0ed7f9b 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file yt/yql/providers/yt/fmr/job_factory/impl yt/yql/providers/yt/fmr/worker/impl ) diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp index f19c5365bdf..79ccfa46f70 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -6,12 +6,13 @@ #include <util/thread/pool.h> #include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service//file/yql_yt_file_coordinator_service.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> namespace NYql::NFmr { TDownloadOperationParams downloadOperationParams{ - .Input = TYtTableRef{"Path","Cluster"}, + .Input = TYtTableRef{"Path","Cluster", "FilePath"}, .Output = TFmrTableRef{{"Cluster", "Path"}} }; @@ -26,7 +27,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do Y_UNIT_TEST_SUITE(FmrWorkerTests) { Y_UNIT_TEST(GetSuccessfulOperationResult) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto operationResults = std::make_shared<TString>("no_result_yet"); auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { while (!cancelFlag->load()) { @@ -48,7 +49,7 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) { } Y_UNIT_TEST(CancelOperation) { - auto coordinator = MakeFmrCoordinator(); + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); auto operationResults = std::make_shared<TString>("no_result_yet"); auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { int numIterations = 0; @@ -79,7 +80,7 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) { TFmrCoordinatorSettings coordinatorSettings{}; coordinatorSettings.WorkersNum = 2; coordinatorSettings.RandomProvider = CreateDeterministicRandomProvider(3); - auto coordinator = MakeFmrCoordinator(coordinatorSettings); + auto coordinator = MakeFmrCoordinator(coordinatorSettings, MakeFileYtCoordinatorService()); std::shared_ptr<std::atomic<ui32>> operationResult = std::make_shared<std::atomic<ui32>>(0); auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { while (!cancelFlag->load()) { diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp index ce06e6d0783..ee8ce692f00 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp @@ -50,15 +50,20 @@ public: } } + ui64 maxParallelJobCount = JobFactory_->GetMaxParallelJobCount(); + YQL_ENSURE(maxParallelJobCount >= WorkerState_->TaskStatuses.size()); + ui64 availableSlots = maxParallelJobCount - WorkerState_->TaskStatuses.size(); auto heartbeatRequest = THeartbeatRequest( WorkerId_, VolatileId_, - taskStates + taskStates, + availableSlots ); auto heartbeatResponseFuture = Coordinator_->SendHeartbeatResponse(heartbeatRequest); auto heartbeatResponse = heartbeatResponseFuture.GetValueSync(); std::vector<TTask::TPtr> tasksToRun = heartbeatResponse.TasksToRun; std::unordered_set<TString> taskToDeleteIds = heartbeatResponse.TaskToDeleteIds; + YQL_ENSURE(tasksToRun.size() <= availableSlots); with_lock(WorkerState_->Mutex) { for (auto task: tasksToRun) { diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make index 6f4ff700b0d..884cda6f788 100644 --- a/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make +++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make @@ -1,11 +1,11 @@ UNITTEST() SRCS( - yql_yt_file_yt_service_ut.cpp + yql_yt_file_yt_job_service_ut.cpp ) PEERDIR( - yt/yql/providers/yt/fmr/yt_service/file + yt/yql/providers/yt/fmr/yt_job_service/file yt/yql/providers/yt/gateway/file ) diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/yql_yt_file_yt_job_service_ut.cpp index acc5867c454..6a013963cd7 100644 --- a/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp +++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/yql_yt_file_yt_job_service_ut.cpp @@ -1,26 +1,26 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/stream/file.h> -#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h> -#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> // TODO - REMOVE +#include <yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> namespace NYql::NFmr { -Y_UNIT_TEST_SUITE(FileYtServiceTest) { - Y_UNIT_TEST(CheckReaderAndWriter) { +Y_UNIT_TEST_SUITE(FileYtServiceTests) { + Y_UNIT_TEST(CheckFileReaderAndWriter) { TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n" "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n"; TTempFileHandle file{}; TYtTableRef ytTable{.Path = "test_path", .Cluster = "hahn", .FilePath = file.Name()}; - auto fileService = MakeFileYtSerivce(); + auto fileService = MakeFileYtJobSerivce(); auto writer = fileService->MakeWriter(ytTable, TClusterConnection()); writer->Write(inputYsonContent.data(), inputYsonContent.size()); writer->Flush(); TFileInput input(file.Name()); - auto reader = fileService->MakeReader(ytTable, TClusterConnection()); + auto reader = fileService->MakeReader(file.Name()); TStringStream binaryYsonStream; TStringStream textYsonStream; binaryYsonStream << reader->ReadAll(); diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/file/ya.make index dcd0969e3e6..13cfe7ebd3d 100644 --- a/yt/yql/providers/yt/fmr/yt_service/file/ya.make +++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ya.make @@ -1,13 +1,13 @@ LIBRARY() SRCS( - yql_yt_file_yt_service.cpp + yql_yt_file_yt_job_service.cpp ) PEERDIR( library/cpp/yson yt/yql/providers/yt/gateway/file - yt/yql/providers/yt/fmr/yt_service/interface + yt/yql/providers/yt/fmr/yt_job_service/interface yt/yql/providers/yt/lib/yson_helpers yql/essentials/utils ) diff --git a/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.cpp b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.cpp new file mode 100644 index 00000000000..d72f76bd419 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.cpp @@ -0,0 +1,68 @@ +#include "yql_yt_file_yt_job_service.h" +#include <library/cpp/yson/parser.h> +#include <util/stream/file.h> +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> +#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h> +#include <yql/essentials/utils/yql_panic.h> + +namespace NYql::NFmr { + +namespace { + +class TFileYtTableWriter: public NYT::TRawTableWriter { +public: + TFileYtTableWriter(const TString& filePath): FilePath_(filePath) {} + + void NotifyRowEnd() override { + } + +private: + void DoWrite(const void* buf, size_t len) override { + Buffer_.Append(static_cast<const char*>(buf), len); + } + + void DoFlush() override { + TMemoryInput input(Buffer_.data(), Buffer_.size()); + TFile outputFile(FilePath_, OpenAlways | WrOnly | ForAppend); + TFileOutput outputFileStream(outputFile); + TDoubleHighPrecisionYsonWriter writer(&outputFileStream, ::NYson::EYsonType::ListFragment); + NYson::TYsonParser parser(&writer, &input, ::NYson::EYsonType::ListFragment); + parser.Parse(); + Buffer_.Clear(); + } + + TString FilePath_; + TBuffer Buffer_; +}; + +class TFileYtJobService: public NYql::NFmr::IYtJobService { +public: + +NYT::TRawTableReaderPtr MakeReader( + const std::variant<NYT::TRichYPath, TString>& inputTableRef, + const TClusterConnection& /*clusterConnection*/, + const TYtReaderSettings& /*readerSettings*/ +) override { + TString filePath = std::get<TString>(inputTableRef); + auto textYsonInputs = NFile::MakeTextYsonInputs({{filePath, NFile::TColumnsInfo{}}}, false); + return textYsonInputs[0]; + } + + NYT::TRawTableWriterPtr MakeWriter( + const TYtTableRef& ytTable, + const TClusterConnection& /*clusterConnection*/, + const TYtWriterSettings& /*writerSettings*/ + ) override { + YQL_ENSURE(ytTable.FilePath); + return MakeIntrusive<TFileYtTableWriter>(*ytTable.FilePath); + } +}; + +} // namespace + +IYtJobService::TPtr MakeFileYtJobSerivce() { + return MakeIntrusive<TFileYtJobService>(); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h new file mode 100644 index 00000000000..a4f7c2426ce --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h @@ -0,0 +1,7 @@ +#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h> + +namespace NYql::NFmr { + +IYtJobService::TPtr MakeFileYtJobSerivce(); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_job_service/impl/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/impl/ya.make new file mode 100644 index 00000000000..53303655e61 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_job_service/impl/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + yql_yt_job_service_impl.cpp +) + +PEERDIR( + library/cpp/yt/error + yt/cpp/mapreduce/client + yt/cpp/mapreduce/common + yt/yql/providers/yt/fmr/yt_job_service/interface + yql/essentials/utils + yql/essentials/utils/log +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.cpp index 4634a76031b..3c21287dc5b 100644 --- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp +++ b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.cpp @@ -1,28 +1,36 @@ +#include <library/cpp/yt/error/error.h> #include <yt/cpp/mapreduce/common/helpers.h> #include <yt/cpp/mapreduce/interface/client.h> +#include <yt/yql/providers/yt/fmr/utils/yql_yt_client.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/yql_panic.h> -#include "yql_yt_yt_service_impl.h" +#include "yql_yt_job_service_impl.h" namespace NYql::NFmr { namespace { -class TFmrYtService: public NYql::NFmr::IYtService { +class TFmrYtJobService: public IYtJobService { public: NYT::TRawTableReaderPtr MakeReader( - const TYtTableRef& ytTable, + const std::variant<NYT::TRichYPath, TString>& inputTableRef, const TClusterConnection& clusterConnection, const TYtReaderSettings& readerSettings ) override { + auto richPath = std::get<NYT::TRichYPath>(inputTableRef); + //YQL_CLOG(DEBUG, FastMapReduce) << "Creating reader for input yt table with path " << NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)); + YQL_ENSURE(richPath.Cluster_); + TFmrTableId fmrId(*richPath.Cluster_, richPath.Path_); auto client = CreateClient(clusterConnection); auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId)); - auto path = NYT::TRichYPath(NYT::AddPathPrefix(ytTable.Path, "//")); + auto controlAttributes = NYT::TControlAttributes(); if (!readerSettings.WithAttributes) { controlAttributes.EnableRangeIndex(false).EnableRowIndex(false); } auto readerOptions = NYT::TTableReaderOptions().ControlAttributes(controlAttributes); - return transaction->CreateRawReader(path, NYT::TFormat::YsonBinary(), readerOptions); + return transaction->CreateRawReader(richPath, NYT::TFormat::YsonBinary(), readerOptions); } NYT::TRawTableWriterPtr MakeWriter( @@ -33,29 +41,19 @@ public: auto client = CreateClient(clusterConnection); auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId)); TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//"); - auto richPath = NYT::TRichYPath(ytPath).Append(true); + auto richPath = NYT::TRichYPath(ytPath).Cluster(ytTable.Cluster).Append(true); auto writerOptions = NYT::TTableWriterOptions(); if (writerSetttings.MaxRowWeight) { writerOptions.Config(NYT::TNode()("max_row_weight", *writerSetttings.MaxRowWeight)); } return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary(), writerOptions); } - -private: - NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection) { - NYT::TCreateClientOptions createOpts; - auto token = clusterConnection.Token; - if (token) { - createOpts.Token(*token); - } - return NYT::CreateClient(clusterConnection.YtServerName, createOpts); - } }; } // namespace -IYtService::TPtr MakeFmrYtSerivce() { - return MakeIntrusive<TFmrYtService>(); +IYtJobService::TPtr MakeYtJobSerivce() { + return MakeIntrusive<TFmrYtJobService>(); } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h new file mode 100644 index 00000000000..bac4abf6ab7 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h @@ -0,0 +1,7 @@ +#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h> + +namespace NYql::NFmr { + +IYtJobService::TPtr MakeYtJobSerivce(); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/interface/ya.make index 032f645e022..a824de924f5 100644 --- a/yt/yql/providers/yt/fmr/yt_service/interface/ya.make +++ b/yt/yql/providers/yt/fmr/yt_job_service/interface/ya.make @@ -1,7 +1,7 @@ LIBRARY() SRCS( - yql_yt_yt_service.cpp + yql_yt_job_service.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.cpp b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.cpp new file mode 100644 index 00000000000..7b3bac7a269 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.cpp @@ -0,0 +1 @@ +#include "yql_yt_job_service.h" diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h index 77204493e70..0975fe729c7 100644 --- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h +++ b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h @@ -7,22 +7,23 @@ namespace NYql::NFmr { struct TYtReaderSettings { - bool WithAttributes = false; // Enable RowIndex and RangeIndex + bool WithAttributes = false; // Enable RowIndex and RangeIndex, for now only mode = false is supported. }; struct TYtWriterSettings { TMaybe<ui64> MaxRowWeight = Nothing(); }; -class IYtService: public TThrRefBase { +class IYtJobService: public TThrRefBase { public: - virtual ~IYtService() = default; + virtual ~IYtJobService() = default; - using TPtr = TIntrusivePtr<IYtService>; + using TPtr = TIntrusivePtr<IYtJobService>; + // Either RichPath to actual Yt table or filepath is passed depending on type of underlying gateway. virtual NYT::TRawTableReaderPtr MakeReader( - const TYtTableRef& ytTable, - const TClusterConnection& clusterConnection, + const std::variant<NYT::TRichYPath, TString>& inputTableRef, + const TClusterConnection& clusterConnection = TClusterConnection(), const TYtReaderSettings& settings = TYtReaderSettings() ) = 0; diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/mock/ya.make index e586d08ec3a..4896db2ef75 100644 --- a/yt/yql/providers/yt/fmr/yt_service/mock/ya.make +++ b/yt/yql/providers/yt/fmr/yt_job_service/mock/ya.make @@ -1,12 +1,13 @@ LIBRARY() SRCS( - yql_yt_yt_service_mock.cpp + yql_yt_job_service_mock.cpp ) PEERDIR( + yt/cpp/mapreduce/common yt/cpp/mapreduce/interface - yt/yql/providers/yt/fmr/yt_service/interface + yt/yql/providers/yt/fmr/yt_job_service/interface yql/essentials/utils ) diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.cpp b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.cpp index 337c82d2b31..e31b7a22c17 100644 --- a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.cpp +++ b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.cpp @@ -1,5 +1,6 @@ -#include "yql_yt_yt_service_mock.h" +#include "yql_yt_job_service_mock.h" +#include <yt/cpp/mapreduce/common/helpers.h> #include <yt/cpp/mapreduce/interface/io.h> #include <yql/essentials/utils/yql_panic.h> @@ -58,14 +59,20 @@ private: TBuffer Buffer_; }; -class TMockYtService: public NYql::NFmr::IYtService { +class TMockYtJobService: public NYql::NFmr::IYtJobService { public: - TMockYtService(const std::unordered_map<TYtTableRef, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables) + TMockYtJobService(const std::unordered_map<TString, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables) : InputTables_(inputTables), OutputTables_(outputTables) {} - NYT::TRawTableReaderPtr MakeReader(const TYtTableRef& ytTableRef, const TClusterConnection&, const TYtReaderSettings&) override { - YQL_ENSURE(InputTables_.contains(ytTableRef)); - return MakeIntrusive<TMockYtTableReader>(InputTables_[ytTableRef]); + virtual NYT::TRawTableReaderPtr MakeReader( + const std::variant<NYT::TRichYPath, TString>& inputTableRef, + const TClusterConnection& /*clusterConnection*/, + const TYtReaderSettings& /*settings*/ + ) override { + auto richPath = std::get<NYT::TRichYPath>(inputTableRef); + TString richPathStr = NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)); + YQL_ENSURE(InputTables_.contains(richPathStr)); + return {MakeIntrusive<TMockYtTableReader>(InputTables_[richPathStr])}; } NYT::TRawTableWriterPtr MakeWriter(const TYtTableRef& ytTableRef, const TClusterConnection&, const TYtWriterSettings&) override { @@ -76,14 +83,14 @@ public: } private: - std::unordered_map<TYtTableRef, TString> InputTables_; // table -> textYsonContent + std::unordered_map<TString, TString> InputTables_; // rich yt path in string form -> total textYsonContent of it std::unordered_map<TYtTableRef, TString>& OutputTables_; }; } // namespace -IYtService::TPtr MakeMockYtService(const std::unordered_map<TYtTableRef, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables) { - return MakeIntrusive<TMockYtService>(inputTables, outputTables); +IYtJobService::TPtr MakeMockYtJobService(const std::unordered_map<TString, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables) { + return MakeIntrusive<TMockYtJobService>(inputTables, outputTables); } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h new file mode 100644 index 00000000000..f13dc79de1d --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h @@ -0,0 +1,9 @@ +#pragma once + +#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h> + +namespace NYql::NFmr { + +IYtJobService::TPtr MakeMockYtJobService(const std::unordered_map<TString, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp deleted file mode 100644 index 1c33ed430e9..00000000000 --- a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp +++ /dev/null @@ -1,68 +0,0 @@ -#include "yql_yt_file_yt_service.h" -#include <library/cpp/yson/parser.h> -#include <util/stream/file.h> -#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> -#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h> -#include <yql/essentials/utils/yql_panic.h> - -namespace NYql::NFmr { - -namespace { - -class TFileYtTableWriter: public NYT::TRawTableWriter { - public: - TFileYtTableWriter(const TString& filePath): FilePath_(filePath) {} - - void NotifyRowEnd() override { - } - - private: - void DoWrite(const void* buf, size_t len) override { - Buffer_.Append(static_cast<const char*>(buf), len); - } - - void DoFlush() override { - TMemoryInput input(Buffer_.data(), Buffer_.size()); - TFileOutput outputFileStream(FilePath_); - TDoubleHighPrecisionYsonWriter writer(&outputFileStream, ::NYson::EYsonType::ListFragment); - NYson::TYsonParser parser(&writer, &input, ::NYson::EYsonType::ListFragment); - parser.Parse(); - Buffer_.Clear(); - } - - TString FilePath_; - TBuffer Buffer_; - }; - - -class TFileYtService: public NYql::NFmr::IYtService { -public: - - NYT::TRawTableReaderPtr MakeReader( - const TYtTableRef& ytTable, - const TClusterConnection& /*clusterConnection*/, - const TYtReaderSettings& /*readerSettings*/ - ) override { - YQL_ENSURE(ytTable.FilePath); - auto textYsonInputs = NFile::MakeTextYsonInputs({{*ytTable.FilePath, NFile::TColumnsInfo{}}}, false); - return textYsonInputs[0]; - } - - NYT::TRawTableWriterPtr MakeWriter( - const TYtTableRef& ytTable, - const TClusterConnection& /*clusterConnection*/, - const TYtWriterSettings& /*writerSettings*/ - ) override { - YQL_ENSURE(ytTable.FilePath); - return MakeIntrusive<TFileYtTableWriter>(*ytTable.FilePath); - } - -}; - -} // namespace - -IYtService::TPtr MakeFileYtSerivce() { - return MakeIntrusive<TFileYtService>(); -} - -} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h deleted file mode 100644 index ea5e8027b9a..00000000000 --- a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h +++ /dev/null @@ -1,7 +0,0 @@ -#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> - -namespace NYql::NFmr { - -IYtService::TPtr MakeFileYtSerivce(); - -} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/ya.make b/yt/yql/providers/yt/fmr/yt_service/impl/ya.make deleted file mode 100644 index 1b37156ed2f..00000000000 --- a/yt/yql/providers/yt/fmr/yt_service/impl/ya.make +++ /dev/null @@ -1,15 +0,0 @@ -LIBRARY() - -SRCS( - yql_yt_yt_service_impl.cpp -) - -PEERDIR( - yt/cpp/mapreduce/client - yt/cpp/mapreduce/common - yt/yql/providers/yt/fmr/yt_service/interface -) - -YQL_LAST_ABI_VERSION() - -END() diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h deleted file mode 100644 index bf1962cac5a..00000000000 --- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h +++ /dev/null @@ -1,7 +0,0 @@ -#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> - -namespace NYql::NFmr { - -IYtService::TPtr MakeFmrYtSerivce(); - -} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp deleted file mode 100644 index fbcafbc3a27..00000000000 --- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "yql_yt_yt_service.h" diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h b/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h deleted file mode 100644 index b10426c2f03..00000000000 --- a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h +++ /dev/null @@ -1,9 +0,0 @@ -#pragma once - -#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> - -namespace NYql::NFmr { - -IYtService::TPtr MakeMockYtService(const std::unordered_map<TYtTableRef, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables); - -} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h index a28606598d3..01c400d8527 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h @@ -1,4 +1,5 @@ #pragma once + #include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/cpp/mapreduce/interface/common.h> diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index ad261da0d62..6b2d145b24d 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -134,6 +134,7 @@ public: TString sessionId = options.SessionId(); auto config = options.Config(); TRunResult result; + YQL_ENSURE(fmrOperationResult.TablesStats.size() == outputTables.size()); for (size_t i = 0; i < outputTables.size(); ++i) { auto outputTable = outputTables[i]; TFmrTableId fmrOutputTableId = {outputTable.Cluster, outputTable.Path}; |