aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-04-15 21:10:57 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-04-15 21:44:05 +0300
commitb628b47d09aeb527f1387fc20b7d188ed34db36c (patch)
tree2d9fe60cd51b42451dfee5d59d711eb8e0a40739
parent99be0c2021948af5599a38f3f205529e3a87b4f4 (diff)
downloadydb-b628b47d09aeb527f1387fc20b7d188ed34db36c.tar.gz
Intermediate changes
commit_hash:c11c6ef3a9265cf7558aebc0ade265d419966dd6
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson26
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ya.make5
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp38
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h10
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp12
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h8
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp44
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp18
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp72
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp138
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h22
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h4
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp61
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h23
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h6
-rw-r--r--yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp3
-rw-r--r--yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp36
-rw-r--r--yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h11
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp8
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h1
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp7
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.h1
25 files changed, 383 insertions, 178 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson
new file mode 100644
index 00000000000..203b3d1ce66
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson
@@ -0,0 +1,26 @@
+{
+ "merge" = {
+ "read_block_count" = 4;
+ "read_block_size" = 1048576;
+ "num_threads" = 3;
+ };
+ "upload" = {
+ "read_block_count" = 4;
+ "read_block_size" = 1048576;
+ };
+ "job_io" = {
+ "fmr_table_reader" = {
+ "inflight_chunks" = 1;
+ };
+ "yt_table_reader" = {
+ };
+ "fmr_table_writer" = {
+ "max_row_weight" = 16777216;
+ "chunk_size" = 1048576;
+ "inflight_chunks" = 1;
+ };
+ "yt_table_writer" = {
+ "max_row_weight" = 16777216;
+ };
+ };
+} \ No newline at end of file
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
index c64bc88aaf2..0de9cb8fda0 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -234,8 +234,8 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::InProgress);
}
Y_UNIT_TEST(RetryRunningOperationAfterIdempotencyKeyClear) {
- TFmrCoordinatorSettings coordinatorSettings{
- .WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2), .IdempotencyKeyStoreTime = TDuration::Seconds(1)};
+ auto coordinatorSettings = TFmrCoordinatorSettings();
+ coordinatorSettings.IdempotencyKeyStoreTime = TDuration::Seconds(1);
auto coordinator = MakeFmrCoordinator(coordinatorSettings);
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
index 32ffb2ed14f..20abd2bbc84 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
@@ -6,6 +6,7 @@ SRCS(
PEERDIR(
library/cpp/random_provider
+ library/cpp/resource
library/cpp/threading/future
library/cpp/yson/node
yt/cpp/mapreduce/common
@@ -14,6 +15,10 @@ PEERDIR(
yql/essentials/utils
)
+RESOURCE(
+ default_coordinator_settings.yson default_coordinator_settings.yson
+)
+
YQL_LAST_ABI_VERSION()
END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index 625f034c3fd..ed772647314 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -1,4 +1,5 @@
#include <thread>
+#include <library/cpp/resource/resource.h>
#include <yt/cpp/mapreduce/common/helpers.h>
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/yql_panic.h>
@@ -6,6 +7,14 @@
namespace NYql::NFmr {
+// Default settings: the default operation spec is parsed from the
+// "default_coordinator_settings.yson" resource, every other field gets a fixed value.
+TFmrCoordinatorSettings::TFmrCoordinatorSettings() {
+ DefaultFmrOperationSpec = NYT::NodeFromYsonString(NResource::Find("default_coordinator_settings.yson"));
+ WorkersNum = 1;
+ RandomProvider = CreateDefaultRandomProvider();
+ IdempotencyKeyStoreTime = TDuration::Seconds(10);
+ TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1);
+}
+
namespace {
struct TCoordinatorTaskInfo {
@@ -161,7 +170,6 @@ public:
YQL_ENSURE(fmrTableId.PartId == curTableStats.PartId);
if (taskStatus == ETaskStatus::Completed) {
YQL_CLOG(DEBUG, FastMapReduce) << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats;
- Cerr << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats;
}
}
Operations_[operationId].OutputTableIds.emplace_back(fmrTableId.TableId);
@@ -327,26 +335,14 @@ private:
}
}
- TMaybe<NYT::TNode> GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) {
- // TODO - check this works
- TMaybe<NYT::TNode> defaultJobSettings = Nothing(), currentJobSettings = Nothing();
- if (currentFmrOperationSpec && currentFmrOperationSpec->HasKey("job_settings")) {
- currentJobSettings = (*currentFmrOperationSpec)["job_settings"];
- }
- if (DefaultFmrOperationSpec_ && DefaultFmrOperationSpec_->HasKey("job_settings")) {
- defaultJobSettings = (*DefaultFmrOperationSpec_)["job_settings"];
- }
- if (defaultJobSettings && !currentJobSettings) {
- return defaultJobSettings;
- }
- if (currentJobSettings && !defaultJobSettings) {
- return currentJobSettings;
- }
- if (!currentJobSettings && !defaultJobSettings) {
- return Nothing();
+ NYT::TNode GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) { // Returns the default spec, with the per-operation spec merged into a copy of it when one is given.
+ // For now fmr operation spec only consists of job settings
+ if (!currentFmrOperationSpec) {
+ return DefaultFmrOperationSpec_; // no per-operation spec: the defaults are used unchanged
}
- NYT::MergeNodes(*currentJobSettings, *defaultJobSettings);
- return currentJobSettings;
+ auto resultFmrOperationSpec = DefaultFmrOperationSpec_; // work on a copy so the stored defaults are not modified
+ NYT::MergeNodes(resultFmrOperationSpec, *currentFmrOperationSpec); // NOTE(review): presumably values from the second argument override the first, i.e. per-operation values win - confirm
+ return resultFmrOperationSpec;
}
std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it
@@ -363,7 +359,7 @@ private:
TDuration TimeToSleepBetweenClearKeyRequests_;
TDuration IdempotencyKeyStoreTime_;
std::unordered_map<TFmrTableId, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics
- TMaybe<NYT::TNode> DefaultFmrOperationSpec_;
+ NYT::TNode DefaultFmrOperationSpec_;
};
} // namespace
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
index 3fea218981f..563752dfc7b 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
@@ -1,7 +1,7 @@
#pragma once
#include <library/cpp/random_provider/random_provider.h>
-#include <library/cpp/yson/node/node.h>
+#include <library/cpp/yson/node/node_io.h>
#include <util/system/mutex.h>
#include <util/system/guard.h>
#include <util/generic/queue.h>
@@ -10,13 +10,15 @@
namespace NYql::NFmr {
struct TFmrCoordinatorSettings { // Settings passed to MakeFmrCoordinator.
- ui32 WorkersNum; // Not supported yet
+ NYT::TNode DefaultFmrOperationSpec; // the constructor loads it from the default_coordinator_settings.yson resource
+ ui32 WorkersNum; // the constructor sets it to 1
TIntrusivePtr<IRandomProvider> RandomProvider;
TDuration IdempotencyKeyStoreTime = TDuration::Seconds(10);
TDuration TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1);
- TMaybe<NYT::TNode> DefaultFmrOperationSpec = Nothing();
+
+ TFmrCoordinatorSettings(); // assigns a default to every field; defined in yql_yt_coordinator_impl.cpp
};
-IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)});
+IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = TFmrCoordinatorSettings());
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
index 3bc8e074a3a..1d6bc504a5e 100644
--- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
@@ -1,9 +1,17 @@
#include "yql_yt_fmr_initializer.h"
+#include <util/stream/file.h>
namespace NYql::NFmr {
-std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker, const TString& coordinatorServerUrl, bool isFileGateway) {
- auto coordinator = MakeFmrCoordinator();
+std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker, const TString& coordinatorServerUrl, bool isFileGateway, const TString& fmrOperationSpecFilePath) {
+ TFmrCoordinatorSettings coordinatorSettings{};
+ if (!fmrOperationSpecFilePath.empty()) {
+ TFileInput input(fmrOperationSpecFilePath);
+ auto fmrOperationSpec = NYT::NodeFromYsonStream(&input);
+ coordinatorSettings.DefaultFmrOperationSpec = fmrOperationSpec;
+ }
+
+ auto coordinator = MakeFmrCoordinator(coordinatorSettings);
if (!coordinatorServerUrl.empty()) {
TFmrCoordinatorClientSettings coordinatorClientSettings;
THttpURL parsedUrl;
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
index a87522fbd71..24f17928e17 100644
--- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
@@ -14,6 +14,12 @@ namespace NYql::NFmr {
constexpr TStringBuf FastMapReduceGatewayName = "fmr";
-std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker = false, const TString& coordinatorServerUrl = TString(), bool isFileGateway = false);
+std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(
+ IYtGateway::TPtr slave,
+ bool disableLocalFmrWorker = false,
+ const TString& coordinatorServerUrl = TString(),
+ bool isFileGateway = false,
+ const TString& fmrOperationSpecFilePath = TString()
+);
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
index 11b30362ee7..6b9ce068652 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -41,13 +41,13 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
std::unordered_map<TYtTableRef, TString> outputTables;
NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService);
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
TDownloadTaskParams params = TDownloadTaskParams(input, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- auto res = job->Download(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
+ auto res = job->Download(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}, cancelFlag);
auto err = std::get_if<TError>(&res);
auto statistics = std::get_if<TStatistics>(&res);
@@ -65,7 +65,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService);
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
std::vector<TTableRange> ranges = {{"test_part_id"}};
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1));
- auto res = job->Upload(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
+ auto res = job->Upload(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}, cancelFlag);
auto err = std::get_if<TError>(&res);
@@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
std::unordered_map<TYtTableRef, TString> outputTables;
NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
auto cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService);
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
@@ -110,14 +110,20 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1));
tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3));
- auto res = job->Merge(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
+ auto res = job->Merge(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}, cancelFlag);
auto err = std::get_if<TError>(&res);
UNIT_ASSERT_C(!err, err->ErrorMessage);
auto resultTableContentMaybe = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
UNIT_ASSERT_C(resultTableContentMaybe, "Result table content is empty");
TString resultTableContent = GetTextYson(*resultTableContentMaybe);
- UNIT_ASSERT_NO_DIFF(resultTableContent, TableContent_1 + TableContent_2 + TableContent_3);
+ TString expected = TableContent_1 + TableContent_2 + TableContent_3;
+ UNIT_ASSERT_VALUES_EQUAL(resultTableContent.size(), expected.size());
+ TString line;
+ TStringStream resultStream(resultTableContent);
+ while (resultStream.ReadLine(line)) {
+ UNIT_ASSERT(expected.Contains(line));
+ }
}
}
@@ -178,11 +184,11 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
// No tables in tableDataService
- try {
- RunJob(task, tableDataServicePtr, ytService, cancelFlag);
- } catch(...) {
- UNIT_ASSERT(CurrentExceptionMessage().Contains("No data for chunk:test_table_id:test_part_id"));
- }
+ UNIT_ASSERT_EXCEPTION_CONTAINS(
+ RunJob(task, tableDataServicePtr, ytService, cancelFlag),
+ yexception,
+ "No data for chunk:test_table_id:test_part_id"
+ );
}
Y_UNIT_TEST(RunMergeTask) {
@@ -214,9 +220,17 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
- auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
- UNIT_ASSERT_C(resultTableContent, "Result table content is empty");
- UNIT_ASSERT_NO_DIFF(GetTextYson(*resultTableContent), TableContent_1 + TableContent_2 + TableContent_3);
+ auto resultTableContentMaybe = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
+ UNIT_ASSERT_C(resultTableContentMaybe, "Result table content is empty");
+
+ TString resultTableContent = GetTextYson(*resultTableContentMaybe);
+ TString expected = TableContent_1 + TableContent_2 + TableContent_3;
+ UNIT_ASSERT_VALUES_EQUAL(resultTableContent.size(), expected.size());
+ TString line;
+ TStringStream resultStream(resultTableContent);
+ while (resultStream.ReadLine(line)) {
+ UNIT_ASSERT(expected.Contains(line));
+ }
}
}
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp
index 8e4522db03d..3d609b888f6 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_reader_ut.cpp
@@ -10,19 +10,19 @@ TString originalTableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"ab
"{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
"{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
-Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
+Y_UNIT_TEST_SUITE(FmrReaderTests) {
Y_UNIT_TEST(ReadOneChunkSmallPart) {
size_t chunkSize = 1024;
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- TFmrTableDataServiceWriterSettings settings{chunkSize};
+ TFmrWriterSettings settings{chunkSize};
TFmrTableDataServiceWriter outputWriter("tableId", "partId", tableDataServicePtr, settings);
outputWriter.Write(originalTableContent.data(), originalTableContent.size());
outputWriter.Flush();
- TFmrTableDataServiceReaderSettings readerSettings{1};
+ TFmrReaderSettings readerSettings{1};
std::vector<TTableRange> tableRanges = {{"partId", 0, 1}};
TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
@@ -38,13 +38,13 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- TFmrTableDataServiceWriterSettings settings{chunkSize};
+ TFmrWriterSettings settings{.ChunkSize= chunkSize, .MaxInflightChunks = 2};
TFmrTableDataServiceWriter outputStream("tableId", "partId", tableDataServicePtr, settings);
outputStream.Write(originalTableContent.data(), originalTableContent.size());
outputStream.Flush();
- TFmrTableDataServiceReaderSettings readerSettings{1};
+ TFmrReaderSettings readerSettings{1};
std::vector<TTableRange> tableRanges = {{"partId", 0, 1}};
TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
@@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
size_t chunkSize = 32;
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- TFmrTableDataServiceWriterSettings settings{chunkSize};
+ TFmrWriterSettings settings{.ChunkSize= chunkSize, .MaxInflightChunks = 2};
TFmrTableDataServiceWriter outputStream("tableId", "partId", tableDataServicePtr, settings);
for (size_t i = 0; i < 3; ++i) {
@@ -65,7 +65,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
}
outputStream.Flush();
- TFmrTableDataServiceReaderSettings readerSettings{1};
+ TFmrReaderSettings readerSettings{1};
std::vector<TTableRange> tableRanges = {{"partId", 0, 3}};
TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
size_t chunkSize = 32;
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- TFmrTableDataServiceWriterSettings settings{chunkSize};
+ TFmrWriterSettings settings{chunkSize};
TFmrTableDataServiceWriter outputStream("tableId", "partId", tableDataServicePtr, settings);
for (size_t i = 0; i < 3; ++i) {
@@ -86,7 +86,7 @@ Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
}
outputStream.Flush();
- TFmrTableDataServiceReaderSettings readerSettings{5};
+ TFmrReaderSettings readerSettings{5};
std::vector<TTableRange> tableRanges = {{"partId", 0, 3}};
TFmrTableDataServiceReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp
index 0add0e72cbb..925cdfff125 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp
@@ -5,46 +5,70 @@
namespace NYql::NFmr {
-Y_UNIT_TEST_SUITE(OutputStreamTests) {
+const std::vector<TString> TableYsonRows = {
+ "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};",
+ "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};",
+ "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};",
+ "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"
+};
+
+TTableStats WriteDataToTableDataSerice( // Test helper: writes the given rows through a TFmrTableDataServiceWriter and returns the writer's stats.
+ ITableDataService::TPtr tableDataService,
+ const std::vector<TString>& tableYsonRows,
+ ui64 chunkSize,
+ TMaybe<ui64> maxRowWeight = Nothing() // when empty, the MaxRowWeight already present in TFmrWriterSettings is kept
+) { // NOTE(review): "Serice" in the name looks like a typo of "Service"; a rename must also update the call sites
+ TFmrWriterSettings settings{.ChunkSize = chunkSize};
+ if (maxRowWeight) {
+ settings.MaxRowWeight = *maxRowWeight;
+ }
+ TFmrTableDataServiceWriter outputWriter("tableId", "partId", tableDataService, settings); // table id and part id are fixed to "tableId" / "partId"
+
+ for (auto& row: tableYsonRows) {
+ outputWriter.Write(row.data(), row.size());
+ outputWriter.NotifyRowEnd(); // called once after each row has been written
+ }
+ outputWriter.Flush();
+ return outputWriter.GetStats();
+}
+
+Y_UNIT_TEST_SUITE(FmrWriterTests) {
Y_UNIT_TEST(WriteYsonRows) {
- std::vector<TString> tableYsonRows = {
- "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};",
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};",
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};",
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"
- };
ui64 totalSize = 0;
- for (auto& row: tableYsonRows) {
+ for (auto& row: TableYsonRows) {
totalSize += row.size();
}
ui64 chunkSize = totalSize / 2;
- ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- TFmrTableDataServiceWriterSettings settings{.ChunkSize = chunkSize};
- TFmrTableDataServiceWriter outputWriter("tableId", "partId", tableDataServicePtr, settings);
-
- for (auto& row: tableYsonRows) {
- outputWriter.Write(row.data(), row.size());
- outputWriter.NotifyRowEnd();
- }
- outputWriter.Flush();
-
- auto realChunks = outputWriter.GetStats().Chunks;
- auto realDataWeight = outputWriter.GetStats().DataWeight;
+ ITableDataService::TPtr tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+ auto stats = WriteDataToTableDataSerice(tableDataService, TableYsonRows, chunkSize);
+ auto realChunks = stats.Chunks;
+ auto realDataWeight =stats.DataWeight;
UNIT_ASSERT_VALUES_EQUAL(realChunks, 2);
UNIT_ASSERT_VALUES_EQUAL(realDataWeight, totalSize);
- TString expectedFirstChunkTableContent = JoinRange(TStringBuf(), tableYsonRows.begin(), tableYsonRows.begin() + 2);
- TString expectedSecondChunkTableContent = JoinRange(TStringBuf(), tableYsonRows.begin() + 2, tableYsonRows.end());
+ TString expectedFirstChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin(), TableYsonRows.begin() + 2);
+ TString expectedSecondChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin() + 2, TableYsonRows.end());
auto firstChunkTableKey = GetTableDataServiceKey("tableId", "partId", 0);
- auto firstChunkTableContent = tableDataServicePtr->Get(firstChunkTableKey).GetValueSync();
+ auto firstChunkTableContent = tableDataService->Get(firstChunkTableKey).GetValueSync();
auto secondChunkTableKey = GetTableDataServiceKey("tableId", "partId", 1);
- auto secondChunkTableContent = tableDataServicePtr->Get(secondChunkTableKey).GetValueSync();
+ auto secondChunkTableContent = tableDataService->Get(secondChunkTableKey).GetValueSync();
UNIT_ASSERT_NO_DIFF(*firstChunkTableContent, expectedFirstChunkTableContent);
UNIT_ASSERT_NO_DIFF(*secondChunkTableContent, expectedSecondChunkTableContent);
}
+ Y_UNIT_TEST(RecordIsLargerThanMaxRowWeight) { // Expects a yexception when a row exceeds the configured max row weight.
+ ui64 chunkSize = 1, maxRowWeight = 3; // a limit of 3 is smaller than every row in TableYsonRows
+ auto rowSize = TableYsonRows[0].size(); // size of the first row, used to build the expected message
+ ITableDataService::TPtr tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+ TString expectedErrorMessage = TStringBuilder() << rowSize << " is larger than max row weight: " << maxRowWeight;
+ UNIT_ASSERT_EXCEPTION_CONTAINS(
+ WriteDataToTableDataSerice(tableDataService, TableYsonRows, chunkSize, maxRowWeight),
+ yexception,
+ expectedErrorMessage
+ );
+ }
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make
index fbd3c8f4744..02b5058984a 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ya.make
@@ -8,6 +8,7 @@ SRCS(
PEERDIR(
library/cpp/threading/future
+ library/cpp/yson/node
yt/cpp/mapreduce/interface
yt/yql/providers/yt/fmr/job/interface
yt/yql/providers/yt/fmr/request_options
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index 358c4215970..66067923131 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -1,4 +1,5 @@
#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/yson/node/node_io.h>
#include <util/stream/file.h>
@@ -16,12 +17,16 @@ namespace NYql::NFmr {
class TFmrJob: public IFmrJob {
public:
- TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings)
- : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings)
+ TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings) // Stores the two services and a copy of the settings; the cancel flag is now a parameter of Download/Upload/Merge.
+ : TableDataService_(tableDataService), YtService_(ytService), Settings_(settings)
{
}
- virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override {
+ virtual std::variant<TError, TStatistics> Download(
+ const TDownloadTaskParams& params,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+ std::shared_ptr<std::atomic<bool>> cancelFlag
+ ) override {
try {
const auto ytTable = params.Input;
const auto cluster = params.Input.Cluster;
@@ -32,13 +37,13 @@ public:
YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path;
YQL_ENSURE(clusterConnections.size() == 1);
- auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second); // TODO - pass YtReader settings from Gateway
- auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, GetFmrTableDataServiceWriterSettings());
+ auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second, Settings_.YtReaderSettings);
+ auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(tableId, partId, TableDataService_, Settings_.FmrWriterSettings);
- ParseRecords(*ytTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
- tableDataServiceWriter.Flush();
+ ParseRecords(ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.DonwloadReadBlockCount, Settings_.ParseRecordSettings.DonwloadReadBlockSize, cancelFlag);
+ tableDataServiceWriter->Flush();
- TTableStats stats = tableDataServiceWriter.GetStats();
+ TTableStats stats = tableDataServiceWriter->GetStats();
auto statistics = TStatistics({{output, stats}});
return statistics;
} catch (...) {
@@ -46,7 +51,11 @@ public:
}
}
- virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override {
+ virtual std::variant<TError, TStatistics> Upload(
+ const TUploadTaskParams& params,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+ std::shared_ptr<std::atomic<bool>> cancelFlag
+ ) override {
try {
const auto ytTable = params.Output;
const auto cluster = params.Output.Cluster;
@@ -56,10 +65,10 @@ public:
YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
- auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings());
+ auto tableDataServiceReader = MakeIntrusive<TFmrTableDataServiceReader>(tableId, tableRanges, TableDataService_, Settings_.FmrReaderSettings);
YQL_ENSURE(clusterConnections.size() == 1);
- auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second);
- ParseRecords(tableDataServiceReader, *ytTableWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
+ auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second, Settings_.YtWriterSettings);
+ ParseRecords(tableDataServiceReader, ytTableWriter, Settings_.ParseRecordSettings.UploadReadBlockCount, Settings_.ParseRecordSettings.UploadReadBlockSize, cancelFlag);
ytTableWriter->Flush();
return TStatistics();
@@ -68,28 +77,34 @@ public:
}
}
- virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override {
- // расширить таск парамс. добавить туда мету
+ // Merges all input tables of the task into one fmr output table part.
+ // Each input is read by its own task on a thread pool of MergeNumThreads threads; all tasks
+ // write into the same table data service writer and share one mutex that is passed to ParseRecords.
+ // Returns the output table statistics, or a TError carrying the message of any exception
+ // caught on the calling thread.
+ virtual std::variant<TError, TStatistics> Merge(
+ const TMergeTaskParams& params,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+ std::shared_ptr<std::atomic<bool>> cancelFlag
+ ) override {
try {
const auto inputs = params.Input;
const auto output = params.Output;
YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
+ auto& parseRecordSettings = Settings_.ParseRecordSettings;
- auto tableDataServiceWriter = TFmrTableDataServiceWriter(output.TableId, output.PartId, TableDataService_, GetFmrTableDataServiceWriterSettings());
+ auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(output.TableId, output.PartId, TableDataService_, Settings_.FmrWriterSettings);
+ // The mutex is declared before the thread pool so that it is destroyed after it: queued tasks
+ // capture it by reference, so if this loop throws the pool has to be torn down first.
+ TMaybe<TMutex> mutex = TMutex();
+ auto threadPool = CreateThreadPool(parseRecordSettings.MergeNumThreads);
for (const auto& inputTableRef : inputs) {
- if (CancelFlag_->load()) {
- return TError("Canceled");
- }
auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnections);
- ParseRecords(*inputTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
+ // NOTE(review): an exception thrown by ParseRecords inside a pool task cannot reach the
+ // catch block below - confirm how the pool reports task failures, otherwise a failed
+ // input may go unnoticed.
+ threadPool->SafeAddFunc([&, inputTableReader] {
+ ParseRecords(inputTableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex);
+ });
}
- tableDataServiceWriter.Flush();
- return TStatistics({{output, tableDataServiceWriter.GetStats()}});
+ threadPool->Stop();
+
+ tableDataServiceWriter->Flush();
+ return TStatistics({{output, tableDataServiceWriter->GetStats()}});
} catch (...) {
return TError(CurrentExceptionMessage());
}
- return TError{"not implemented yet"};
}
private:
@@ -99,61 +114,46 @@ private:
if (ytTable) {
TFmrTableId tableId = {ytTable->Cluster, ytTable->Path};
auto clusterConnection = clusterConnections.at(tableId);
- return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway
+ return YtService_->MakeReader(*ytTable, clusterConnection, Settings_.YtReaderSettings);
} else if (fmrTable) {
- return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings());
+ return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings);
} else {
ythrow yexception() << "Unsupported table type";
}
}
- TParseRecordSettings GetParseRecordSettings() const {
- return Settings_ ? Settings_->ParseRecordSettings : TParseRecordSettings();
- }
-
- TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() const {
- return Settings_ ? Settings_->FmrTableDataServiceReaderSettings : TFmrTableDataServiceReaderSettings();
- }
-
- TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() const {
- return Settings_ ? Settings_->FmrTableDataServiceWriterSettings : TFmrTableDataServiceWriterSettings();
- }
-
private:
ITableDataService::TPtr TableDataService_;
IYtService::TPtr YtService_;
- std::shared_ptr<std::atomic<bool>> CancelFlag_;
- TMaybe<TFmrJobSettings> Settings_;
+ TFmrJobSettings Settings_;
};
+// Creates the default IFmrJob implementation (TFmrJob) bound to the given table data service,
+// YT service and job settings. The cancel flag is no longer stored in the job; it is passed to
+// each Download/Upload/Merge call instead.
IFmrJob::TPtr MakeFmrJob(
ITableDataService::TPtr tableDataService,
IYtService::TPtr ytService,
- std::shared_ptr<std::atomic<bool>> cancelFlag,
const TFmrJobSettings& settings
) {
- return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag, settings);
+ return MakeIntrusive<TFmrJob>(tableDataService, ytService, settings);
}
TJobResult RunJob(
TTask::TPtr task,
ITableDataService::TPtr tableDataService,
IYtService::TPtr ytService,
- std::shared_ptr<std::atomic<bool>> cancelFlag,
- const TMaybe<TFmrJobSettings>& settings
+ std::shared_ptr<std::atomic<bool>> cancelFlag
) {
- TFmrJobSettings jobSettings = settings ? *settings : GetJobSettingsFromTask(task);
- IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, jobSettings);
+ TFmrJobSettings jobSettings = GetJobSettingsFromTask(task);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, jobSettings);
- auto processTask = [job, task] (auto&& taskParams) {
+ auto processTask = [job, task, cancelFlag] (auto&& taskParams) {
using T = std::decay_t<decltype(taskParams)>;
if constexpr (std::is_same_v<T, TUploadTaskParams>) {
- return job->Upload(taskParams, task->ClusterConnections);
+ return job->Upload(taskParams, task->ClusterConnections, cancelFlag);
} else if constexpr (std::is_same_v<T, TDownloadTaskParams>) {
- return job->Download(taskParams, task->ClusterConnections);
+ return job->Download(taskParams, task->ClusterConnections, cancelFlag);
} else if constexpr (std::is_same_v<T, TMergeTaskParams>) {
- return job->Merge(taskParams, task->ClusterConnections);
+ return job->Merge(taskParams, task->ClusterConnections, cancelFlag);
} else {
throw std::runtime_error{"Unsupported task type"};
}
@@ -176,28 +176,26 @@ TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) {
auto jobSettings = *task->JobSettings;
YQL_ENSURE(jobSettings.IsMap());
TFmrJobSettings resultSettings{};
- if (jobSettings.HasKey("parse_record_settings")) {
- auto& parseRecordSettings = jobSettings["parse_record_settings"];
- if (parseRecordSettings.HasKey("block_count")) {
- resultSettings.ParseRecordSettings.BlockCount = parseRecordSettings["block_count"].AsInt64();
- }
- if (parseRecordSettings.HasKey("block_size")) {
- resultSettings.ParseRecordSettings.BlockSize = parseRecordSettings["block_size"].AsInt64();
- // TODO - support different formats (B, MB, ...)
- }
- }
- if (jobSettings.HasKey("fmr_reader_settings")) {
- auto& fmrReaderSettings = jobSettings["fmr_reader_settings"];
- if (fmrReaderSettings.HasKey("read_ahead_chunks")) {
- resultSettings.FmrTableDataServiceReaderSettings.ReadAheadChunks = fmrReaderSettings["read_ahead_chunks"].AsInt64();
- }
- }
- if (jobSettings.HasKey("fmr_writer_settings")) {
- auto& fmrWriterSettings = jobSettings["fmr_writer_settings"];
- if (fmrWriterSettings.HasKey("chunk_size")) {
- resultSettings.FmrTableDataServiceWriterSettings.ChunkSize = fmrWriterSettings["chunk_size"].AsInt64();
- }
- }
+
+ auto& parseRecordSettings = resultSettings.ParseRecordSettings;
+ parseRecordSettings.MergeReadBlockCount = jobSettings["merge"]["read_block_count"].AsInt64();
+ parseRecordSettings.MergeReadBlockSize = jobSettings["merge"]["read_block_size"].AsInt64();
+ parseRecordSettings.MergeNumThreads = jobSettings["merge"]["num_threads"].AsInt64();
+
+ parseRecordSettings.UploadReadBlockCount = jobSettings["upload"]["read_block_count"].AsInt64();
+ parseRecordSettings.UploadReadBlockSize = jobSettings["upload"]["read_block_size"].AsInt64();
+
+ auto& jobIoSettings = jobSettings["job_io"];
+ resultSettings.FmrReaderSettings.ReadAheadChunks = jobIoSettings["fmr_table_reader"]["inflight_chunks"].AsInt64();
+
+ auto& fmrWriterSettings = resultSettings.FmrWriterSettings;
+ fmrWriterSettings.MaxInflightChunks = jobIoSettings["fmr_table_writer"]["inflight_chunks"].AsInt64();
+ fmrWriterSettings.ChunkSize = jobIoSettings["fmr_table_writer"]["chunk_size"].AsInt64();
+ fmrWriterSettings.MaxRowWeight = jobIoSettings["fmr_table_writer"]["max_row_weight"].AsInt64();
+
+ resultSettings.YtWriterSettings.MaxRowWeight = jobIoSettings["yt_table_writer"]["max_row_weight"].AsInt64();
+
+ // TODO - maybe pass other optional settings here.
return resultSettings;
}
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
index cb21e95f9c9..09b17090427 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <yt/cpp/mapreduce/interface/fwd.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h>
@@ -8,19 +10,27 @@
namespace NYql::NFmr {
+// Per-operation tuning of ParseRecords.
+// The Merge* values are what TFmrJob::Merge passes to ParseRecords as blockCount / blockSize
+// and to CreateThreadPool as the number of worker threads.
+// The Upload* / Donwload* values are presumably consumed the same way by Upload / Download —
+// TODO confirm against those methods.
+// NOTE(review): "Donwload" is a misspelling of "Download"; renaming the fields requires
+// updating every user of this struct.
struct TParseRecordSettings {
- ui64 BlockCount = 1;
- ui64 BlockSize = 1024 * 1024;
+ ui64 MergeReadBlockCount = 1;
+ ui64 MergeReadBlockSize = 1024 * 1024;
+ ui64 MergeNumThreads = 3;
+ ui64 UploadReadBlockCount = 1;
+ ui64 UploadReadBlockSize = 1024 * 1024;
+ ui64 DonwloadReadBlockCount = 1;
+ ui64 DonwloadReadBlockSize = 1024 * 1024; // TODO - remove download
};
+// Aggregated settings of a single FMR job: record parsing plus reader/writer settings for both
+// the FMR table data service and YT. Filled from the task's job settings node by
+// GetJobSettingsFromTask.
struct TFmrJobSettings {
TParseRecordSettings ParseRecordSettings = TParseRecordSettings();
- TFmrTableDataServiceReaderSettings FmrTableDataServiceReaderSettings = TFmrTableDataServiceReaderSettings();
- TFmrTableDataServiceWriterSettings FmrTableDataServiceWriterSettings = TFmrTableDataServiceWriterSettings();
+ TFmrReaderSettings FmrReaderSettings = TFmrReaderSettings();
+ TFmrWriterSettings FmrWriterSettings = TFmrWriterSettings();
+ TYtReaderSettings YtReaderSettings = TYtReaderSettings();
+ TYtWriterSettings YtWriterSettings = TYtWriterSettings();
+ // NOTE(review): not read by the job implementation shown in this change — confirm where it is used.
+ ui64 NumThreads = 0;
};
-IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {});
+IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings = {});
-TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings = Nothing());
+TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task);
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp
index 1223debe79a..9b2caf44aa4 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp
@@ -9,7 +9,7 @@ TFmrTableDataServiceReader::TFmrTableDataServiceReader(
const TString& tableId,
const std::vector<TTableRange>& tableRanges,
ITableDataService::TPtr tableDataService,
- const TFmrTableDataServiceReaderSettings& settings
+ const TFmrReaderSettings& settings
)
: TableId_(tableId),
TableRanges_(tableRanges),
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h
index c64b7b20745..d56bb0eb0d7 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h
@@ -9,7 +9,7 @@
namespace NYql::NFmr {
-struct TFmrTableDataServiceReaderSettings {
+// Settings of TFmrTableDataServiceReader (renamed from TFmrTableDataServiceReaderSettings).
+// ReadAheadChunks is populated from job_io / fmr_table_reader / inflight_chunks of the job
+// settings; presumably it is the number of chunks requested ahead of the read position —
+// TODO confirm in the reader implementation.
+struct TFmrReaderSettings {
ui64 ReadAheadChunks = 1;
};
@@ -24,7 +24,7 @@ public:
const TString& tableId,
const std::vector<TTableRange>& tableRanges,
ITableDataService::TPtr tableDataService,
- const TFmrTableDataServiceReaderSettings& settings = TFmrTableDataServiceReaderSettings{}
+ const TFmrReaderSettings& settings = TFmrReaderSettings{}
);
bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override;
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp
index 7b1ffe70856..139e57368cf 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp
@@ -1,5 +1,8 @@
#include "yql_yt_table_data_service_writer.h"
+#include <library/cpp/threading/future/wait/wait.h>
#include <util/string/join.h>
+#include <yql/essentials/utils/yql_panic.h>
+
namespace NYql::NFmr {
@@ -7,25 +10,71 @@ TFmrTableDataServiceWriter::TFmrTableDataServiceWriter(
const TString& tableId,
const TString& partId,
ITableDataService::TPtr tableDataService,
- const TFmrTableDataServiceWriterSettings& settings
+ const TFmrWriterSettings& settings
)
- : TableId_(tableId), PartId_(partId), TableDataService_(tableDataService), ChunkSize_(settings.ChunkSize) {}
+ : TableId_(tableId),
+ PartId_(partId),
+ TableDataService_(tableDataService),
+ ChunkSize_(settings.ChunkSize),
+ MaxInflightChunks_(settings.MaxInflightChunks),
+ MaxRowWeight_(settings.MaxRowWeight)
+{
+ // Creates a writer that stores rows of part `partId` of table `tableId` in the table data
+ // service, in chunks of about ChunkSize bytes.
+ // A chunk is pushed only after a complete row, so the row limit must not be below the chunk size.
+ YQL_ENSURE(MaxRowWeight_ >= ChunkSize_);
+ // PutRows waits until CurInflightChunks < MaxInflightChunks_; with a zero limit that
+ // condition can never hold and the first push would block forever.
+ YQL_ENSURE(MaxInflightChunks_ > 0);
+}
+// Appends `len` bytes from `buf` to the in-memory chunk buffer; nothing is sent to the table
+// data service here.
void TFmrTableDataServiceWriter::DoWrite(const void* buf, size_t len) {
TableContent_.Append(static_cast<const char*>(buf), len);
}
+// Marks the end of a row: counts it, enforces the max-row-weight limit and pushes the buffer
+// to the table data service once it has reached ChunkSize_.
void TFmrTableDataServiceWriter::NotifyRowEnd() {
- Rows_ += 1;
+ ++Rows_;
+ // NOTE(review): TableContent_ holds every row buffered since the last push, so this compares
+ // the size of the whole pending chunk, not of the current row. A small row that pushes the
+ // buffer over MaxRowWeight_ is rejected too (with ChunkSize_ == MaxRowWeight_ the chunk
+ // threshold itself triggers the error). Checking the row alone needs the buffer offset at
+ // which the row started.
+ if (TableContent_.size() >= MaxRowWeight_) {
+ ythrow yexception() << "Current row size: " << TableContent_.size() << " is larger than max row weight: " << MaxRowWeight_;
+ }
if (TableContent_.size() >= ChunkSize_) {
- DoFlush();
+ PutRows();
}
}
+// Pushes the rows still buffered in memory and blocks until every asynchronous Put issued by
+// this writer has completed. Rethrows the first error reported by any of those Puts.
void TFmrTableDataServiceWriter::DoFlush() {
+ PutRows();
+ with_lock(State_->Mutex) {
+ // Wait first, check afterwards: the callbacks of Puts that are still in flight (at least
+ // the chunk pushed just above) may store an exception only after this point, and checking
+ // before the wait would report success for them.
+ State_->CondVar.Wait(State_->Mutex, [this] {
+ return State_->CurInflightChunks == 0;
+ });
+ if (State_->Exception) {
+ std::rethrow_exception(State_->Exception);
+ }
+ }
+}
+
+// Sends the current buffer to the table data service as the next chunk without waiting for the
+// Put to finish. Blocks only while MaxInflightChunks_ Puts are already outstanding.
+void TFmrTableDataServiceWriter::PutRows() {
+ // Reserve an in-flight slot; the slot is released by the completion callback below.
+ with_lock(State_->Mutex) {
+ State_->CondVar.Wait(State_->Mutex, [&] {
+ return State_->CurInflightChunks < MaxInflightChunks_;
+ });
+ ++State_->CurInflightChunks;
+ }
TString chunkKey = GetTableDataServiceKey(TableId_, PartId_, ChunkCount_);
- TableDataService_->Put(chunkKey, TString(TableContent_.Data(), TableContent_.Size())).Wait();
- ChunkCount_++;
+ // The callback holds only a weak reference to the shared state, so it becomes a no-op if the
+ // writer (the sole owner of State_) has been destroyed before the Put completes.
+ // NOTE(review): if Put() itself throws instead of returning a failed future, the slot
+ // reserved above is never released — confirm Put's contract.
+ TableDataService_->Put(chunkKey, TString(TableContent_.Data(), TableContent_.Size())).Subscribe(
+ [weakState = std::weak_ptr(State_)] (const auto& putFuture) mutable {
+ std::shared_ptr<TFmrWriterState> state = weakState.lock();
+ if (state) {
+ with_lock(state->Mutex) {
+ --state->CurInflightChunks;
+ // GetValue() is called only to surface a failed Put; the first failure is kept.
+ try {
+ putFuture.GetValue();
+ } catch (...) {
+ if (!state->Exception) {
+ state->Exception = std::current_exception();
+ }
+ }
+ state->CondVar.Signal();
+ }
+ }
+ }
+ );
+ ++ChunkCount_;
DataWeight_ += TableContent_.Size();
TableContent_.Clear();
}
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
index 2e27cd4f072..544b9fc3bd1 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
@@ -2,6 +2,7 @@
#include <util/generic/buffer.h>
#include <util/stream/output.h>
+#include <util/system/condvar.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
#include <yt/yql/providers/yt/fmr/table_data_service/interface/yql_yt_table_data_service.h>
@@ -9,8 +10,12 @@
namespace NYql::NFmr {
-struct TFmrTableDataServiceWriterSettings {
+
+
+// Settings of TFmrTableDataServiceWriter (renamed from TFmrTableDataServiceWriterSettings).
+//  ChunkSize         - buffered size (bytes) at which rows are pushed to the table data service.
+//  MaxInflightChunks - upper bound on Puts that may be outstanding at the same time.
+//  MaxRowWeight      - size limit checked at the end of every row; the writer's constructor
+//                      requires MaxRowWeight >= ChunkSize.
+struct TFmrWriterSettings {
ui64 ChunkSize = 1024 * 1024;
+ ui64 MaxInflightChunks = 1;
+ ui64 MaxRowWeight = 1024 * 1024 * 16;
};
class TFmrTableDataServiceWriter: public NYT::TRawTableWriter {
@@ -19,7 +24,7 @@ public:
const TString& tableId,
const TString& partId,
ITableDataService::TPtr tableDataService,
- const TFmrTableDataServiceWriterSettings& settings = TFmrTableDataServiceWriterSettings()
+ const TFmrWriterSettings& settings = TFmrWriterSettings()
);
TTableStats GetStats();
@@ -32,6 +37,9 @@ protected:
void DoFlush() override;
private:
+ void PutRows();
+
+private:
const TString TableId_;
const TString PartId_;
ITableDataService::TPtr TableDataService_;
@@ -40,7 +48,18 @@ private:
TBuffer TableContent_;
const ui64 ChunkSize_; // size at which we push to table data service
+ const ui64 MaxInflightChunks_;
+ const ui64 MaxRowWeight_;
+
ui64 ChunkCount_ = 0;
+
+ // State shared between the writer and the completion callbacks of its asynchronous Puts.
+ // It is held through a shared_ptr and the callbacks keep only a weak_ptr to it.
+ // All fields are accessed under Mutex.
+ struct TFmrWriterState {
+ ui64 CurInflightChunks = 0; // Puts issued but not yet completed
+ TMutex Mutex = TMutex();
+ TCondVar CondVar; // signalled each time a Put completes
+ std::exception_ptr Exception; // first failure reported by a Put, if any
+ };
+ std::shared_ptr<TFmrWriterState> State_ = std::make_shared<TFmrWriterState>();
};
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
index 18d9553ba01..e6a8ab6ed88 100644
--- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
+++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
@@ -10,11 +10,11 @@ public:
virtual ~IFmrJob() = default;
- virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0;
+ virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
- virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0;
+ virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
- virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0;
+ virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
};
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
index 85e9599f8f8..a0736aa96f3 100644
--- a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
+++ b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
@@ -19,7 +19,8 @@ Y_UNIT_TEST_SUITE(ParseRecordTests) {
auto reader = ytService->MakeReader(testYtTable, TClusterConnection());
auto writer = ytService->MakeWriter(testYtTable, TClusterConnection());
- ParseRecords(*reader, *writer, 1, 10);
+ auto cancelFlag = std::make_shared<std::atomic<bool>>(false);
+ ParseRecords(reader, writer, 1, 10, cancelFlag);
writer->Flush();
UNIT_ASSERT_VALUES_EQUAL(outputTables.size(), 1);
UNIT_ASSERT(outputTables.contains(testYtTable));
diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp
index 78878f18ea7..c744f7f5ae7 100644
--- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp
+++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp
@@ -1,15 +1,33 @@
#include "yql_yt_parse_records.h"
+#include <yql/essentials/utils/log/log.h>
using namespace NYql;
namespace NYql::NFmr {
-void ParseRecords(NYT::TRawTableReader& reader, NYT::TRawTableWriter& writer, ui64 blockCount, ui64 blockSize) {
- auto blockReader = MakeBlockReader(reader, blockCount, blockSize);
+// Throws yexception if cancellation has been requested through `cancelFlag`.
+// A null flag means that there is no cancellation source (the IFmrJob methods default it to
+// nullptr), so it is treated as "not cancelled" instead of being dereferenced.
+void CheckCancelled(std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ if (cancelFlag && cancelFlag->load()) {
+ ythrow yexception() << " Job was cancelled, aborting";
+ }
+}
+
+void ParseRecords(
+ NYT::TRawTableReaderPtr reader,
+ NYT::TRawTableWriterPtr writer,
+ ui64 blockCount,
+ ui64 blockSize,
+ std::shared_ptr<std::atomic<bool>> cancelFlag,
+ const TMaybe<TMutex>& writeMutex
+) {
+ auto blockReader = MakeBlockReader(*reader, blockCount, blockSize);
NCommon::TInputBuf inputBuf(*blockReader, nullptr);
TVector<char> curYsonRow;
char cmd;
- while (inputBuf.TryRead(cmd)) {
+ while (true) {
+ CheckCancelled(cancelFlag);
+ if (!inputBuf.TryRead(cmd)) {
+ break;
+ }
curYsonRow.clear();
CopyYson(cmd, inputBuf, curYsonRow);
bool needBreak = false;
@@ -19,8 +37,16 @@ void ParseRecords(NYT::TRawTableReader& reader, NYT::TRawTableWriter& writer, ui
YQL_ENSURE(cmd == ';');
curYsonRow.emplace_back(cmd);
}
- writer.Write(curYsonRow.data(), curYsonRow.size());
- writer.NotifyRowEnd();
+ CheckCancelled(cancelFlag);
+ if (writeMutex) {
+ with_lock(*writeMutex) {
+ writer->Write(curYsonRow.data(), curYsonRow.size());
+ writer->NotifyRowEnd();
+ }
+ } else {
+ writer->Write(curYsonRow.data(), curYsonRow.size());
+ writer->NotifyRowEnd();
+ }
if (needBreak) {
break;
}
diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h
index 9d00d2c9e15..79d81dfa72d 100644
--- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h
+++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h
@@ -1,9 +1,18 @@
+#pragma once
+
#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/yql/providers/yt/codec/yt_codec_io.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
namespace NYql::NFmr {
-void ParseRecords(NYT::TRawTableReader& reader, NYT::TRawTableWriter& writer, ui64 blockCount, ui64 blockSize);
+// Throws yexception when `cancelFlag` is set.
+void CheckCancelled(std::shared_ptr<std::atomic<bool>> cancelFlag);
+
+// Copies YSON rows from `reader` to `writer` one row at a time, calling writer->NotifyRowEnd()
+// after each row.
+//  blockCount, blockSize - parameters of the block reader created over `reader`.
+//  cancelFlag            - checked before each row is read and before it is written; a set
+//                          flag aborts the copy with yexception.
+//  writeMutex            - when defined, each row is written (Write + NotifyRowEnd) under this
+//                          lock, which lets several ParseRecords calls share one writer.
+void ParseRecords(
+ NYT::TRawTableReaderPtr reader,
+ NYT::TRawTableWriterPtr writer,
+ ui64 blockCount,
+ ui64 blockSize,
+ std::shared_ptr<std::atomic<bool>> cancelFlag,
+ const TMaybe<TMutex>& writeMutex = Nothing());
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
index d618766edeb..4634a76031b 100644
--- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
+++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
@@ -28,13 +28,17 @@ public:
+ // Creates a raw YSON-binary writer that appends to `ytTable` inside the transaction
+ // identified by clusterConnection.TransactionId. When writerSetttings.MaxRowWeight is
+ // defined it is passed to the writer config as "max_row_weight".
+ // NOTE(review): the parameter name `writerSetttings` has a triple "t".
NYT::TRawTableWriterPtr MakeWriter(
const TYtTableRef& ytTable,
const TClusterConnection& clusterConnection,
- const TYtWriterSettings& /*writerSetttings*/
+ const TYtWriterSettings& writerSetttings
) override {
auto client = CreateClient(clusterConnection);
auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId));
TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//");
auto richPath = NYT::TRichYPath(ytPath).Append(true);
- return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary()); // TODO - support writerOptions
+ auto writerOptions = NYT::TTableWriterOptions();
+ if (writerSetttings.MaxRowWeight) {
+ writerOptions.Config(NYT::TNode()("max_row_weight", *writerSetttings.MaxRowWeight));
+ }
+ return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary(), writerOptions);
}
private:
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
index 7ed2a47fc00..77204493e70 100644
--- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
+++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
@@ -11,6 +11,7 @@ struct TYtReaderSettings {
};
struct TYtWriterSettings {
+ TMaybe<ui64> MaxRowWeight = Nothing();
};
class IYtService: public TThrRefBase {
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
index 260c8b69647..25bda1b40fc 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -119,6 +119,11 @@ TYtRunTool::TYtRunTool(TString name)
.Optional()
.NoArgument()
.SetFlag(&DisableLocalFmrWorker_);
+
+ opts.AddLongOption( "fmr-operation-spec-path", "Path to file with fmr operation spec settings")
+ .Optional()
+ .StoreResult(&FmrOperationSpecFilePath_);
+
});
GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
@@ -186,7 +191,7 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
return ytGateway;
}
- auto [fmrGateway, worker] = NFmr::InitializeFmrGateway(ytGateway, DisableLocalFmrWorker_, FmrCoordinatorServerUrl_);
+ auto [fmrGateway, worker] = NFmr::InitializeFmrGateway(ytGateway, DisableLocalFmrWorker_, FmrCoordinatorServerUrl_, false, FmrOperationSpecFilePath_);
FmrWorker_ = std::move(worker);
return fmrGateway;
}
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h
index 9fa4ee3df5a..b90fddcb6ba 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.h
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h
@@ -34,6 +34,7 @@ protected:
NFmr::IFmrWorker::TPtr FmrWorker_;
TString FmrCoordinatorServerUrl_;
bool DisableLocalFmrWorker_ = false;
+ TString FmrOperationSpecFilePath_;
};
} // NYql