diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-03-13 17:13:29 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-03-13 17:25:23 +0300 |
commit | 2096b0e1fd226062be688c1d9c0cc1a2f3a633e7 (patch) | |
tree | 252549f660307039bb359e835cfb5325cc3b5b5e | |
parent | f32f9dee5085a51ac5ef9d05db59ca8e12c31921 (diff) | |
download | ydb-2096b0e1fd226062be688c1d9c0cc1a2f3a633e7.tar.gz |
Intermediate changes
commit_hash:4b64d98e7898a081dd87badffe9fcb56bc6f38f8
12 files changed, 391 insertions, 188 deletions
diff --git a/contrib/tools/cython/Cython/Compiler/Main.py b/contrib/tools/cython/Cython/Compiler/Main.py index 5ec52f6520..3f03b66ac9 100644 --- a/contrib/tools/cython/Cython/Compiler/Main.py +++ b/contrib/tools/cython/Cython/Compiler/Main.py @@ -822,12 +822,6 @@ def search_include_directories(dirs, qualified_name, suffix, pos, include=False) module_filename = module_name + suffix package_filename = "__init__" + suffix - # Fix from https://github.com/cython/cython/pull/2738 - if pos: - path = os.path.join(os.path.dirname(pos[0].filename), module_filename) - if os.path.exists(path): - return path - for dirname in dirs: path = os.path.join(dirname, dotted_filename) if os.path.exists(path): diff --git a/contrib/tools/cython/patches/search-pxd.patch b/contrib/tools/cython/patches/search-pxd.patch deleted file mode 100644 index f125fc5387..0000000000 --- a/contrib/tools/cython/patches/search-pxd.patch +++ /dev/null @@ -1,15 +0,0 @@ ---- contrib/tools/cython/Cython/Compiler/Main.py (575a8ed5bc91c246ebc298ce6de0fe4d891b3594) -+++ contrib/tools/cython/Cython/Compiler/Main.py (working tree) -@@ -822,6 +822,12 @@ def search_include_directories(dirs, qualified_name, suffix, pos, include=False) - module_filename = module_name + suffix - package_filename = "__init__" + suffix - -+ # Fix from https://github.com/cython/cython/pull/2738 -+ if pos: -+ path = os.path.join(os.path.dirname(pos[0].filename), module_filename) -+ if os.path.exists(path): -+ return path -+ - for dirname in dirs: - path = os.path.join(dirname, dotted_filename) - if os.path.exists(path): diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make index cb2ee1f19b..78813e529d 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make @@ -3,6 +3,7 @@ UNITTEST() SRCS( yql_yt_job_ut.cpp yql_yt_output_stream_ut.cpp + yql_yt_raw_table_reader_ut.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index 4b4399dcca..7be6348426 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -6,10 +6,18 @@ namespace NYql::NFmr { -TString TableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; +TString TableContent_1 = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; +TString TableContent_2 = "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; +TString TableContent_3 = "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST(DownloadTable) { @@ -17,14 +25,15 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); TYtTableRef input = TYtTableRef("test_cluster", "test_path"); TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); TDownloadTaskParams params = TDownloadTaskParams(input, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - ytUploadedTablesMock->AddTable(input, TableContent); + ytUploadedTablesMock->AddTable(input, TableContent_1); auto res = job->Download(params); @@ -35,57 +44,47 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { UNIT_ASSERT_EQUAL(statistics->OutputTables.at(output).Rows, 4); auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); UNIT_ASSERT_C(resultTableContent, "Result table content is empty"); - UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent); + UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1); } Y_UNIT_TEST(UploadTable) { - TString ytTableContent = TableContent; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); TYtTableRef output = TYtTableRef("test_cluster", "test_path"); - TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges}; auto params = TUploadTaskParams(input, output); - tableDataServicePtr->Put(input.TableId, ytTableContent); + auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key, TableContent_1); auto res = job->Upload(params); auto err = std::get_if<TError>(&res); UNIT_ASSERT_C(!err,err->ErrorMessage); - UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent); + UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1); } Y_UNIT_TEST(MergeFmrTables) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; - TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2"}; - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; + TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2", .TableRanges = ranges}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -94,9 +93,12 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); - tableDataServicePtr->Put(input_2.TableId, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_2 = GetTableDataServiceKey(input_2.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); + tableDataServicePtr->Put(key_2, TableContent_2); + tableDataServicePtr->Put(key_3, TableContent_3); auto res = job->Merge(params); @@ -109,31 +111,19 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { } Y_UNIT_TEST(MergeMixedTables) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); + + std::vector<TTableRange> ranges = {{"test_part_id"}}; - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -142,9 +132,11 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); ytUploadedTablesMock->AddTable(input_2, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + tableDataServicePtr->Put(key_3, TableContent_3); auto res = job->Merge(params); @@ -159,11 +151,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST_SUITE(TaskRunTests) { Y_UNIT_TEST(RunDownloadTask) { - TString ytTableContent = - "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); @@ -174,7 +161,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - ytUploadedTablesMock->AddTable(input, ytTableContent); + ytUploadedTablesMock->AddTable(input, TableContent_1); TDownloadTaskParams params = TDownloadTaskParams(input, output); TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id"); @@ -183,55 +170,50 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); UNIT_ASSERT_C(resultTableContent, "Result table content is empty"); - UNIT_ASSERT_NO_DIFF(*resultTableContent, ytTableContent); + UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1); } Y_UNIT_TEST(RunUploadTask) { - TString ytTableContent = - "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges}; TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); - tableDataServicePtr->Put(input.TableId, ytTableContent); + auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); + + tableDataServicePtr->Put(key, TableContent_1); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); - UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent); + UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1); } Y_UNIT_TEST(RunUploadTaskWithNoTable) { - TString ytTableContent = - "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges}; TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); // No table in tableDataServicePtr - // tableDataServicePtr->Put(input.TableId, ytTableContent); + // auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); + // tableDataServicePtr->Put(key, ytTableContent); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -240,30 +222,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { } Y_UNIT_TEST(RunMergeTask) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -274,9 +242,11 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); ytUploadedTablesMock->AddTable(input_2, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + tableDataServicePtr->Put(key_3, TableContent_3); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -287,30 +257,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { } Y_UNIT_TEST(RunMergeTaskWithNoTable) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -321,10 +277,12 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); // No table in Yt // ytUploadedTablesMock->AddTable(input_2, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + tableDataServicePtr->Put(key_3, TableContent_3); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp new file mode 100644 index 0000000000..4d45e054b6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp @@ -0,0 +1,98 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h> +#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h> + +namespace NYql::NFmr { + +TString originalTableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + +Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { + Y_UNIT_TEST(ReadOneChunkSmallPart) { + size_t chunkSize = 1024; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{1}; + std::vector<TTableRange> tableRanges = {{"partId", 0, 1}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + char buffer[10]; + reader.Read(buffer, 10); + TString readTableContentPart = {buffer, 10}; + auto originalTableContentPart = originalTableContent.substr(0, 10); + UNIT_ASSERT_NO_DIFF(readTableContentPart, originalTableContentPart); + } + + Y_UNIT_TEST(ReadAllOneChunk) { + size_t chunkSize = 1024; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{1}; + std::vector<TTableRange> tableRanges = {{"partId", 0, 1}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + auto readTableContent = reader.ReadAll(); + UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent); + } + + Y_UNIT_TEST(ReadAllMultipleChunks) { + size_t chunkSize = 32; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{1}; + + auto maxChunk = originalTableContent.size() / chunkSize + 1; + std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + auto readTableContent = reader.ReadAll(); + UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent); + } + + Y_UNIT_TEST(ReadAllMultipleChunksBigReadAhead) { + size_t chunkSize = 32; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{5}; + + auto maxChunk = originalTableContent.size() / chunkSize + 1; + std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + auto readTableContent = reader.ReadAll(); + UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent); + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make index 89865c44a4..969c305ed1 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( yql_yt_job_impl.cpp yql_yt_output_stream.cpp + yql_yt_raw_table_reader.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index 4e18a287a1..40d0a26760 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -4,6 +4,7 @@ #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> #include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h> #include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> @@ -15,8 +16,8 @@ namespace NYql::NFmr { class TFmrJob: public IFmrJob { public: - TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) - : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag) + TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings) + : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings) { } @@ -57,60 +58,68 @@ public: } virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override { - const auto ytTable = params.Output; - const auto cluster = params.Output.Cluster; - const auto path = params.Output.Path; - const auto tableId = params.Input.TableId; - - YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path; + try { + const auto ytTable = params.Output; + const auto cluster = params.Output.Cluster; + const auto path = params.Output.Path; + const auto tableId = params.Input.TableId; + const auto tableRanges = params.Input.TableRanges; - TMaybe<TString> getResult = TableDataService_->Get(tableId).GetValueSync(); + YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path; - if (!getResult) { - YQL_CLOG(ERROR, FastMapReduce) << "Table " << tableId << " not found"; - return TError("Table not found"); - } + auto res = GetFmrTableStream(params.Input); + auto err = std::get_if<TError>(&res); + if (err) { + return *err; + } + auto inputStream = std::get_if<THolder<IInputStream>>(&res); - TString tableContent = getResult.GetRef(); - TStringInput inputStream(tableContent); + // How to raise if not found - YtService_->Upload(ytTable, inputStream, clusterConnection); + YtService_->Upload(ytTable, *inputStream->get(), clusterConnection); - return TStatistics(); + return TStatistics(); + } catch (...) { + return TError(CurrentExceptionMessage()); + } } virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override { // расширить таск парамс. добавить туда мету - const auto inputs = params.Input; - const auto output = params.Output; + try { + const auto inputs = params.Input; + const auto output = params.Output; - YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; + YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; - TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_); + TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_); - ui32 totalRowsCount = 0; + ui32 totalRowsCount = 0; - for (const auto& inputTableRef : inputs) { - if (CancelFlag_->load()) { - return TError("Canceled"); - } - ui64 rowsCount = 0; // TMP Todo get rows count from input stats - auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection); - totalRowsCount += rowsCount; + for (const auto& inputTableRef : inputs) { + if (CancelFlag_->load()) { + return TError("Canceled"); + } + ui64 rowsCount = 0; // TMP Todo get rows count from input stats + auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection); + totalRowsCount += rowsCount; - auto err = std::get_if<TError>(&res); - if (err) { - return *err; + auto err = std::get_if<TError>(&res); + if (err) { + return *err; + } + auto inputStream = std::get_if<THolder<IInputStream>>(&res); + TransferData(inputStream->get(), &outputStream); } - auto inputStream = std::get_if<THolder<IInputStream>>(&res); - TransferData(inputStream->get(), &outputStream); - } - outputStream.Flush(); + outputStream.Flush(); - TTableStats stats = outputStream.GetStats(); - stats.Rows = totalRowsCount; + TTableStats stats = outputStream.GetStats(); + stats.Rows = totalRowsCount; - return TStatistics({{output, stats}}); + return TStatistics({{output, stats}}); + } catch (...) { + return TError(CurrentExceptionMessage()); + } } private: @@ -141,33 +150,40 @@ private: } std::variant<THolder<IInputStream>, TError> GetFmrTableStream(const TFmrTableInputRef& fmrTable) { - auto res = TableDataService_->Get(fmrTable.TableId).GetValueSync(); - if (!res) { - return TError("Table not found"); - } - auto tableContent = *res; - TStringStream stream; - stream << tableContent; - return MakeHolder<TStringStream>(stream); + + auto settings = TFmrRawTableReaderSettings(Settings_.ReadAheadChunks); + return MakeHolder<TFmrRawTableReader>(TFmrRawTableReader( + fmrTable.TableId, + fmrTable.TableRanges, + TableDataService_, + settings + )); } private: ITableDataService::TPtr TableDataService_; IYtService::TPtr YtService_; std::shared_ptr<std::atomic<bool>> CancelFlag_; + const TFmrJobSettings Settings_; }; -IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) { - return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag); +IFmrJob::TPtr MakeFmrJob( + ITableDataService::TPtr tableDataService, + IYtService::TPtr ytService, + std::shared_ptr<std::atomic<bool>> cancelFlag, + const TFmrJobSettings& settings +) { + return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag, settings); } TJobResult RunJob( TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, - std::shared_ptr<std::atomic<bool>> cancelFlag + std::shared_ptr<std::atomic<bool>> cancelFlag, + const TFmrJobSettings& settings ) { - IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag); + IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, settings); auto processTask = [job, task] (auto&& taskParams) { using T = std::decay_t<decltype(taskParams)>; diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h index 1da096ee23..284b68040d 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -6,8 +6,12 @@ namespace NYql::NFmr { -IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); +struct TFmrJobSettings { + ui64 ReadAheadChunks = 1; +}; -TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); +IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings); + +TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {}); } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp new file mode 100644 index 0000000000..c458be66b6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp @@ -0,0 +1,83 @@ +#include "yql_yt_raw_table_reader.h" +#include <yql/essentials/utils/log/log.h> +#include <yt/yql/providers/yt/fmr/utils/table_data_service_key.h> + +namespace NYql::NFmr { + +TFmrRawTableReader::TFmrRawTableReader( + const TString& tableId, + const std::vector<TTableRange>& tableRanges, + ITableDataService::TPtr tableDataService, + const TFmrRawTableReaderSettings& settings +) + : TableId_(tableId) + , TableRanges_(tableRanges) + , TableDataService_(tableDataService) + , Settings_(settings) +{ + ReadAhead(); +} + +size_t TFmrRawTableReader::DoRead(void* buf, size_t len) { + ui64 totalRead = 0; + char* output = static_cast<char*>(buf); + + while (len > 0) { + ui64 available = DataBuffer_.size() - CurrentPosition_; + if (available > 0) { + ui64 toCopy = std::min(available, len); + + auto start = DataBuffer_.Begin() + CurrentPosition_; + auto end = start + toCopy; + std::copy(start, end, output); + + CurrentPosition_ += toCopy; + output += toCopy; + len -= toCopy; + totalRead += toCopy; + } else if (!PendingChunks_.empty()) { + auto chunk = PendingChunks_.front(); + TMaybe<TString> data; + try { + data = chunk.Data.GetValueSync(); + } catch (...) { + ythrow yexception() << "Error reading chunk:" << chunk.Meta << "Error: " << CurrentExceptionMessage(); + } + + if (data) { + DataBuffer_.Assign(data->data(), data->size()); + } else { + ythrow yexception() << "No data for chunk:" << chunk.Meta; + } + + PendingChunks_.pop(); + CurrentPosition_ = 0; + available = DataBuffer_.size(); + + ReadAhead(); + } else { + break; + } + } + return totalRead; +} + +void TFmrRawTableReader::ReadAhead() { + while (PendingChunks_.size() < Settings_.ReadAheadChunks) { + if (CurrentRange_ < TableRanges_.size()) { + auto currentPartId = TableRanges_[CurrentRange_].PartId; + if (CurrentChunk_ < TableRanges_[CurrentRange_].MaxChunk) { + auto key = GetTableDataServiceKey(TableId_, currentPartId, CurrentChunk_); + PendingChunks_.push({TableDataService_->Get(key), {TableId_, currentPartId, CurrentChunk_}}); + CurrentChunk_++; + } else { + CurrentRange_++; + } + } + else { + break; + } + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h new file mode 100644 index 0000000000..ccebc661b4 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h @@ -0,0 +1,45 @@ +#pragma once + +#include <queue> +#include <util/generic/buffer.h> +#include <util/stream/input.h> +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> +#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h> + +namespace NYql::NFmr { + +struct TFmrRawTableReaderSettings { + ui64 ReadAheadChunks = 1; +}; + +struct TPendingFmrChunk { + NThreading::TFuture<TMaybe<TString>> Data; + TFmrChunkMeta Meta; +}; + +class TFmrRawTableReader: public IInputStream { + public: + TFmrRawTableReader( + const TString& tableId, + const std::vector<TTableRange>& tableRanges, + ITableDataService::TPtr tableDataService, + const TFmrRawTableReaderSettings& settings + ); + protected: + size_t DoRead(void* buf, size_t len) override; + private: + void ReadAhead(); + private: + const TString TableId_; + const std::vector<TTableRange> TableRanges_; + ITableDataService::TPtr TableDataService_; + const TFmrRawTableReaderSettings Settings_; + + ui64 CurrentRange_ = 0; + ui64 CurrentChunk_ = 0; + ui64 CurrentPosition_ = 0; + TBuffer DataBuffer_; + std::queue<TPendingFmrChunk> PendingChunks_; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index c3a182b486..319cf20801 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -10,6 +10,10 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage, stats); } +TString TFmrChunkMeta::ToString() const { + return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk); +} + } // namespace NYql::NFmr template<> @@ -22,3 +26,8 @@ void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError& } out << error.ErrorMessage; } + +template<> +void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) { + out << meta.ToString(); +} diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index a4b1ebf37c..4c5bd1252f 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -3,6 +3,7 @@ #include <util/digest/numeric.h> #include <util/generic/maybe.h> #include <util/generic/string.h> +#include <util/string/builder.h> #include <vector> namespace NYql::NFmr { @@ -66,6 +67,14 @@ struct TTableRange { ui64 MaxChunk = 1; // Пока такой дефолт }; +struct TFmrChunkMeta { + TString TableId; + TString PartId; + ui64 Chunk = 0; // сделать out метод + + TString ToString() const; +}; + struct TFmrTableInputRef { TString TableId; std::vector<TTableRange> TableRanges; |