diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-04-15 21:10:57 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-04-15 21:44:05 +0300 |
commit | b628b47d09aeb527f1387fc20b7d188ed34db36c (patch) | |
tree | 2d9fe60cd51b42451dfee5d59d711eb8e0a40739 | |
parent | 99be0c2021948af5599a38f3f205529e3a87b4f4 (diff) | |
download | ydb-b628b47d09aeb527f1387fc20b7d188ed34db36c.tar.gz |
Intermediate changes
commit_hash:c11c6ef3a9265cf7558aebc0ade265d419966dd6
25 files changed, 383 insertions, 178 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson new file mode 100644 index 00000000000..203b3d1ce66 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson @@ -0,0 +1,26 @@ +{ + "merge" = { + "read_block_count" = 4; + "read_block_size" = 1048576; + "num_threads" = 3; + }; + "upload" = { + "read_block_count" = 4; + "read_block_size" = 1048576; + }; + "job_io" = { + "fmr_table_reader" = { + "inflight_chunks" = 1; + }; + "yt_table_reader" = { + }; + "fmr_table_writer" = { + "max_row_weight" = 16777216; + "chunk_size" = 1048576; + "inflight_chunks" = 1; + }; + "yt_table_writer" = { + "max_row_weight" = 16777216; + }; + }; +}
\ No newline at end of file diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp index c64bc88aaf2..0de9cb8fda0 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -234,8 +234,8 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::InProgress); } Y_UNIT_TEST(RetryRunningOperationAfterIdempotencyKeyClear) { - TFmrCoordinatorSettings coordinatorSettings{ - .WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2), .IdempotencyKeyStoreTime = TDuration::Seconds(1)}; + auto coordinatorSettings = TFmrCoordinatorSettings(); + coordinatorSettings.IdempotencyKeyStoreTime = TDuration::Seconds(1); auto coordinator = MakeFmrCoordinator(coordinatorSettings); TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make index 32ffb2ed14f..20abd2bbc84 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( library/cpp/random_provider + library/cpp/resource library/cpp/threading/future library/cpp/yson/node yt/cpp/mapreduce/common @@ -14,6 +15,10 @@ PEERDIR( yql/essentials/utils ) +RESOURCE( + default_coordinator_settings.yson default_coordinator_settings.yson +) + YQL_LAST_ABI_VERSION() END() diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index 625f034c3fd..ed772647314 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -1,4 +1,5 @@ #include <thread> +#include <library/cpp/resource/resource.h> #include <yt/cpp/mapreduce/common/helpers.h> #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/yql_panic.h> @@ -6,6 +7,14 @@ namespace NYql::NFmr { +TFmrCoordinatorSettings::TFmrCoordinatorSettings() { + DefaultFmrOperationSpec = NYT::NodeFromYsonString(NResource::Find("default_coordinator_settings.yson")); + WorkersNum = 1; + RandomProvider = CreateDefaultRandomProvider(), + IdempotencyKeyStoreTime = TDuration::Seconds(10); + TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1); +} + namespace { struct TCoordinatorTaskInfo { @@ -161,7 +170,6 @@ public: YQL_ENSURE(fmrTableId.PartId == curTableStats.PartId); if (taskStatus == ETaskStatus::Completed) { YQL_CLOG(DEBUG, FastMapReduce) << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats; - Cerr << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats; } } Operations_[operationId].OutputTableIds.emplace_back(fmrTableId.TableId); @@ -327,26 +335,14 @@ private: } } - TMaybe<NYT::TNode> GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) { - // TODO - check this works - TMaybe<NYT::TNode> defaultJobSettings = Nothing(), currentJobSettings = Nothing(); - if (currentFmrOperationSpec && currentFmrOperationSpec->HasKey("job_settings")) { - currentJobSettings = (*currentFmrOperationSpec)["job_settings"]; - } - if (DefaultFmrOperationSpec_ && DefaultFmrOperationSpec_->HasKey("job_settings")) { - defaultJobSettings = (*DefaultFmrOperationSpec_)["job_settings"]; - } - if (defaultJobSettings && !currentJobSettings) { - return defaultJobSettings; - } - if (currentJobSettings && !defaultJobSettings) { - return currentJobSettings; - } - if (!currentJobSettings && !defaultJobSettings) { - return Nothing(); + NYT::TNode GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) { + // For now fmr operation spec only consists of job settings + if (!currentFmrOperationSpec) { + return DefaultFmrOperationSpec_; } - NYT::MergeNodes(*currentJobSettings, *defaultJobSettings); - return currentJobSettings; + auto resultFmrOperationSpec = DefaultFmrOperationSpec_; + NYT::MergeNodes(resultFmrOperationSpec, *currentFmrOperationSpec); + return resultFmrOperationSpec; } std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it @@ -363,7 +359,7 @@ private: TDuration TimeToSleepBetweenClearKeyRequests_; TDuration IdempotencyKeyStoreTime_; std::unordered_map<TFmrTableId, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics - TMaybe<NYT::TNode> DefaultFmrOperationSpec_; + NYT::TNode DefaultFmrOperationSpec_; }; } // namespace diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h index 3fea218981f..563752dfc7b 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h @@ -1,7 +1,7 @@ #pragma once #include <library/cpp/random_provider/random_provider.h> -#include <library/cpp/yson/node/node.h> +#include <library/cpp/yson/node/node_io.h> #include <util/system/mutex.h> #include <util/system/guard.h> #include <util/generic/queue.h> @@ -10,13 +10,15 @@ namespace NYql::NFmr { struct TFmrCoordinatorSettings { - ui32 WorkersNum; // Not supported yet + NYT::TNode DefaultFmrOperationSpec; + ui32 WorkersNum; TIntrusivePtr<IRandomProvider> RandomProvider; TDuration IdempotencyKeyStoreTime = TDuration::Seconds(10); TDuration TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1); - TMaybe<NYT::TNode> DefaultFmrOperationSpec = Nothing(); + + TFmrCoordinatorSettings(); }; -IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)}); +IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = TFmrCoordinatorSettings()); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp index 3bc8e074a3a..1d6bc504a5e 100644 --- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp @@ -1,9 +1,17 @@ #include "yql_yt_fmr_initializer.h" +#include <util/stream/file.h> namespace NYql::NFmr { -std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker, const TString& coordinatorServerUrl, bool isFileGateway) { - auto coordinator = MakeFmrCoordinator(); +std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker, const TString& coordinatorServerUrl, bool isFileGateway, const TString& fmrOperationSpecFilePath) { + TFmrCoordinatorSettings coordinatorSettings{}; + if (!fmrOperationSpecFilePath.empty()) { + TFileInput input(fmrOperationSpecFilePath); + auto fmrOperationSpec = NYT::NodeFromYsonStream(&input); + coordinatorSettings.DefaultFmrOperationSpec = fmrOperationSpec; + } + + auto coordinator = MakeFmrCoordinator(coordinatorSettings); if (!coordinatorServerUrl.empty()) { TFmrCoordinatorClientSettings coordinatorClientSettings; THttpURL parsedUrl; diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h index a87522fbd71..24f17928e17 100644 --- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h @@ -14,6 +14,12 @@ namespace NYql::NFmr { constexpr TStringBuf FastMapReduceGatewayName = "fmr"; -std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker = false, const TString& coordinatorServerUrl = TString(), bool isFileGateway = false); +std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway( + IYtGateway::TPtr slave, + bool disableLocalFmrWorker = false, + const TString& coordinatorServerUrl = TString(), + bool isFileGateway = false, + const TString& fmrOperationSpecFilePath = TString() +); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index 11b30362ee7..6b9ce068652 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -41,13 +41,13 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { std::unordered_map<TYtTableRef, TString> outputTables; NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService); TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); TDownloadTaskParams params = TDownloadTaskParams(input, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - auto res = job->Download(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); + auto res = job->Download(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}, cancelFlag); auto err = std::get_if<TError>(&res); auto statistics = std::get_if<TStatistics>(&res); @@ -65,7 +65,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService); TYtTableRef output = TYtTableRef("test_cluster", "test_path"); std::vector<TTableRange> ranges = {{"test_part_id"}}; @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1)); - auto res = job->Upload(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); + auto res = job->Upload(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}, cancelFlag); auto err = std::get_if<TError>(&res); @@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { std::unordered_map<TYtTableRef, TString> outputTables; NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); auto cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService); TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; @@ -110,14 +110,20 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1)); tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3)); - auto res = job->Merge(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); + auto res = job->Merge(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}, cancelFlag); auto err = std::get_if<TError>(&res); UNIT_ASSERT_C(!err, err->ErrorMessage); auto resultTableContentMaybe = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); UNIT_ASSERT_C(resultTableContentMaybe, "Result table content is empty"); TString resultTableContent = GetTextYson(*resultTableContentMaybe); - UNIT_ASSERT_NO_DIFF(resultTableContent, TableContent_1 + TableContent_2 + TableContent_3); + TString expected = TableContent_1 + TableContent_2 + TableContent_3; + UNIT_ASSERT_VALUES_EQUAL(resultTableContent.size(), expected.size()); + TString line; + TStringStream resultStream(resultTableContent); + while (resultStream.ReadLine(line)) { + UNIT_ASSERT(expected.Contains(line)); + } } } @@ -178,11 +184,11 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); // No tables in tableDataService - try { - RunJob(task, tableDataServicePtr, ytService, cancelFlag); - } catch(...) { - UNIT_ASSERT(CurrentExceptionMessage().Contains("No data for chunk:test_table_id:test_part_id")); - } + UNIT_ASSERT_EXCEPTION_CONTAINS( + RunJob(task, tableDataServicePtr, ytService, cancelFlag), + yexception, + "No data for chunk:test_table_id:test_part_id" + ); } Y_UNIT_TEST(RunMergeTask) { @@ -214,9 +220,17 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); - auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); - UNIT_ASSERT_C(resultTableContent, "Result table content is empty"); - UNIT_ASSERT_NO_DIFF(GetTextYson(*resultTableContent), TableContent_1 + TableContent_2 + TableContent_3); + auto resultTableContentMaybe = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); + UNIT_ASSERT_C(resultTableContentMaybe, "Result table content is empty"); + + TString resultTableContent = GetTextYson(*resultTableContentMaybe); + TString expected = TableContent_1 + TableContent_2 + TableContent_3; + UNIT_ASSERT_VALUES_EQUAL(resultTableContent.size(), expected.size()); + TString line; + TStringStream resultStream(resultTableContent); + while (resultStream.ReadLine(line)) { + UNIT_ASSERT(expected.Contains(line)); + } } } diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp index 8e4522db03d..3d609b888f6 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp @@ -10,19 +10,19 @@ TString originalTableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"ab "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; -Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { +Y_UNIT_TEST_SUITE(FmrReaderTests) { Y_UNIT_TEST(ReadOneChunkSmallPart) { size_t chunkSize = 1024; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - TFmrTableDataServiceWriterSettings settings{chunkSize}; + TFmrWriterSettings settings{chunkSize}; TFmrTableDataServiceWriter outputWriter("tableId", "partId", tableDataServicePtr, settings); outputWriter.Write(originalTableContent.data(), originalTableContent.size()); outputWriter.Flush(); - TFmrTableDataServiceReaderSettings readerSettings{1}; + TFmrReaderSettings readerSettings{1}; std::vector<TTableRange> tableRanges = {{"partId", 0, 1}}; TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); @@ -38,13 +38,13 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - TFmrTableDataServiceWriterSettings settings{chunkSize}; + TFmrWriterSettings settings{.ChunkSize= chunkSize, .MaxInflightChunks = 2}; TFmrTableDataServiceWriter outputStream("tableId", "partId", tableDataServicePtr, settings); outputStream.Write(originalTableContent.data(), originalTableContent.size()); outputStream.Flush(); - TFmrTableDataServiceReaderSettings readerSettings{1}; + TFmrReaderSettings readerSettings{1}; std::vector<TTableRange> tableRanges = {{"partId", 0, 1}}; TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); @@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { size_t chunkSize = 32; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - TFmrTableDataServiceWriterSettings settings{chunkSize}; + TFmrWriterSettings settings{.ChunkSize= chunkSize, .MaxInflightChunks = 2}; TFmrTableDataServiceWriter outputStream("tableId", "partId", tableDataServicePtr, settings); for (size_t i = 0; i < 3; ++i) { @@ -65,7 +65,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { } outputStream.Flush(); - TFmrTableDataServiceReaderSettings readerSettings{1}; + TFmrReaderSettings readerSettings{1}; std::vector<TTableRange> tableRanges = {{"partId", 0, 3}}; TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); @@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { size_t chunkSize = 32; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - TFmrTableDataServiceWriterSettings settings{chunkSize}; + TFmrWriterSettings settings{chunkSize}; TFmrTableDataServiceWriter outputStream("tableId", "partId", tableDataServicePtr, settings); for (size_t i = 0; i < 3; ++i) { @@ -86,7 +86,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { } outputStream.Flush(); - TFmrTableDataServiceReaderSettings readerSettings{5}; + TFmrReaderSettings readerSettings{5}; std::vector<TTableRange> tableRanges = {{"partId", 0, 3}}; TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp index 0add0e72cbb..925cdfff125 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp @@ -5,46 +5,70 @@ namespace NYql::NFmr { -Y_UNIT_TEST_SUITE(OutputStreamTests) { +const std::vector<TString> TableYsonRows = { + "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};", + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};", + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};", + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};" +}; + +TTableStats WriteDataToTableDataSerice( + ITableDataService::TPtr tableDataService, + const std::vector<TString>& tableYsonRows, + ui64 chunkSize, + TMaybe<ui64> maxRowWeight = Nothing() +) { + TFmrWriterSettings settings{.ChunkSize = chunkSize}; + if (maxRowWeight) { + settings.MaxRowWeight = *maxRowWeight; + } + TFmrTableDataServiceWriter outputWriter("tableId", "partId", tableDataService, settings); + + for (auto& row: tableYsonRows) { + outputWriter.Write(row.data(), row.size()); + outputWriter.NotifyRowEnd(); + } + outputWriter.Flush(); + return outputWriter.GetStats(); +} + +Y_UNIT_TEST_SUITE(FmrWriterTests) { Y_UNIT_TEST(WriteYsonRows) { - std::vector<TString> tableYsonRows = { - "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};", - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};", - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};", - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};" - }; ui64 totalSize = 0; - for (auto& row: tableYsonRows) { + for (auto& row: TableYsonRows) { totalSize += row.size(); } ui64 chunkSize = totalSize / 2; - ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - TFmrTableDataServiceWriterSettings settings{.ChunkSize = chunkSize}; - TFmrTableDataServiceWriter outputWriter("tableId", "partId", tableDataServicePtr, settings); - - for (auto& row: tableYsonRows) { - outputWriter.Write(row.data(), row.size()); - outputWriter.NotifyRowEnd(); - } - outputWriter.Flush(); - - auto realChunks = outputWriter.GetStats().Chunks; - auto realDataWeight = outputWriter.GetStats().DataWeight; + ITableDataService::TPtr tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + auto stats = WriteDataToTableDataSerice(tableDataService, TableYsonRows, chunkSize); + auto realChunks = stats.Chunks; + auto realDataWeight =stats.DataWeight; UNIT_ASSERT_VALUES_EQUAL(realChunks, 2); UNIT_ASSERT_VALUES_EQUAL(realDataWeight, totalSize); - TString expectedFirstChunkTableContent = JoinRange(TStringBuf(), tableYsonRows.begin(), tableYsonRows.begin() + 2); - TString expectedSecondChunkTableContent = JoinRange(TStringBuf(), tableYsonRows.begin() + 2, tableYsonRows.end()); + TString expectedFirstChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin(), TableYsonRows.begin() + 2); + TString expectedSecondChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin() + 2, TableYsonRows.end()); auto firstChunkTableKey = GetTableDataServiceKey("tableId", "partId", 0); - auto firstChunkTableContent = tableDataServicePtr->Get(firstChunkTableKey).GetValueSync(); + auto firstChunkTableContent = tableDataService->Get(firstChunkTableKey).GetValueSync(); auto secondChunkTableKey = GetTableDataServiceKey("tableId", "partId", 1); - auto secondChunkTableContent = tableDataServicePtr->Get(secondChunkTableKey).GetValueSync(); + auto secondChunkTableContent = tableDataService->Get(secondChunkTableKey).GetValueSync(); UNIT_ASSERT_NO_DIFF(*firstChunkTableContent, expectedFirstChunkTableContent); UNIT_ASSERT_NO_DIFF(*secondChunkTableContent, expectedSecondChunkTableContent); } + Y_UNIT_TEST(RecordIsLargerThanMaxRowWeight) { + ui64 chunkSize = 1, maxRowWeight = 3; + auto rowSize = TableYsonRows[0].size(); + ITableDataService::TPtr tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + TString expectedErrorMessage = TStringBuilder() << rowSize << " is larger than max row weight: " << maxRowWeight; + UNIT_ASSERT_EXCEPTION_CONTAINS( + WriteDataToTableDataSerice(tableDataService, TableYsonRows, chunkSize, maxRowWeight), + yexception, + expectedErrorMessage + ); + } } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make index fbd3c8f4744..02b5058984a 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ya.make @@ -8,6 +8,7 @@ SRCS( PEERDIR( library/cpp/threading/future + library/cpp/yson/node yt/cpp/mapreduce/interface yt/yql/providers/yt/fmr/job/interface yt/yql/providers/yt/fmr/request_options diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index 358c4215970..66067923131 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -1,4 +1,5 @@ #include <library/cpp/threading/future/core/future.h> +#include <library/cpp/yson/node/node_io.h> #include <util/stream/file.h> @@ -16,12 +17,16 @@ namespace NYql::NFmr { class TFmrJob: public IFmrJob { public: - TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings) - : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings) + TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings) + : TableDataService_(tableDataService), YtService_(ytService), Settings_(settings) { } - virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override { + virtual std::variant<TError, TStatistics> Download( + const TDownloadTaskParams& params, + const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, + std::shared_ptr<std::atomic<bool>> cancelFlag + ) override { try { const auto ytTable = params.Input; const auto cluster = params.Input.Cluster; @@ -32,13 +37,13 @@ public: YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path; YQL_ENSURE(clusterConnections.size() == 1); - auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second); // TODO - pass YtReader settings from Gateway - auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, GetFmrTableDataServiceWriterSettings()); + auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second, Settings_.YtReaderSettings); + auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(tableId, partId, TableDataService_, Settings_.FmrWriterSettings); - ParseRecords(*ytTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); - tableDataServiceWriter.Flush(); + ParseRecords(ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.DonwloadReadBlockCount, Settings_.ParseRecordSettings.DonwloadReadBlockSize, cancelFlag); + tableDataServiceWriter->Flush(); - TTableStats stats = tableDataServiceWriter.GetStats(); + TTableStats stats = tableDataServiceWriter->GetStats(); auto statistics = TStatistics({{output, stats}}); return statistics; } catch (...) { @@ -46,7 +51,11 @@ public: } } - virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override { + virtual std::variant<TError, TStatistics> Upload( + const TUploadTaskParams& params, + const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, + std::shared_ptr<std::atomic<bool>> cancelFlag + ) override { try { const auto ytTable = params.Output; const auto cluster = params.Output.Cluster; @@ -56,10 +65,10 @@ public: YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path; - auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings()); + auto tableDataServiceReader = MakeIntrusive<TFmrTableDataServiceReader>(tableId, tableRanges, TableDataService_, Settings_.FmrReaderSettings); YQL_ENSURE(clusterConnections.size() == 1); - auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second); - ParseRecords(tableDataServiceReader, *ytTableWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); + auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second, Settings_.YtWriterSettings); + ParseRecords(tableDataServiceReader, ytTableWriter, Settings_.ParseRecordSettings.UploadReadBlockCount, Settings_.ParseRecordSettings.UploadReadBlockSize, cancelFlag); ytTableWriter->Flush(); return TStatistics(); @@ -68,28 +77,34 @@ public: } } - virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override { - // расширить таск парамс. добавить туда мету + virtual std::variant<TError, TStatistics> Merge( + const TMergeTaskParams& params, + const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, + std::shared_ptr<std::atomic<bool>> cancelFlag + ) override { try { const auto inputs = params.Input; const auto output = params.Output; YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; + auto& parseRecordSettings = Settings_.ParseRecordSettings; - auto tableDataServiceWriter = TFmrTableDataServiceWriter(output.TableId, output.PartId, TableDataService_, GetFmrTableDataServiceWriterSettings()); + auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(output.TableId, output.PartId, TableDataService_, Settings_.FmrWriterSettings); + auto threadPool = CreateThreadPool(parseRecordSettings.MergeNumThreads); + TMaybe<TMutex> mutex = TMutex(); for (const auto& inputTableRef : inputs) { - if (CancelFlag_->load()) { - return TError("Canceled"); - } auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnections); - ParseRecords(*inputTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); + threadPool->SafeAddFunc([&, inputTableReader] { + ParseRecords(inputTableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex); + }); } - tableDataServiceWriter.Flush(); - return TStatistics({{output, tableDataServiceWriter.GetStats()}}); + threadPool->Stop(); + + tableDataServiceWriter->Flush(); + return TStatistics({{output, tableDataServiceWriter->GetStats()}}); } catch (...) { return TError(CurrentExceptionMessage()); } - return TError{"not implemented yet"}; } private: @@ -99,61 +114,46 @@ private: if (ytTable) { TFmrTableId tableId = {ytTable->Cluster, ytTable->Path}; auto clusterConnection = clusterConnections.at(tableId); - return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway + return YtService_->MakeReader(*ytTable, clusterConnection, Settings_.YtReaderSettings); } else if (fmrTable) { - return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings()); + return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings); } else { ythrow yexception() << "Unsupported table type"; } } - TParseRecordSettings GetParseRecordSettings() const { - return Settings_ ? Settings_->ParseRecordSettings : TParseRecordSettings(); - } - - TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() const { - return Settings_ ? Settings_->FmrTableDataServiceReaderSettings : TFmrTableDataServiceReaderSettings(); - } - - TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() const { - return Settings_ ? Settings_->FmrTableDataServiceWriterSettings : TFmrTableDataServiceWriterSettings(); - } - private: ITableDataService::TPtr TableDataService_; IYtService::TPtr YtService_; - std::shared_ptr<std::atomic<bool>> CancelFlag_; - TMaybe<TFmrJobSettings> Settings_; + TFmrJobSettings Settings_; }; IFmrJob::TPtr MakeFmrJob( ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, - std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings ) { - return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag, settings); + return MakeIntrusive<TFmrJob>(tableDataService, ytService, settings); } TJobResult RunJob( TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, - std::shared_ptr<std::atomic<bool>> cancelFlag, - const TMaybe<TFmrJobSettings>& settings + std::shared_ptr<std::atomic<bool>> cancelFlag ) { - TFmrJobSettings jobSettings = settings ? *settings : GetJobSettingsFromTask(task); - IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, jobSettings); + TFmrJobSettings jobSettings = GetJobSettingsFromTask(task); + IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, jobSettings); - auto processTask = [job, task] (auto&& taskParams) { + auto processTask = [job, task, cancelFlag] (auto&& taskParams) { using T = std::decay_t<decltype(taskParams)>; if constexpr (std::is_same_v<T, TUploadTaskParams>) { - return job->Upload(taskParams, task->ClusterConnections); + return job->Upload(taskParams, task->ClusterConnections, cancelFlag); } else if constexpr (std::is_same_v<T, TDownloadTaskParams>) { - return job->Download(taskParams, task->ClusterConnections); + return job->Download(taskParams, task->ClusterConnections, cancelFlag); } else if constexpr (std::is_same_v<T, TMergeTaskParams>) { - return job->Merge(taskParams, task->ClusterConnections); + return job->Merge(taskParams, task->ClusterConnections, cancelFlag); } else { throw std::runtime_error{"Unsupported task type"}; } @@ -176,28 +176,26 @@ TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) { auto jobSettings = *task->JobSettings; YQL_ENSURE(jobSettings.IsMap()); TFmrJobSettings resultSettings{}; - if (jobSettings.HasKey("parse_record_settings")) { - auto& parseRecordSettings = jobSettings["parse_record_settings"]; - if (parseRecordSettings.HasKey("block_count")) { - resultSettings.ParseRecordSettings.BlockCount = parseRecordSettings["block_count"].AsInt64(); - } - if (parseRecordSettings.HasKey("block_size")) { - resultSettings.ParseRecordSettings.BlockSize = parseRecordSettings["block_size"].AsInt64(); - // TODO - support different formats (B, MB, ...) - } - } - if (jobSettings.HasKey("fmr_reader_settings")) { - auto& fmrReaderSettings = jobSettings["fmr_reader_settings"]; - if (fmrReaderSettings.HasKey("read_ahead_chunks")) { - resultSettings.FmrTableDataServiceReaderSettings.ReadAheadChunks = fmrReaderSettings["read_ahead_chunks"].AsInt64(); - } - } - if (jobSettings.HasKey("fmr_writer_settings")) { - auto& fmrWriterSettings = jobSettings["fmr_writer_settings"]; - if (fmrWriterSettings.HasKey("chunk_size")) { - resultSettings.FmrTableDataServiceWriterSettings.ChunkSize = fmrWriterSettings["chunk_size"].AsInt64(); - } - } + + auto& parseRecordSettings = resultSettings.ParseRecordSettings; + parseRecordSettings.MergeReadBlockCount = jobSettings["merge"]["read_block_count"].AsInt64(); + parseRecordSettings.MergeReadBlockSize = jobSettings["merge"]["read_block_size"].AsInt64(); + parseRecordSettings.MergeNumThreads = jobSettings["merge"]["num_threads"].AsInt64(); + + parseRecordSettings.UploadReadBlockCount = jobSettings["upload"]["read_block_count"].AsInt64(); + parseRecordSettings.UploadReadBlockSize = jobSettings["upload"]["read_block_size"].AsInt64(); + + auto& jobIoSettings = jobSettings["job_io"]; + resultSettings.FmrReaderSettings.ReadAheadChunks = jobIoSettings["fmr_table_reader"]["inflight_chunks"].AsInt64(); + + auto& fmrWriterSettings = resultSettings.FmrWriterSettings; + fmrWriterSettings.MaxInflightChunks = jobIoSettings["fmr_table_writer"]["inflight_chunks"].AsInt64(); + fmrWriterSettings.ChunkSize = jobIoSettings["fmr_table_writer"]["chunk_size"].AsInt64(); + fmrWriterSettings.MaxRowWeight = jobIoSettings["fmr_table_writer"]["max_row_weight"].AsInt64(); + + resultSettings.YtWriterSettings.MaxRowWeight = jobIoSettings["yt_table_writer"]["max_row_weight"].AsInt64(); + + // TODO - maybe pass other optional settings here. return resultSettings; } diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h index cb21e95f9c9..09b17090427 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -1,3 +1,5 @@ +#pragma once + #include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h> @@ -8,19 +10,27 @@ namespace NYql::NFmr { struct TParseRecordSettings { - ui64 BlockCount = 1; - ui64 BlockSize = 1024 * 1024; + ui64 MergeReadBlockCount = 1; + ui64 MergeReadBlockSize = 1024 * 1024; + ui64 MergeNumThreads = 3; + ui64 UploadReadBlockCount = 1; + ui64 UploadReadBlockSize = 1024 * 1024; + ui64 DonwloadReadBlockCount = 1; + ui64 DonwloadReadBlockSize = 1024 * 1024; // TODO - remove download }; struct TFmrJobSettings { TParseRecordSettings ParseRecordSettings = TParseRecordSettings(); - TFmrTableDataServiceReaderSettings FmrTableDataServiceReaderSettings = TFmrTableDataServiceReaderSettings(); - TFmrTableDataServiceWriterSettings FmrTableDataServiceWriterSettings = TFmrTableDataServiceWriterSettings(); + TFmrReaderSettings FmrReaderSettings = TFmrReaderSettings(); + TFmrWriterSettings FmrWriterSettings = TFmrWriterSettings(); + TYtReaderSettings YtReaderSettings = TYtReaderSettings(); + TYtWriterSettings YtWriterSettings = TYtWriterSettings(); + ui64 NumThreads = 0; }; -IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {}); +IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings = {}); -TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings = Nothing()); +TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task); diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp index 1223debe79a..9b2caf44aa4 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp @@ -9,7 +9,7 @@ TFmrTableDataServiceReader::TFmrTableDataServiceReader( const TString& tableId, const std::vector<TTableRange>& tableRanges, ITableDataService::TPtr tableDataService, - const TFmrTableDataServiceReaderSettings& settings + const TFmrReaderSettings& settings ) : TableId_(tableId), TableRanges_(tableRanges), diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h index c64b7b20745..d56bb0eb0d7 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h @@ -9,7 +9,7 @@ namespace NYql::NFmr { -struct TFmrTableDataServiceReaderSettings { +struct TFmrReaderSettings { ui64 ReadAheadChunks = 1; }; @@ -24,7 +24,7 @@ public: const TString& tableId, const std::vector<TTableRange>& tableRanges, ITableDataService::TPtr tableDataService, - const TFmrTableDataServiceReaderSettings& settings = TFmrTableDataServiceReaderSettings{} + const TFmrReaderSettings& settings = TFmrReaderSettings{} ); bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override; diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp index 7b1ffe70856..139e57368cf 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp @@ -1,5 +1,8 @@ #include "yql_yt_table_data_service_writer.h" +#include <library/cpp/threading/future/wait/wait.h> #include <util/string/join.h> +#include <yql/essentials/utils/yql_panic.h> + namespace NYql::NFmr { @@ -7,25 +10,71 @@ TFmrTableDataServiceWriter::TFmrTableDataServiceWriter( const TString& tableId, const TString& partId, ITableDataService::TPtr tableDataService, - const TFmrTableDataServiceWriterSettings& settings + const TFmrWriterSettings& settings ) - : TableId_(tableId), PartId_(partId), TableDataService_(tableDataService), ChunkSize_(settings.ChunkSize) {} + : TableId_(tableId), + PartId_(partId), + TableDataService_(tableDataService), + ChunkSize_(settings.ChunkSize), + MaxInflightChunks_(settings.MaxInflightChunks), + MaxRowWeight_(settings.MaxRowWeight) +{ + YQL_ENSURE(MaxRowWeight_ >= ChunkSize_); +} void TFmrTableDataServiceWriter::DoWrite(const void* buf, size_t len) { TableContent_.Append(static_cast<const char*>(buf), len); } void TFmrTableDataServiceWriter::NotifyRowEnd() { - Rows_ += 1; + ++Rows_; + if (TableContent_.size() >= MaxRowWeight_) { + ythrow yexception() << "Current row size: " << TableContent_.size() << " is larger than max row weight: " << MaxRowWeight_; + } if (TableContent_.size() >= ChunkSize_) { - DoFlush(); + PutRows(); } } void TFmrTableDataServiceWriter::DoFlush() { + PutRows(); + with_lock(State_->Mutex) { + if (State_->Exception) { + std::rethrow_exception(State_->Exception); + } + State_->CondVar.Wait(State_->Mutex, [this] { + return State_->CurInflightChunks == 0; + }); + } +} + +void TFmrTableDataServiceWriter::PutRows() { + with_lock(State_->Mutex) { + State_->CondVar.Wait(State_->Mutex, [&] { + return State_->CurInflightChunks < MaxInflightChunks_; + }); + ++State_->CurInflightChunks; + } TString chunkKey = GetTableDataServiceKey(TableId_, PartId_, ChunkCount_); - TableDataService_->Put(chunkKey, TString(TableContent_.Data(), TableContent_.Size())).Wait(); - ChunkCount_++; + TableDataService_->Put(chunkKey, TString(TableContent_.Data(), TableContent_.Size())).Subscribe( + [weakState = std::weak_ptr(State_)] (const auto& putFuture) mutable { + std::shared_ptr<TFmrWriterState> state = weakState.lock(); + if (state) { + with_lock(state->Mutex) { + --state->CurInflightChunks; + try { + putFuture.GetValue(); + } catch (...) { + if (!state->Exception) { + state->Exception = std::current_exception(); + } + } + state->CondVar.Signal(); + } + } + } + ); + ++ChunkCount_; DataWeight_ += TableContent_.Size(); TableContent_.Clear(); } diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h index 2e27cd4f072..544b9fc3bd1 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h @@ -2,6 +2,7 @@ #include <util/generic/buffer.h> #include <util/stream/output.h> +#include <util/system/condvar.h> #include <yt/cpp/mapreduce/interface/io.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> #include <yt/yql/providers/yt/fmr/table_data_service/interface/yql_yt_table_data_service.h> @@ -9,8 +10,12 @@ namespace NYql::NFmr { -struct TFmrTableDataServiceWriterSettings { + + +struct TFmrWriterSettings { ui64 ChunkSize = 1024 * 1024; + ui64 MaxInflightChunks = 1; + ui64 MaxRowWeight = 1024 * 1024 * 16; }; class TFmrTableDataServiceWriter: public NYT::TRawTableWriter { @@ -19,7 +24,7 @@ public: const TString& tableId, const TString& partId, ITableDataService::TPtr tableDataService, - const TFmrTableDataServiceWriterSettings& settings = TFmrTableDataServiceWriterSettings() + const TFmrWriterSettings& settings = TFmrWriterSettings() ); TTableStats GetStats(); @@ -32,6 +37,9 @@ protected: void DoFlush() override; private: + void PutRows(); + +private: const TString TableId_; const TString PartId_; ITableDataService::TPtr TableDataService_; @@ -40,7 +48,18 @@ private: TBuffer TableContent_; const ui64 ChunkSize_; // size at which we push to table data service + const ui64 MaxInflightChunks_; + const ui64 MaxRowWeight_; + ui64 ChunkCount_ = 0; + + struct TFmrWriterState { + ui64 CurInflightChunks = 0; + TMutex Mutex = TMutex(); + TCondVar CondVar; + std::exception_ptr Exception; + }; + std::shared_ptr<TFmrWriterState> State_ = std::make_shared<TFmrWriterState>(); }; } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h index 18d9553ba01..e6a8ab6ed88 100644 --- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h +++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h @@ -10,11 +10,11 @@ public: virtual ~IFmrJob() = default; - virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0; + virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0; - virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0; + virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0; - virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0; + virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0; }; } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp index 85e9599f8f8..a0736aa96f3 100644 --- a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp +++ b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp @@ -19,7 +19,8 @@ Y_UNIT_TEST_SUITE(ParseRecordTests) { auto reader = ytService->MakeReader(testYtTable, TClusterConnection()); auto writer = ytService->MakeWriter(testYtTable, TClusterConnection()); - ParseRecords(*reader, *writer, 1, 10); + auto cancelFlag = std::make_shared<std::atomic<bool>>(false); + ParseRecords(reader, writer, 1, 10, cancelFlag); writer->Flush(); UNIT_ASSERT_VALUES_EQUAL(outputTables.size(), 1); UNIT_ASSERT(outputTables.contains(testYtTable)); diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp index 78878f18ea7..c744f7f5ae7 100644 --- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp @@ -1,15 +1,33 @@ #include "yql_yt_parse_records.h" +#include <yql/essentials/utils/log/log.h> using namespace NYql; namespace NYql::NFmr { -void ParseRecords(NYT::TRawTableReader& reader, NYT::TRawTableWriter& writer, ui64 blockCount, ui64 blockSize) { - auto blockReader = MakeBlockReader(reader, blockCount, blockSize); +void CheckCancelled(std::shared_ptr<std::atomic<bool>> cancelFlag) { + if (cancelFlag->load()) { + ythrow yexception() << " Job was cancelled, aborting"; + } +} + +void ParseRecords( + NYT::TRawTableReaderPtr reader, + NYT::TRawTableWriterPtr writer, + ui64 blockCount, + ui64 blockSize, + std::shared_ptr<std::atomic<bool>> cancelFlag, + const TMaybe<TMutex>& writeMutex +) { + auto blockReader = MakeBlockReader(*reader, blockCount, blockSize); NCommon::TInputBuf inputBuf(*blockReader, nullptr); TVector<char> curYsonRow; char cmd; - while (inputBuf.TryRead(cmd)) { + while (true) { + CheckCancelled(cancelFlag); + if (!inputBuf.TryRead(cmd)) { + break; + } curYsonRow.clear(); CopyYson(cmd, inputBuf, curYsonRow); bool needBreak = false; @@ -19,8 +37,16 @@ void ParseRecords(NYT::TRawTableReader& reader, NYT::TRawTableWriter& writer, ui YQL_ENSURE(cmd == ';'); curYsonRow.emplace_back(cmd); } - writer.Write(curYsonRow.data(), curYsonRow.size()); - writer.NotifyRowEnd(); + CheckCancelled(cancelFlag); + if (writeMutex) { + with_lock(*writeMutex) { + writer->Write(curYsonRow.data(), curYsonRow.size()); + writer->NotifyRowEnd(); + } + } else { + writer->Write(curYsonRow.data(), curYsonRow.size()); + writer->NotifyRowEnd(); + } if (needBreak) { break; } diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h index 9d00d2c9e15..79d81dfa72d 100644 --- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h @@ -1,9 +1,18 @@ #include <yt/cpp/mapreduce/interface/io.h> #include <yt/yql/providers/yt/codec/yt_codec_io.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> namespace NYql::NFmr { -void ParseRecords(NYT::TRawTableReader& reader, NYT::TRawTableWriter& writer, ui64 blockCount, ui64 blockSize); +void CheckCancelled(std::shared_ptr<std::atomic<bool>> cancelFlag); + +void ParseRecords( + NYT::TRawTableReaderPtr reader, + NYT::TRawTableWriterPtr writer, + ui64 blockCount, + ui64 blockSize, + std::shared_ptr<std::atomic<bool>> cancelFlag, + const TMaybe<TMutex>& writeMutex = Nothing()); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp index d618766edeb..4634a76031b 100644 --- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp +++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp @@ -28,13 +28,17 @@ public: NYT::TRawTableWriterPtr MakeWriter( const TYtTableRef& ytTable, const TClusterConnection& clusterConnection, - const TYtWriterSettings& /*writerSetttings*/ + const TYtWriterSettings& writerSetttings ) override { auto client = CreateClient(clusterConnection); auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId)); TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//"); auto richPath = NYT::TRichYPath(ytPath).Append(true); - return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary()); // TODO - support writerOptions + auto writerOptions = NYT::TTableWriterOptions(); + if (writerSetttings.MaxRowWeight) { + writerOptions.Config(NYT::TNode()("max_row_weight", *writerSetttings.MaxRowWeight)); + } + return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary(), writerOptions); } private: diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h index 7ed2a47fc00..77204493e70 100644 --- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h +++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h @@ -11,6 +11,7 @@ struct TYtReaderSettings { }; struct TYtWriterSettings { + TMaybe<ui64> MaxRowWeight = Nothing(); }; class IYtService: public TThrRefBase { diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp index 260c8b69647..25bda1b40fc 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -119,6 +119,11 @@ TYtRunTool::TYtRunTool(TString name) .Optional() .NoArgument() .SetFlag(&DisableLocalFmrWorker_); + + opts.AddLongOption( "fmr-operation-spec-path", "Path to file with fmr operation spec settings") + .Optional() + .StoreResult(&FmrOperationSpecFilePath_); + }); GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) { @@ -186,7 +191,7 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() { return ytGateway; } - auto [fmrGateway, worker] = NFmr::InitializeFmrGateway(ytGateway, DisableLocalFmrWorker_, FmrCoordinatorServerUrl_); + auto [fmrGateway, worker] = NFmr::InitializeFmrGateway(ytGateway, DisableLocalFmrWorker_, FmrCoordinatorServerUrl_, false, FmrOperationSpecFilePath_); FmrWorker_ = std::move(worker); return fmrGateway; } diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h index 9fa4ee3df5a..b90fddcb6ba 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.h +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h @@ -34,6 +34,7 @@ protected: NFmr::IFmrWorker::TPtr FmrWorker_; TString FmrCoordinatorServerUrl_; bool DisableLocalFmrWorker_ = false; + TString FmrOperationSpecFilePath_; }; } // NYql |