diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-03-14 00:51:45 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-03-14 00:51:45 +0000 |
commit | 3e3d50dea42f66b1ba457411b8864990f90bbe21 (patch) | |
tree | 7d75df352fc045a84d46764b96b496b5775bbf44 /yt/yql | |
parent | 7778cd274683ce11e318b799ea12c7bc0b3a4bdd (diff) | |
parent | 422642b601155a296cb0a69eb9b1f7ba146ffa49 (diff) | |
download | ydb-3e3d50dea42f66b1ba457411b8864990f90bbe21.tar.gz |
Merge branch 'rightlib' into merge-libs-250314-0050
Diffstat (limited to 'yt/yql')
26 files changed, 674 insertions, 411 deletions
diff --git a/yt/yql/providers/yt/expr_nodes/ya.make b/yt/yql/providers/yt/expr_nodes/ya.make index 5c46e4ff03..744cedec7b 100644 --- a/yt/yql/providers/yt/expr_nodes/ya.make +++ b/yt/yql/providers/yt/expr_nodes/ya.make @@ -13,40 +13,21 @@ SRCDIR( yql/essentials/core/expr_nodes_gen ) -IF (EXPORT_CMAKE) - RUN_PYTHON3( - ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/gen/__main__.py - yql_expr_nodes_gen.jnj - yql_yt_expr_nodes.json - yql_yt_expr_nodes.gen.h - yql_yt_expr_nodes.decl.inl.h - yql_yt_expr_nodes.defs.inl.h - IN yql_expr_nodes_gen.jnj - IN yql_yt_expr_nodes.json - OUT yql_yt_expr_nodes.gen.h - OUT yql_yt_expr_nodes.decl.inl.h - OUT yql_yt_expr_nodes.defs.inl.h - OUTPUT_INCLUDES - ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h - ${ARCADIA_ROOT}/util/generic/hash_set.h - ) -ELSE() - RUN_PROGRAM( - yql/essentials/core/expr_nodes_gen/gen - yql_expr_nodes_gen.jnj - yql_yt_expr_nodes.json - yql_yt_expr_nodes.gen.h - yql_yt_expr_nodes.decl.inl.h - yql_yt_expr_nodes.defs.inl.h - IN yql_expr_nodes_gen.jnj - IN yql_yt_expr_nodes.json - OUT yql_yt_expr_nodes.gen.h - OUT yql_yt_expr_nodes.decl.inl.h - OUT yql_yt_expr_nodes.defs.inl.h - OUTPUT_INCLUDES - ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h - ${ARCADIA_ROOT}/util/generic/hash_set.h - ) -ENDIF() +RUN_PY3_PROGRAM( + yql/essentials/core/expr_nodes_gen/gen + yql_expr_nodes_gen.jnj + yql_yt_expr_nodes.json + yql_yt_expr_nodes.gen.h + yql_yt_expr_nodes.decl.inl.h + yql_yt_expr_nodes.defs.inl.h + IN yql_expr_nodes_gen.jnj + IN yql_yt_expr_nodes.json + OUT yql_yt_expr_nodes.gen.h + OUT yql_yt_expr_nodes.decl.inl.h + OUT yql_yt_expr_nodes.defs.inl.h + OUTPUT_INCLUDES + ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h + ${ARCADIA_ROOT}/util/generic/hash_set.h +) END() diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make index cb2ee1f19b..78813e529d 100644 --- 
a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make @@ -3,6 +3,7 @@ UNITTEST() SRCS( yql_yt_job_ut.cpp yql_yt_output_stream_ut.cpp + yql_yt_raw_table_reader_ut.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index 4b4399dcca..7be6348426 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -6,10 +6,18 @@ namespace NYql::NFmr { -TString TableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; +TString TableContent_1 = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; +TString TableContent_2 = "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; +TString TableContent_3 = "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST(DownloadTable) { @@ -17,14 +25,15 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + 
TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); TYtTableRef input = TYtTableRef("test_cluster", "test_path"); TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); TDownloadTaskParams params = TDownloadTaskParams(input, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - ytUploadedTablesMock->AddTable(input, TableContent); + ytUploadedTablesMock->AddTable(input, TableContent_1); auto res = job->Download(params); @@ -35,57 +44,47 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { UNIT_ASSERT_EQUAL(statistics->OutputTables.at(output).Rows, 4); auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); UNIT_ASSERT_C(resultTableContent, "Result table content is empty"); - UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent); + UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1); } Y_UNIT_TEST(UploadTable) { - TString ytTableContent = TableContent; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); TYtTableRef output = TYtTableRef("test_cluster", "test_path"); - TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges}; auto params = TUploadTaskParams(input, output); - tableDataServicePtr->Put(input.TableId, 
ytTableContent); + auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key, TableContent_1); auto res = job->Upload(params); auto err = std::get_if<TError>(&res); UNIT_ASSERT_C(!err,err->ErrorMessage); - UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent); + UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1); } Y_UNIT_TEST(MergeFmrTables) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; - TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2"}; - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = 
"test_table_id_3"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; + TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2", .TableRanges = ranges}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -94,9 +93,12 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); - tableDataServicePtr->Put(input_2.TableId, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_2 = GetTableDataServiceKey(input_2.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); + tableDataServicePtr->Put(key_2, TableContent_2); + tableDataServicePtr->Put(key_3, TableContent_3); auto res = job->Merge(params); @@ -109,31 +111,19 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { } Y_UNIT_TEST(MergeMixedTables) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - 
"{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag); + TFmrJobSettings settings = {1}; + IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings); + + std::vector<TTableRange> ranges = {{"test_part_id"}}; - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -142,9 +132,11 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); ytUploadedTablesMock->AddTable(input_2, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + tableDataServicePtr->Put(key_3, TableContent_3); auto res = 
job->Merge(params); @@ -159,11 +151,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST_SUITE(TaskRunTests) { Y_UNIT_TEST(RunDownloadTask) { - TString ytTableContent = - "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); @@ -174,7 +161,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - ytUploadedTablesMock->AddTable(input, ytTableContent); + ytUploadedTablesMock->AddTable(input, TableContent_1); TDownloadTaskParams params = TDownloadTaskParams(input, output); TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id"); @@ -183,55 +170,50 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); UNIT_ASSERT_C(resultTableContent, "Result table content is empty"); - UNIT_ASSERT_NO_DIFF(*resultTableContent, ytTableContent); + UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1); } Y_UNIT_TEST(RunUploadTask) { - TString ytTableContent = - "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService 
= MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges}; TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); - tableDataServicePtr->Put(input.TableId, ytTableContent); + auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); + + tableDataServicePtr->Put(key, TableContent_1); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); - UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent); + UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1); } Y_UNIT_TEST(RunUploadTaskWithNoTable) { - TString ytTableContent = - "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges}; TYtTableRef output = 
TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); // No table in tableDataServicePtr - // tableDataServicePtr->Put(input.TableId, ytTableContent); + // auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); + // tableDataServicePtr->Put(key, ytTableContent); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -240,30 +222,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { } Y_UNIT_TEST(RunMergeTask) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; TYtTableRef input_2 = TYtTableRef("test_path", 
"test_cluster"); - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -274,9 +242,11 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); ytUploadedTablesMock->AddTable(input_2, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + tableDataServicePtr->Put(key_3, TableContent_3); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -287,30 +257,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { } Y_UNIT_TEST(RunMergeTaskWithNoTable) { - TString TableContent_1 = - "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)"; - TString TableContent_2 = - "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; - TString TableContent_3 = - "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};" - "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" - "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};" - "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtUploadedTablesMock::TPtr 
ytUploadedTablesMock = MakeYtUploadedTablesMock(); NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock); std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"}; + std::vector<TTableRange> ranges = {{"test_part_id"}}; + TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"}; + TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; TTaskTableRef input_table_ref_1 = {input_1}; TTaskTableRef input_table_ref_2 = {input_2}; TTaskTableRef input_table_ref_3 = {input_3}; @@ -321,10 +277,12 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); - tableDataServicePtr->Put(input_1.TableId, TableContent_1); + auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); + auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); + tableDataServicePtr->Put(key_1, TableContent_1); // No table in Yt // ytUploadedTablesMock->AddTable(input_2, TableContent_2); - tableDataServicePtr->Put(input_3.TableId, TableContent_3); + tableDataServicePtr->Put(key_3, TableContent_3); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp new file mode 100644 index 0000000000..4d45e054b6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp @@ -0,0 +1,98 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h> +#include 
<yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h> +#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h> + +namespace NYql::NFmr { + +TString originalTableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};" + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};" + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"; + +Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) { + Y_UNIT_TEST(ReadOneChunkSmallPart) { + size_t chunkSize = 1024; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{1}; + std::vector<TTableRange> tableRanges = {{"partId", 0, 1}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + char buffer[10]; + reader.Read(buffer, 10); + TString readTableContentPart = {buffer, 10}; + auto originalTableContentPart = originalTableContent.substr(0, 10); + UNIT_ASSERT_NO_DIFF(readTableContentPart, originalTableContentPart); + } + + Y_UNIT_TEST(ReadAllOneChunk) { + size_t chunkSize = 1024; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{1}; + std::vector<TTableRange> tableRanges = {{"partId", 0, 1}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + auto readTableContent = 
reader.ReadAll(); + UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent); + } + + Y_UNIT_TEST(ReadAllMultipleChunks) { + size_t chunkSize = 32; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{1}; + + auto maxChunk = originalTableContent.size() / chunkSize + 1; + std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + auto readTableContent = reader.ReadAll(); + UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent); + } + + Y_UNIT_TEST(ReadAllMultipleChunksBigReadAhead) { + size_t chunkSize = 32; + + ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); + + TFmrOutputStreamSettings settings{chunkSize}; + TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings); + + outputStream.Write(originalTableContent.data(), originalTableContent.size()); + outputStream.Flush(); + + TFmrRawTableReaderSettings readerSettings{5}; + + auto maxChunk = originalTableContent.size() / chunkSize + 1; + std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}}; + TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings); + + auto readTableContent = reader.ReadAll(); + UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent); + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make index 89865c44a4..969c305ed1 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( 
yql_yt_job_impl.cpp yql_yt_output_stream.cpp + yql_yt_raw_table_reader.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index 4e18a287a1..40d0a26760 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -4,6 +4,7 @@ #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> #include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> #include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h> #include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> @@ -15,8 +16,8 @@ namespace NYql::NFmr { class TFmrJob: public IFmrJob { public: - TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) - : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag) + TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings) + : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings) { } @@ -57,60 +58,68 @@ public: } virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override { - const auto ytTable = params.Output; - const auto cluster = params.Output.Cluster; - const auto path = params.Output.Path; - const auto tableId = params.Input.TableId; - - YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' 
<< path; + try { + const auto ytTable = params.Output; + const auto cluster = params.Output.Cluster; + const auto path = params.Output.Path; + const auto tableId = params.Input.TableId; + const auto tableRanges = params.Input.TableRanges; - TMaybe<TString> getResult = TableDataService_->Get(tableId).GetValueSync(); + YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path; - if (!getResult) { - YQL_CLOG(ERROR, FastMapReduce) << "Table " << tableId << " not found"; - return TError("Table not found"); - } + auto res = GetFmrTableStream(params.Input); + auto err = std::get_if<TError>(&res); + if (err) { + return *err; + } + auto inputStream = std::get_if<THolder<IInputStream>>(&res); - TString tableContent = getResult.GetRef(); - TStringInput inputStream(tableContent); + // How to raise if not found - YtService_->Upload(ytTable, inputStream, clusterConnection); + YtService_->Upload(ytTable, *inputStream->get(), clusterConnection); - return TStatistics(); + return TStatistics(); + } catch (...) { + return TError(CurrentExceptionMessage()); + } } virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override { // расширить таск парамс. 
добавить туда мету - const auto inputs = params.Input; - const auto output = params.Output; + try { + const auto inputs = params.Input; + const auto output = params.Output; - YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; + YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; - TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_); + TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_); - ui32 totalRowsCount = 0; + ui32 totalRowsCount = 0; - for (const auto& inputTableRef : inputs) { - if (CancelFlag_->load()) { - return TError("Canceled"); - } - ui64 rowsCount = 0; // TMP Todo get rows count from input stats - auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection); - totalRowsCount += rowsCount; + for (const auto& inputTableRef : inputs) { + if (CancelFlag_->load()) { + return TError("Canceled"); + } + ui64 rowsCount = 0; // TMP Todo get rows count from input stats + auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection); + totalRowsCount += rowsCount; - auto err = std::get_if<TError>(&res); - if (err) { - return *err; + auto err = std::get_if<TError>(&res); + if (err) { + return *err; + } + auto inputStream = std::get_if<THolder<IInputStream>>(&res); + TransferData(inputStream->get(), &outputStream); } - auto inputStream = std::get_if<THolder<IInputStream>>(&res); - TransferData(inputStream->get(), &outputStream); - } - outputStream.Flush(); + outputStream.Flush(); - TTableStats stats = outputStream.GetStats(); - stats.Rows = totalRowsCount; + TTableStats stats = outputStream.GetStats(); + stats.Rows = totalRowsCount; - return TStatistics({{output, stats}}); + return TStatistics({{output, stats}}); + } catch (...) 
{ + return TError(CurrentExceptionMessage()); + } } private: @@ -141,33 +150,40 @@ private: } std::variant<THolder<IInputStream>, TError> GetFmrTableStream(const TFmrTableInputRef& fmrTable) { - auto res = TableDataService_->Get(fmrTable.TableId).GetValueSync(); - if (!res) { - return TError("Table not found"); - } - auto tableContent = *res; - TStringStream stream; - stream << tableContent; - return MakeHolder<TStringStream>(stream); + + auto settings = TFmrRawTableReaderSettings(Settings_.ReadAheadChunks); + return MakeHolder<TFmrRawTableReader>(TFmrRawTableReader( + fmrTable.TableId, + fmrTable.TableRanges, + TableDataService_, + settings + )); } private: ITableDataService::TPtr TableDataService_; IYtService::TPtr YtService_; std::shared_ptr<std::atomic<bool>> CancelFlag_; + const TFmrJobSettings Settings_; }; -IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) { - return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag); +IFmrJob::TPtr MakeFmrJob( + ITableDataService::TPtr tableDataService, + IYtService::TPtr ytService, + std::shared_ptr<std::atomic<bool>> cancelFlag, + const TFmrJobSettings& settings +) { + return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag, settings); } TJobResult RunJob( TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, - std::shared_ptr<std::atomic<bool>> cancelFlag + std::shared_ptr<std::atomic<bool>> cancelFlag, + const TFmrJobSettings& settings ) { - IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag); + IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, settings); auto processTask = [job, task] (auto&& taskParams) { using T = std::decay_t<decltype(taskParams)>; diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h index 1da096ee23..284b68040d 100644 --- 
a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -6,8 +6,12 @@ namespace NYql::NFmr { -IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); +struct TFmrJobSettings { + ui64 ReadAheadChunks = 1; +}; -TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag); +IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings); + +TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {}); } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp new file mode 100644 index 0000000000..c458be66b6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp @@ -0,0 +1,83 @@ +#include "yql_yt_raw_table_reader.h" +#include <yql/essentials/utils/log/log.h> +#include <yt/yql/providers/yt/fmr/utils/table_data_service_key.h> + +namespace NYql::NFmr { + +TFmrRawTableReader::TFmrRawTableReader( + const TString& tableId, + const std::vector<TTableRange>& tableRanges, + ITableDataService::TPtr tableDataService, + const TFmrRawTableReaderSettings& settings +) + : TableId_(tableId) + , TableRanges_(tableRanges) + , TableDataService_(tableDataService) + , Settings_(settings) +{ + ReadAhead(); +} + +size_t TFmrRawTableReader::DoRead(void* buf, size_t len) { + ui64 totalRead = 0; + char* output = static_cast<char*>(buf); + + while (len > 0) { + ui64 available = DataBuffer_.size() - CurrentPosition_; + if (available > 0) { + ui64 toCopy = std::min(available, len); + + auto start = 
DataBuffer_.Begin() + CurrentPosition_; + auto end = start + toCopy; + std::copy(start, end, output); + + CurrentPosition_ += toCopy; + output += toCopy; + len -= toCopy; + totalRead += toCopy; + } else if (!PendingChunks_.empty()) { + auto chunk = PendingChunks_.front(); + TMaybe<TString> data; + try { + data = chunk.Data.GetValueSync(); + } catch (...) { + ythrow yexception() << "Error reading chunk:" << chunk.Meta << "Error: " << CurrentExceptionMessage(); + } + + if (data) { + DataBuffer_.Assign(data->data(), data->size()); + } else { + ythrow yexception() << "No data for chunk:" << chunk.Meta; + } + + PendingChunks_.pop(); + CurrentPosition_ = 0; + available = DataBuffer_.size(); + + ReadAhead(); + } else { + break; + } + } + return totalRead; +} + +void TFmrRawTableReader::ReadAhead() { + while (PendingChunks_.size() < Settings_.ReadAheadChunks) { + if (CurrentRange_ < TableRanges_.size()) { + auto currentPartId = TableRanges_[CurrentRange_].PartId; + if (CurrentChunk_ < TableRanges_[CurrentRange_].MaxChunk) { + auto key = GetTableDataServiceKey(TableId_, currentPartId, CurrentChunk_); + PendingChunks_.push({TableDataService_->Get(key), {TableId_, currentPartId, CurrentChunk_}}); + CurrentChunk_++; + } else { + CurrentRange_++; + } + } + else { + break; + } + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h new file mode 100644 index 0000000000..ccebc661b4 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h @@ -0,0 +1,45 @@ +#pragma once + +#include <queue> +#include <util/generic/buffer.h> +#include <util/stream/input.h> +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> +#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h> + +namespace NYql::NFmr { + +struct TFmrRawTableReaderSettings { + ui64 ReadAheadChunks = 1; +}; + +struct TPendingFmrChunk { + 
NThreading::TFuture<TMaybe<TString>> Data; + TFmrChunkMeta Meta; +}; + +class TFmrRawTableReader: public IInputStream { + public: + TFmrRawTableReader( + const TString& tableId, + const std::vector<TTableRange>& tableRanges, + ITableDataService::TPtr tableDataService, + const TFmrRawTableReaderSettings& settings + ); + protected: + size_t DoRead(void* buf, size_t len) override; + private: + void ReadAhead(); + private: + const TString TableId_; + const std::vector<TTableRange> TableRanges_; + ITableDataService::TPtr TableDataService_; + const TFmrRawTableReaderSettings Settings_; + + ui64 CurrentRange_ = 0; + ui64 CurrentChunk_ = 0; + ui64 CurrentPosition_ = 0; + TBuffer DataBuffer_; + std::queue<TPendingFmrChunk> PendingChunks_; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index c3a182b486..319cf20801 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -10,6 +10,10 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage, stats); } +TString TFmrChunkMeta::ToString() const { + return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk); +} + } // namespace NYql::NFmr template<> @@ -22,3 +26,8 @@ void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError& } out << error.ErrorMessage; } + +template<> +void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) { + out << meta.ToString(); +} diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index a4b1ebf37c..4c5bd1252f 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ 
b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -3,6 +3,7 @@ #include <util/digest/numeric.h> #include <util/generic/maybe.h> #include <util/generic/string.h> +#include <util/string/builder.h> #include <vector> namespace NYql::NFmr { @@ -66,6 +67,14 @@ struct TTableRange { ui64 MaxChunk = 1; // Пока такой дефолт }; +struct TFmrChunkMeta { + TString TableId; + TString PartId; + ui64 Chunk = 0; // сделать out метод + + TString ToString() const; +}; + struct TFmrTableInputRef { TString TableId; std::vector<TTableRange> TableRanges; diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index ec1edd10d9..330cfbbbad 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -262,7 +262,7 @@ void GetIntegerConstraints(const TExprNode::TPtr& column, bool& isSigned, ui64& } } -void QuoteColumnForQL(const TStringBuf columnName, TStringBuilder& result) { +void QuoteColumnForQL(const TStringBuf& columnName, TStringBuilder& result) { result << '`'; if (!columnName.Contains('`')) { result << columnName; @@ -278,6 +278,14 @@ void QuoteColumnForQL(const TStringBuf columnName, TStringBuilder& result) { result << '`'; } +void ConvertComparisonForQL(const TStringBuf& opName, TStringBuilder& result) { + if (opName == "==") { + result << '='; + } else { + result << opName; + } +} + void GenerateInputQueryIntegerComparison(const TStringBuf& opName, const TExprNode::TPtr& intColumn, const TExprNode::TPtr& intValue, TStringBuilder& result) { bool columnsIsSigned; ui64 minValueAbs; @@ -310,7 +318,9 @@ void GenerateInputQueryIntegerComparison(const TStringBuf& opName, const TExprNo const auto columnName = intColumn->ChildPtr(1)->Content(); const auto valueStr = maybeInt.Cast().Literal().Value(); QuoteColumnForQL(columnName, result); - result << " " << opName << " " << valueStr; + result << " "; + ConvertComparisonForQL(opName, result); 
+ result << " " << valueStr; } } @@ -910,6 +920,8 @@ public: execCtx->SetOutput(outputOp.Cast().Output()); } + ReportBlockStatus(opBase, execCtx); + TFuture<void> future; if (auto op = opBase.Maybe<TYtSort>()) { future = DoSort(op.Cast(), execCtx); @@ -5734,6 +5746,49 @@ private: } } + static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) { + if (execCtx->Options_.PublicId().Empty()) { + return; + } + + auto opPublicId = *execCtx->Options_.PublicId(); + + TOperationProgress::EOpBlockStatus status; + if (auto map = op.Maybe<TYtMap>()) { + status = DetermineProgramBlockStatus(map.Cast().Mapper().Body().Ref()); + } else if (auto map = op.Maybe<TYtReduce>()) { + status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref()); + } else if (auto map = op.Maybe<TYtMapReduce>()) { + status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref()); + if (auto mapLambda = map.Cast().Mapper().Maybe<TCoLambda>()) { + status = TOperationProgress::CombineBlockStatuses(status, DetermineProgramBlockStatus(mapLambda.Cast().Body().Ref())); + } + } else if (auto fill = op.Maybe<TYtFill>()) { + status = DetermineProgramBlockStatus(fill.Cast().Content().Body().Ref()); + } else if (op.Maybe<TYtSort>()) { + return; + } else if (op.Maybe<TYtCopy>()) { + return; + } else if (op.Maybe<TYtMerge>()) { + return; + } else if (op.Maybe<TYtTouch>()) { + return; + } else if (op.Maybe<TYtDropTable>()) { + return; + } else if (op.Maybe<TYtStatOut>()) { + return; + } else if (op.Maybe<TYtDqProcessWrite>()) { + return; + } else { + YQL_ENSURE(false, "unknown operation: " << op.Ref().Content()); + } + + YQL_CLOG(INFO, ProviderYt) << "Reporting " << status << " block status for operation " << op.Ref().Content() << " with public id #" << opPublicId; + auto p = TOperationProgress(TString(YtProviderName), opPublicId, TOperationProgress::EState::InProgress); + p.BlockStatus = status; + execCtx->Session_->ProgressWriter_(p); + } + private: const 
TYtNativeServices Services_; const TConfigClusters::TPtr Clusters_; diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp index da0b5919ac..076237b9d7 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp @@ -347,7 +347,6 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const { auto equiJoin = node.Cast<TYtEquiJoin>(); - auto cluster = equiJoin.DataSink().Cluster().StringValue(); const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable && equiJoin.Input().Size() > 2 @@ -369,12 +368,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa if (tryReorder) { YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin"; - auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx); + auto collectStatus = CollectCboStats(*tree, State_, ctx); if (collectStatus == TStatus::Repeat) { return ExportYtEquiJoin(equiJoin, *tree, ctx, State_); } - const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx); + const auto optimizedTree = OrderJoins(tree, State_, ctx); if (optimizedTree != tree) { return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_); } diff --git a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp index 6cf823a7e1..70bb6fed87 100644 --- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp @@ -21,7 +21,6 @@ void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const IGraphTransformer::TStatus ExtractInMemorySize( const TYtState::TPtr& state, - TString cluster, TExprContext& ctx, TMaybe<ui64>& leftMemorySize, TMaybe<ui64>& 
rightMemorySize, @@ -46,7 +45,7 @@ IGraphTransformer::TStatus ExtractInMemorySize( bool isCross = false; auto status = CollectStatsAndMapJoinSettings(mode, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - leftLeaf, rightLeaf, *state, isCross, cluster, ctx); + leftLeaf, rightLeaf, *state, isCross, ctx); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status; return status; @@ -57,7 +56,7 @@ IGraphTransformer::TStatus ExtractInMemorySize( TVector<TString> leftJoinKeyList(leftJoinKeys.begin(), leftJoinKeys.end()); const ui64 rows = mapSettings.LeftRows; ui64 size = 0; - auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables); + auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to calculate left join leaf size: " << status; return status; @@ -77,7 +76,7 @@ IGraphTransformer::TStatus ExtractInMemorySize( const ui64 rows = mapSettings.RightRows; ui64 size = 0; - auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables); + auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to calculate right join leaf size: " << status; return status; @@ -94,7 +93,6 @@ IGraphTransformer::TStatus ExtractInMemorySize( IGraphTransformer::TStatus CollectCboStatsLeaf( const THashMap<TString, THashSet<TString>>& relJoinColumns, - const TString& cluster, TYtJoinNodeLeaf& leaf, const TYtState::TPtr& state, 
TExprContext& ctx) @@ -115,36 +113,36 @@ IGraphTransformer::TStatus CollectCboStatsLeaf( } IYtGateway::TPathStatResult result; - return TryEstimateDataSizeChecked(result, leaf.Section, cluster, tables, requestedColumnList, *state, ctx); + return TryEstimateDataSizeChecked(result, leaf.Section, tables, requestedColumnList, *state, ctx); } -IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { +IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get()); TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get()); AddJoinColumns(relJoinColumns, op); TRelSizeInfo leftSizeInfo; TRelSizeInfo rightSizeInfo; - auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, cluster, ctx, &op); + auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, ctx, &op); if (result != IGraphTransformer::TStatus::Ok) { return result; } if (leftLeaf) { - result = CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx); + result = CollectCboStatsLeaf(relJoinColumns, *leftLeaf, state, ctx); } else { auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get()); - result = CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx); + result = CollectCboStatsNode(relJoinColumns, leftOp, state, ctx); } if (result != IGraphTransformer::TStatus::Ok) { return result; } if (rightLeaf) { - result = CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx); + result = CollectCboStatsLeaf(relJoinColumns, *rightLeaf, state, ctx); } else { auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get()); - result = CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx); + result = 
CollectCboStatsNode(relJoinColumns, rightOp, state, ctx); } return result; } @@ -155,7 +153,6 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo( TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, - TString cluster, TExprContext& ctx, TYtJoinNodeOp* op) { auto mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW); @@ -217,22 +214,22 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo( return IGraphTransformer::TStatus::Ok; } - auto status = ExtractInMemorySize(state, cluster, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels, + auto status = ExtractInMemorySize(state, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels, numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType, rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType); if (status != IGraphTransformer::TStatus::Ok) { return status; } - status = ExtractInMemorySize(state, cluster, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels, + status = ExtractInMemorySize(state, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels, numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType, rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType); return status; } -IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { +IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { THashMap<TString, THashSet<TString>> relJoinColumns; - return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx); + return CollectCboStatsNode(relJoinColumns, op, state, ctx); } TVector<TString> JoinLeafLabels(TExprNode::TPtr label) { diff --git 
a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h index 0d05beea7a..fd5b894bb2 100644 --- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h +++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h @@ -5,9 +5,9 @@ namespace NYql { -IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); +IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); -IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TString cluster, TExprContext& ctx, TYtJoinNodeOp* op); +IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TExprContext& ctx, TYtJoinNodeOp* op); TVector<TString> JoinLeafLabels(TExprNode::TPtr label); diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 9b691e5554..5ffc4cf1ef 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -157,7 +157,7 @@ public: { } - TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) { + TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) { TVector<TVector<ui64>> groupIdColumnarStats; groupIdColumnarStats.reserve(groupIdPathInfos.size()); TVector<bool> lookupsInfo; @@ -175,7 +175,7 @@ public: flattenPaths.push_back(pathInfo); } } - auto result = EstimateDataSize(cluster, flattenPaths, Nothing(), *State_, ctx); + auto result = EstimateDataSize(flattenPaths, Nothing(), *State_, ctx); size_t statIdx = 0; size_t pathIdx = 0; for (const auto& 
[idx, pathInfos]: Enumerate(groupIdPathInfos)) { @@ -302,7 +302,7 @@ public: } else { TVector<TVector<std::tuple<ui64, ui64, NYT::TRichYPath>>> partitionTuplesArr; ui64 sumAllTableSizes = 0; - TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes); + TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, {groupIdPathInfos}, sumAllTableSizes); ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob; if (settings.CanFallback && hasErasure && parts > maxTasks) { auto message = DqFallbackErrorMessageWrap("too big table with erasure codec"); @@ -634,7 +634,7 @@ public: } ui64 dataSize = 0; for (auto& [cluster, info]: clusterToNodesAndErasure) { - auto res = EstimateColumnStats(ctx, cluster, clusterToGroups[cluster], dataSize); + auto res = EstimateColumnStats(ctx, clusterToGroups[cluster], dataSize); auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster); if (!codecCpu) { continue; diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp index 32c1dc2359..b1868c0a4a 100644 --- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp @@ -188,7 +188,7 @@ bool IsYtIsolatedLambdaImpl(const TExprNode& lambdaBody, TSyncMap& syncList, TSt } IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns, - const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, + const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx, bool sync) { result = IYtGateway::TPathStatResult{}; @@ -199,9 +199,10 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, const bool useColumnarStat = GetJoinCollectColumnarStatisticsMode(*state.Configuration) != EJoinCollectColumnarStatisticsMode::Disable && !state.Types->UseTableMetaFromGraph; - 
TVector<size_t> reqMap; TVector<IYtGateway::TPathStatReq> pathStatReqs; - ui64 totalChunkCount = 0; + THashMap<TString, TVector<size_t>> reqMapByCluster; + TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster; + THashMap<TString, ui64> totalChunkCountByCluster; for (size_t i: xrange(paths.size())) { const TYtPathInfo::TPtr& pathInfo = paths[i]; YQL_ENSURE(pathInfo->Table->Stat); @@ -222,62 +223,83 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, overrideColumns = columns; } - auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, overrideColumns, state, ctx); + auto ytPath = BuildYtPathForStatRequest(*pathInfo, overrideColumns, state, ctx); if (!ytPath) { return IGraphTransformer::TStatus::Error; } if (ytPath->Columns_) { - pathStatReqs.push_back( + const TString cluster = pathInfo->Table->Cluster; + YQL_ENSURE(cluster); + pathStatReqsByCluster[cluster].push_back( IYtGateway::TPathStatReq() .Path(*ytPath) .IsTemp(pathInfo->Table->IsTemp) .IsAnonymous(pathInfo->Table->IsAnonymous) .Epoch(pathInfo->Table->Epoch.GetOrElse(0)) ); - reqMap.push_back(i); - totalChunkCount += pathInfo->Table->Stat->ChunkCount; + reqMapByCluster[cluster].push_back(i); + totalChunkCountByCluster[cluster] += pathInfo->Table->Stat->ChunkCount; } } } - if (!pathStatReqs.empty()) { - for (auto& req : pathStatReqs) { - YQL_ENSURE(req.Path().Columns_); - requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end()); + if (!pathStatReqsByCluster.empty()) { + const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get(); + TMap<TString, IYtGateway::TPathStatResult> pathStatsByCluster; + TMap<TString, NThreading::TFuture<IYtGateway::TPathStatResult>> futuresByCluster; + THashSet<TString> extendedStatsRequested; + IGraphTransformer::TStatus resultStatus = IGraphTransformer::TStatus::Ok; + for (const auto& [cluster, reqs] : pathStatReqsByCluster) { + for (auto& req : 
reqs) { + YQL_ENSURE(req.Path().Columns_); + requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end()); + } + const bool requestExtendedStats = !sync && maxChunkCountExtendedStats && + (*maxChunkCountExtendedStats == 0 || totalChunkCountByCluster[cluster] <= *maxChunkCountExtendedStats); + IYtGateway::TPathStatOptions pathStatOptions = + IYtGateway::TPathStatOptions(state.SessionId) + .Cluster(cluster) + .Paths(reqs) + .Config(state.Configuration->Snapshot()) + .Extended(requestExtendedStats); + if (requestExtendedStats) { + extendedStatsRequested.insert(cluster); + } + if (sync) { + futuresByCluster[cluster] = state.Gateway->PathStat(std::move(pathStatOptions)); + } else { + auto& pathStats = pathStatsByCluster[cluster]; + pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions)); + if (!pathStats.Success()) { + resultStatus = resultStatus.Combine(IGraphTransformer::TStatus::Repeat); + } + } } - const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get(); - const bool requestExtendedStats = !sync && maxChunkCountExtendedStats && - (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats); - - IYtGateway::TPathStatResult pathStats; - IYtGateway::TPathStatOptions pathStatOptions = - IYtGateway::TPathStatOptions(state.SessionId) - .Cluster(cluster) - .Paths(pathStatReqs) - .Config(state.Configuration->Snapshot()) - .Extended(requestExtendedStats); - if (sync) { - auto future = state.Gateway->PathStat(std::move(pathStatOptions)); + for (auto& [cluster, future] : futuresByCluster) { + auto& pathStats = pathStatsByCluster[cluster]; pathStats = future.GetValueSync(); pathStats.ReportIssues(ctx.IssueManager); if (!pathStats.Success()) { - return IGraphTransformer::TStatus::Error; - } - } else { - pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions)); - if (!pathStats.Success()) { - return IGraphTransformer::TStatus::Repeat; + resultStatus = 
resultStatus.Combine(IGraphTransformer::TStatus::Error); } } - YQL_ENSURE(pathStats.DataSize.size() == reqMap.size()); - YQL_ENSURE(!requestExtendedStats || pathStats.Extended.size() == reqMap.size()); - for (size_t i: xrange(pathStats.DataSize.size())) { - result.DataSize[reqMap[i]] = pathStats.DataSize[i]; - if (requestExtendedStats) { - result.Extended[reqMap[i]] = pathStats.Extended[i]; + if (resultStatus != IGraphTransformer::TStatus::Ok) { + return resultStatus; + } + + for (auto& [cluster, pathStats] : pathStatsByCluster) { + auto it = reqMapByCluster.find(cluster); + YQL_ENSURE(it != reqMapByCluster.end()); + YQL_ENSURE(pathStats.DataSize.size() == it->second.size()); + YQL_ENSURE(!extendedStatsRequested.contains(cluster) || pathStats.Extended.size() == it->second.size()); + for (size_t i: xrange(pathStats.DataSize.size())) { + result.DataSize[it->second[i]] = pathStats.DataSize[i]; + if (extendedStatsRequested.contains(cluster)) { + result.Extended[it->second[i]] = pathStats.Extended[i]; + } } } } @@ -1847,7 +1869,7 @@ bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMa return node == nullptr; } -TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo, +TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo, const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx) { auto ytPath = NYT::TRichYPath(pathInfo.Table->Name); @@ -1858,6 +1880,8 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const if (ytPath.Columns_ && dynamic_cast<TYtTableInfo*>(pathInfo.Table.Get()) && pathInfo.Table->IsAnonymous && !TYtTableInfo::HasSubstAnonymousLabel(pathInfo.Table->FromNode.Cast())) { + const TString cluster = pathInfo.Table->Cluster; + YQL_ENSURE(cluster); TString realTableName = state.AnonymousLabels.Value(std::make_pair(cluster, pathInfo.Table->Name), TString()); if (!realTableName) { TPositionHandle pos; @@ 
-1873,7 +1897,7 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const return ytPath; } -TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, +TMaybe<TVector<ui64>> EstimateDataSize(const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx) { TVector<ui64> result; @@ -1882,7 +1906,7 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt bool sync = true; IYtGateway::TPathStatResult res; - auto status = EstimateDataSize(res, requestedColumns, cluster, paths, columns, state, ctx, sync); + auto status = EstimateDataSize(res, requestedColumns, paths, columns, state, ctx, sync); if (status != IGraphTransformer::TStatus::Ok) { return {}; } @@ -1891,11 +1915,11 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt } IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns, - const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, + const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx) { bool sync = false; - return EstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx, sync); + return EstimateDataSize(result, requestedColumns, paths, columns, state, ctx, sync); } TYtSection UpdateInputFields(TYtSection section, TExprBase fields, TExprContext& ctx) { diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.h b/yt/yql/providers/yt/provider/yql_yt_helpers.h index 61a1c4bab5..3a821b16d9 100644 --- a/yt/yql/providers/yt/provider/yql_yt_helpers.h +++ b/yt/yql/providers/yt/provider/yql_yt_helpers.h @@ -90,12 +90,12 @@ NNodes::TYtPath CopyOrTrivialMap(TPositionHandle pos, NNodes::TExprBase world, N const TCopyOrTrivialMapOpts& opts); bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMap); 
-TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, +TMaybe<TVector<ui64>> EstimateDataSize( const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx); IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns, - const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, + const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx); -TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo, +TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo, const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx); NNodes::TYtSection UpdateInputFields(NNodes::TYtSection section, NNodes::TExprBase fields, TExprContext& ctx); diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index 9f663d2897..f4942ab18d 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -193,11 +193,11 @@ bool HasNonTrivialAny(const TEquiJoinLinkSettings& linkSettings, const TMapJoinS TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputSection, const TJoinLabels& labels, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables, bool mapJoinUseFlow) { ui64 size = 0; - auto status = CalculateJoinLeafSize(size, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables); + auto status = CalculateJoinLeafSize(size, settings, 
inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables); if (status != TStatus::Ok) { return status; } @@ -230,11 +230,11 @@ TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputS TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSection& inputSection, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables) { ui64 dataSize = 0; - auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables); + auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables); if (status != TStatus::Ok) { return status; } @@ -3050,7 +3050,7 @@ bool RewriteYtEmptyJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, TYtJoin } TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stats, TYtSection& inputSection, - const TYtState& state, const TString& cluster, + const TYtState& state, const TVector<TYtPathInfo::TPtr>& tableInfo, const THashSet<TString>& joinKeys, bool isCross, TMaybeNode<TCoLambda> premap, TExprContext& ctx) { @@ -3103,7 +3103,7 @@ TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stat } IYtGateway::TPathStatResult pathStatResult; - auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tableInfo, {}, state, ctx); + auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tableInfo, {}, state, ctx); if (status.Level != TStatus::Ok) { return status; } @@ -3271,8 +3271,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo return TStatus::Repeat; } - auto cluster = 
TString{equiJoin.DataSink().Cluster().Value()}; - TMapJoinSettings mapSettings; TJoinSideStats leftStats; TJoinSideStats rightStats; @@ -3281,7 +3279,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo if (allowLookupJoin) { auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::RawSize, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx); + &leftLeaf, &rightLeaf, *state, isCross, ctx); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3352,7 +3350,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo { auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::ColumnarSize, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx); + &leftLeaf, &rightLeaf, *state, isCross, ctx); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3624,13 +3622,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks); if (leftTablesReady) { - auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow); + auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables, mapJoinUseFlow); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? 
TStatus::Ok : status; } if (mapJoinUseBlocks) { - auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables); + auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3638,13 +3636,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo } if (rightTablesReady) { - auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables, mapJoinUseFlow); + auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables, mapJoinUseFlow); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } if (mapJoinUseBlocks) { - auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables); + auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3947,9 +3945,6 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co rightJoinKeyList = BuildJoinKeyList(labels.Inputs[leftLeaf ? 
1 : 0], *op.RightLabel); } - - auto cluster = TString{equiJoin.DataSink().Cluster().Value()}; - TMapJoinSettings mapSettings; TJoinSideStats leftStats; TJoinSideStats rightStats; @@ -3958,7 +3953,7 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co bool isCross = false; auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::NoSize, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - leftLeaf, rightLeaf, *state, isCross, cluster, ctx); + leftLeaf, rightLeaf, *state, isCross, ctx); switch (status.Level) { case TStatus::Error: @@ -4866,12 +4861,12 @@ EStarRewriteStatus RewriteYtEquiJoinStar(TYtEquiJoin equiJoin, TYtJoinNodeOp& op } // namespace -IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster, +IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx) { result = IYtGateway::TPathStatResult(); if (GetJoinCollectColumnarStatisticsMode(*state.Configuration) == EJoinCollectColumnarStatisticsMode::Sync) { - auto syncResult = EstimateDataSize(cluster, paths, columns, state, ctx); + auto syncResult = EstimateDataSize(paths, columns, state, ctx); if (!syncResult) { return IGraphTransformer::TStatus::Error; } @@ -4881,7 +4876,7 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResul } TSet<TString> requestedColumns; - auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx); + auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, state, ctx); auto settings = inputSection.Settings().Ptr(); if (status == TStatus::Repeat) { bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), 
EYtSettingType::StatColumns); @@ -4935,7 +4930,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys, bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys, TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross, - TString cluster, TExprContext& ctx) + TExprContext& ctx) { mapSettings = {}; leftStats = {}; @@ -4943,7 +4938,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe if (leftLeaf) { auto premap = GetPremapLambda(*leftLeaf); - auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state, cluster, + auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state, leftTables, leftJoinKeys, isCross, premap, ctx); if (joinSideStatus.Level != TStatus::Ok) { return joinSideStatus; @@ -4959,7 +4954,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe if (rightLeaf) { auto premap = GetPremapLambda(*rightLeaf); - auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state, cluster, + auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? 
sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state, rightTables, rightJoinKeys, isCross, premap, ctx); if (joinSideStatus.Level != TStatus::Ok) { return joinSideStatus; @@ -4983,7 +4978,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables) { result = isLeft ? settings.LeftSize : settings.RightSize; @@ -4992,7 +4987,7 @@ TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSecti if (!needPayload && !op.JoinKind->IsAtom("Cross")) { if (joinKeyList.size() < itemType->GetSize()) { IYtGateway::TPathStatResult pathStatResult; - auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tables, joinKeyList, *state, ctx); + auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tables, joinKeyList, *state, ctx); if (status.Level != TStatus::Ok) { return status; } diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.h b/yt/yql/providers/yt/provider/yql_yt_join_impl.h index d8702fa43e..80f79f4d3b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.h +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.h @@ -72,12 +72,12 @@ TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx); IGraphTransformer::TStatus RewriteYtEquiJoinLeaves(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, 
const TYtJoinNodeOp& op, TExprContext& ctx, const TYtState::TPtr& state); -TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug = false); +TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug = false); struct IBaseOptimizerNode; struct IProviderContext; -void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx); +void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx); TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos); bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2); @@ -89,7 +89,7 @@ IGraphTransformer::TStatus CollectPathsAndLabelsReady( IGraphTransformer::TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables); enum class ESizeStatCollectMode { @@ -115,9 +115,9 @@ IGraphTransformer::TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode s bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys, bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys, TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, 
bool isCross, - TString cluster, TExprContext& ctx); + TExprContext& ctx); -IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster, +IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx); ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft, diff --git a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp index a195a8f5d0..ca808b621b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp @@ -58,12 +58,10 @@ public: TJoinReorderer( TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, - const TString& cluster, TExprContext& ctx, bool debug = false) : Root(op) , State(state) - , Cluster(cluster) , Ctx(ctx) , Debug(debug) { @@ -78,7 +76,7 @@ public: std::shared_ptr<IBaseOptimizerNode> tree; TOptimizerLinkSettings linkSettings; std::shared_ptr<IProviderContext> providerCtx; - BuildOptimizerJoinTree(State, Cluster, tree, providerCtx, linkSettings, Root, Ctx); + BuildOptimizerJoinTree(State, tree, providerCtx, linkSettings, Root, Ctx); auto ytCtx = std::static_pointer_cast<TYtProviderContext>(providerCtx); std::function<void(const TString& str)> log; @@ -139,7 +137,6 @@ public: private: TYtJoinNodeOp::TPtr Root; const TYtState::TPtr& State; - TString Cluster; TExprContext& Ctx; bool Debug; }; @@ -177,9 +174,8 @@ class TOptimizerTreeBuilder { public: TOptimizerLinkSettings LinkSettings; - TOptimizerTreeBuilder(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx) + 
TOptimizerTreeBuilder(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx) : State(state) - , Cluster(cluster) , Tree(tree) , OutProviderCtx(providerCtx) , InputTree(inputTree) @@ -258,7 +254,7 @@ private: } TRelSizeInfo leftSizeInfo; TRelSizeInfo rightSizeInfo; - PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Cluster, Ctx, op); + PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Ctx, op); auto left = ProcessNode(op->Left, leftSizeInfo); auto right = ProcessNode(op->Right, rightSizeInfo); @@ -374,7 +370,7 @@ private: TSet<TString> requestedColumns; IYtGateway::TPathStatResult result; - auto status = TryEstimateDataSize(result, requestedColumns, Cluster, paths, columns, *State, Ctx); + auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, *State, Ctx); YQL_ENSURE(status != IGraphTransformer::TStatus::Error); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to read path stats that must be already present in cache"; @@ -434,7 +430,6 @@ private: } TYtState::TPtr State; - const TString Cluster; std::shared_ptr<IBaseOptimizerNode>& Tree; std::shared_ptr<IProviderContext>& OutProviderCtx; THashMap<TString, THashSet<TString>> RelJoinColumns; @@ -517,9 +512,9 @@ bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2) { } } -void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx) +void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx) { - TOptimizerTreeBuilder builder(state, cluster, tree, providerCtx, op, ctx); 
+ TOptimizerTreeBuilder builder(state, tree, providerCtx, op, ctx); builder.Do(); linkSettings = builder.LinkSettings; } @@ -529,13 +524,13 @@ TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExp return BuildYtJoinTree(node, scope, ctx, pos); } -TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug) +TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug) { if (state->Types->CostBasedOptimizer == ECostBasedOptimizerType::Disable || op->CostBasedOptPassed) { return op; } - auto result = TJoinReorderer(op, state, cluster, ctx, debug).Do(); + auto result = TJoinReorderer(op, state, ctx, debug).Do(); if (!debug && AreSimilarTrees(result, op)) { return op; } diff --git a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp index 789938e02f..525c967853 100644 --- a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp @@ -29,22 +29,21 @@ public: YQL_ENSURE(PathStatResults.empty()); } - TNodeMap<IYtGateway::TPathStatResult> PullPathStatResults() { - TNodeMap<IYtGateway::TPathStatResult> results; + TNodeMap<TVector<IYtGateway::TPathStatResult>> PullPathStatResults() { + TNodeMap<TVector<IYtGateway::TPathStatResult>> results; TGuard<TMutex> guard(Lock); results.swap(PathStatResults); return results; } - void MarkReady(TExprNode* node, const IYtGateway::TPathStatResult& result) { + void AddResult(TExprNode* node, const IYtGateway::TPathStatResult& result) { TGuard<TMutex> guard(Lock); - YQL_ENSURE(PathStatResults.count(node) == 0); - PathStatResults[node] = result; + PathStatResults[node].push_back(result); } private: mutable TMutex Lock; - TNodeMap<IYtGateway::TPathStatResult> PathStatResults; + TNodeMap<TVector<IYtGateway::TPathStatResult>> PathStatResults; }; class 
TYtLoadColumnarStatsTransformer : public TGraphTransformerBase { @@ -65,7 +64,7 @@ private: output = input; PathStatusState->EnsureNoInflightRequests(); - TVector<std::pair<IYtGateway::TPathStatOptions, TExprNode*>> pathStatArgs; + TVector<std::pair<TVector<IYtGateway::TPathStatOptions>, TExprNode*>> pathStatArgs; bool hasError = false; TNodeOnNodeOwnedMap sectionRewrites; VisitExpr(input, [this, &pathStatArgs, &hasError, &sectionRewrites, &ctx](const TExprNode::TPtr& node) { @@ -75,10 +74,9 @@ private: if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::StatColumns)) { auto columnList = NYql::GetSettingAsColumnList(section.Settings().Ref(), EYtSettingType::StatColumns); - TMaybe<TString> cluster; - TVector<IYtGateway::TPathStatReq> pathStatReqs; + TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster; size_t idx = 0; - ui64 totalChunkCount = 0; + THashMap<TString, ui64> totalChunkCountByCluster; for (auto path: section.Paths()) { bool hasStat = false; if (path.Table().Maybe<TYtTable>().Stat().Maybe<TYtStat>()) { @@ -108,30 +106,18 @@ private: } TYtPathInfo pathInfo(path); + const TString cluster = pathInfo.Table->Cluster; + YQL_ENSURE(cluster); YQL_ENSURE(pathInfo.Table->Stat); - totalChunkCount += pathInfo.Table->Stat->ChunkCount; + totalChunkCountByCluster[cluster] += pathInfo.Table->Stat->ChunkCount; - TString currCluster; - if (auto ytTable = path.Table().Maybe<TYtTable>()) { - currCluster = TString{ytTable.Cast().Cluster().Value()}; - } else { - currCluster = TString{GetOutputOp(path.Table().Cast<TYtOutput>()).DataSink().Cluster().Value()}; - } - YQL_ENSURE(currCluster); - - if (cluster) { - YQL_ENSURE(currCluster == *cluster); - } else { - cluster = currCluster; - } - - auto ytPath = BuildYtPathForStatRequest(*cluster, pathInfo, columnList, *State_, ctx); + auto ytPath = BuildYtPathForStatRequest(pathInfo, columnList, *State_, ctx); if (!ytPath) { hasError = true; return false; } - pathStatReqs.push_back( + 
pathStatReqsByCluster[cluster].push_back( IYtGateway::TPathStatReq() .Path(*ytPath) .IsTemp(pathInfo.Table->IsTemp) @@ -142,21 +128,28 @@ private: ++idx; } - bool requestExtendedStats = maxChunkCountExtendedStats && - (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats); - - if (pathStatReqs) { - auto pathStatOptions = IYtGateway::TPathStatOptions(State_->SessionId) - .Cluster(*cluster) + TVector<IYtGateway::TPathStatOptions> pathStatOptions; + for (auto& [cluster, pathStatReqs] : pathStatReqsByCluster) { + auto itCount = totalChunkCountByCluster.find(cluster); + YQL_ENSURE(itCount != totalChunkCountByCluster.end()); + const ui64 totalChunkCount = itCount->second; + bool requestExtendedStats = maxChunkCountExtendedStats && + (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats); + YQL_ENSURE(!pathStatReqs.empty()); + auto options = IYtGateway::TPathStatOptions(State_->SessionId) + .Cluster(cluster) .Paths(pathStatReqs) .Config(State_->Configuration->Snapshot()) .Extended(requestExtendedStats); - - auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(pathStatOptions)); + auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(options)); if (!tryResult.Success()) { - pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get()); + pathStatOptions.push_back(std::move(options)); } } + + if (pathStatOptions) { + pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get()); + } } } return !hasError; @@ -177,16 +170,20 @@ private: } TVector<NThreading::TFuture<void>> futures; - YQL_CLOG(INFO, ProviderYt) << "Starting " << pathStatArgs.size() << " requests for columnar stats"; + size_t reqCount = 0; + for (const auto& arg : pathStatArgs) { + reqCount += arg.first.size(); + } + YQL_CLOG(INFO, ProviderYt) << "Starting " << reqCount << " requests for columnar stats"; for (auto& arg : pathStatArgs) { - IYtGateway::TPathStatOptions& options = arg.first; + 
TVector<IYtGateway::TPathStatOptions>& options = arg.first; TExprNode* node = arg.second; - - auto future = State_->Gateway->PathStat(std::move(options)); - - futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) { - pathStatusState->MarkReady(node, result.GetValueSync()); - })); + for (auto& opt : options) { + auto future = State_->Gateway->PathStat(std::move(opt)); + futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) { + pathStatusState->AddResult(node, result.GetValueSync()); + })); + } } AsyncFuture = WaitExceptionOrAll(futures); @@ -201,26 +198,32 @@ private: TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override { output = input; - TNodeMap<IYtGateway::TPathStatResult> results = PathStatusState->PullPathStatResults(); + TNodeMap<TVector<IYtGateway::TPathStatResult>> results = PathStatusState->PullPathStatResults(); YQL_ENSURE(!results.empty()); + size_t applied = 0; + TStatus status = TStatus::Repeat; for (auto& item : results) { auto& node = item.first; - auto& result = item.second; - if (!result.Success()) { - TIssueScopeGuard issueScope(ctx.IssueManager, [&]() { - return MakeIntrusive<TIssue>( - ctx.GetPosition(node->Pos()), - TStringBuilder() << "Execution of node: " << node->Content() - ); - }); - result.ReportIssues(ctx.IssueManager); - return TStatus::Error; + auto& batch = item.second; + TIssueScopeGuard issueScope(ctx.IssueManager, [&]() { + return MakeIntrusive<TIssue>( + ctx.GetPosition(node->Pos()), + TStringBuilder() << "Execution of node: " << node->Content() + ); + }); + for (auto& result : batch) { + if (!result.Success()) { + result.ReportIssues(ctx.IssueManager); + status = status.Combine(TStatus::Error); + } + ++applied; } } - YQL_CLOG(INFO, ProviderYt) << "Applied " << results.size() << " results of columnar stats"; - return 
TStatus::Repeat; + YQL_CLOG(INFO, ProviderYt) << "Applied " << applied << " results of columnar stats " + << (status == TStatus::Error ? "with errors" : "successfully"); + return status; } TYtState::TPtr State_; diff --git a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp index cd0b5e2ade..639307c92e 100644 --- a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp @@ -533,6 +533,9 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& if (info->Table->Meta->IsDynamic) { useItemsCount = false; } + if (!info->Table->Cluster) { + info->Table->Cluster = maybeRead.Cast().DataSource().Cluster().StringValue(); + } records.push_back(tableRecord); tableInfos.push_back(info); } @@ -553,7 +556,7 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& } } if (!hasNotCalculated && !tableInfos.empty()) { - if (auto dataSizes = EstimateDataSize(TString{maybeRead.Cast().DataSource().Cluster().Value()}, tableInfos, Nothing(), *state, ctx)) { + if (auto dataSizes = EstimateDataSize(tableInfos, Nothing(), *state, ctx)) { YQL_ENSURE(dataSizes->size() == records.size()); for (size_t i: xrange(records.size())) { for (auto& factor: factors) { diff --git a/yt/yql/providers/ytflow/expr_nodes/ya.make b/yt/yql/providers/ytflow/expr_nodes/ya.make index ba1eccbb56..7331a46d81 100644 --- a/yt/yql/providers/ytflow/expr_nodes/ya.make +++ b/yt/yql/providers/ytflow/expr_nodes/ya.make @@ -13,40 +13,21 @@ SRCDIR( yql/essentials/core/expr_nodes_gen ) -IF(EXPORT_CMAKE) - RUN_PYTHON3( - ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/gen/__main__.py - yql_expr_nodes_gen.jnj - yql_ytflow_expr_nodes.json - yql_ytflow_expr_nodes.gen.h - yql_ytflow_expr_nodes.decl.inl.h - yql_ytflow_expr_nodes.defs.inl.h - IN yql_expr_nodes_gen.jnj - IN yql_ytflow_expr_nodes.json - OUT yql_ytflow_expr_nodes.gen.h - OUT yql_ytflow_expr_nodes.decl.inl.h - OUT 
yql_ytflow_expr_nodes.defs.inl.h - OUTPUT_INCLUDES - ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h - ${ARCADIA_ROOT}/util/generic/hash_set.h - ) -ELSE() - RUN_PROGRAM( - yql/essentials/core/expr_nodes_gen/gen - yql_expr_nodes_gen.jnj - yql_ytflow_expr_nodes.json - yql_ytflow_expr_nodes.gen.h - yql_ytflow_expr_nodes.decl.inl.h - yql_ytflow_expr_nodes.defs.inl.h - IN yql_expr_nodes_gen.jnj - IN yql_ytflow_expr_nodes.json - OUT yql_ytflow_expr_nodes.gen.h - OUT yql_ytflow_expr_nodes.decl.inl.h - OUT yql_ytflow_expr_nodes.defs.inl.h - OUTPUT_INCLUDES - ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h - ${ARCADIA_ROOT}/util/generic/hash_set.h - ) -ENDIF() +RUN_PY3_PROGRAM( + yql/essentials/core/expr_nodes_gen/gen + yql_expr_nodes_gen.jnj + yql_ytflow_expr_nodes.json + yql_ytflow_expr_nodes.gen.h + yql_ytflow_expr_nodes.decl.inl.h + yql_ytflow_expr_nodes.defs.inl.h + IN yql_expr_nodes_gen.jnj + IN yql_ytflow_expr_nodes.json + OUT yql_ytflow_expr_nodes.gen.h + OUT yql_ytflow_expr_nodes.decl.inl.h + OUT yql_ytflow_expr_nodes.defs.inl.h + OUTPUT_INCLUDES + ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h + ${ARCADIA_ROOT}/util/generic/hash_set.h +) END() diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.cfg new file mode 100644 index 0000000000..d0ce4581d7 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.sql b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.sql new file mode 100644 index 0000000000..de3063e888 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.sql @@ -0,0 +1,5 @@ +pragma yt.UseQLFilter; + +select a +from plato.Input +where a = 1; |