aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-14 00:51:45 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-14 00:51:45 +0000
commit3e3d50dea42f66b1ba457411b8864990f90bbe21 (patch)
tree7d75df352fc045a84d46764b96b496b5775bbf44 /yt
parent7778cd274683ce11e318b799ea12c7bc0b3a4bdd (diff)
parent422642b601155a296cb0a69eb9b1f7ba146ffa49 (diff)
downloadydb-3e3d50dea42f66b1ba457411b8864990f90bbe21.tar.gz
Merge branch 'rightlib' into merge-libs-250314-0050
Diffstat (limited to 'yt')
-rw-r--r--yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp3
-rw-r--r--yt/cpp/mapreduce/interface/operation.h4
-rw-r--r--yt/yql/providers/yt/expr_nodes/ya.make51
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp186
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp98
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp118
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h8
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp83
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h45
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp9
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h9
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp59
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp5
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp31
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp8
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_helpers.cpp106
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_helpers.h6
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.cpp47
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.h10
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp21
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp117
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_optimize.cpp5
-rw-r--r--yt/yql/providers/ytflow/expr_nodes/ya.make51
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_single_equals.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_single_equals.sql5
-rw-r--r--yt/yt/client/api/operation_client.h1
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp3
-rw-r--r--yt/yt/client/driver/scheduler_commands.cpp5
-rw-r--r--yt/yt/core/http/server.cpp19
-rw-r--r--yt/yt/core/http/unittests/http_ut.cpp15
-rw-r--r--yt/yt/core/misc/collection_helpers-inl.h9
-rw-r--r--yt/yt/core/misc/collection_helpers.h6
-rw-r--r--yt/yt/core/misc/range_helpers-inl.h59
-rw-r--r--yt/yt/core/misc/range_helpers.h31
-rw-r--r--yt/yt/core/misc/unittests/collection_helpers_ut.cpp43
-rw-r--r--yt/yt/core/misc/unittests/range_helpers_ut.cpp54
-rw-r--r--yt/yt/core/misc/unittests/ya.make2
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto2
41 files changed, 929 insertions, 412 deletions
diff --git a/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp
index f7f6c4713b..081845be5e 100644
--- a/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/http_client/rpc_parameters_serialization.cpp
@@ -558,6 +558,9 @@ TNode SerializeParamsForListJobs(
if (options.WithMonitoringDescriptor_) {
result["with_monitoring_descriptor"] = *options.WithMonitoringDescriptor_;
}
+ if (options.WithInterruptionInfo_) {
+ result["with_interruption_info"] = *options.WithInterruptionInfo_;
+ }
if (options.OperationIncarnation_) {
result["operation_incarnation"] = *options.OperationIncarnation_;
}
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
index 923b11b6a0..869c8f9c0b 100644
--- a/yt/cpp/mapreduce/interface/operation.h
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -2890,6 +2890,10 @@ struct TListJobsOptions
FLUENT_FIELD_OPTION(bool, WithMonitoringDescriptor);
///
+ /// @brief Return only jobs with interruption info.
+ FLUENT_FIELD_OPTION(bool, WithInterruptionInfo);
+
+ ///
/// @brief Return only jobs with given operation incarnation.
FLUENT_FIELD_OPTION(TString, OperationIncarnation);
diff --git a/yt/yql/providers/yt/expr_nodes/ya.make b/yt/yql/providers/yt/expr_nodes/ya.make
index 5c46e4ff03..744cedec7b 100644
--- a/yt/yql/providers/yt/expr_nodes/ya.make
+++ b/yt/yql/providers/yt/expr_nodes/ya.make
@@ -13,40 +13,21 @@ SRCDIR(
yql/essentials/core/expr_nodes_gen
)
-IF (EXPORT_CMAKE)
- RUN_PYTHON3(
- ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/gen/__main__.py
- yql_expr_nodes_gen.jnj
- yql_yt_expr_nodes.json
- yql_yt_expr_nodes.gen.h
- yql_yt_expr_nodes.decl.inl.h
- yql_yt_expr_nodes.defs.inl.h
- IN yql_expr_nodes_gen.jnj
- IN yql_yt_expr_nodes.json
- OUT yql_yt_expr_nodes.gen.h
- OUT yql_yt_expr_nodes.decl.inl.h
- OUT yql_yt_expr_nodes.defs.inl.h
- OUTPUT_INCLUDES
- ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h
- ${ARCADIA_ROOT}/util/generic/hash_set.h
- )
-ELSE()
- RUN_PROGRAM(
- yql/essentials/core/expr_nodes_gen/gen
- yql_expr_nodes_gen.jnj
- yql_yt_expr_nodes.json
- yql_yt_expr_nodes.gen.h
- yql_yt_expr_nodes.decl.inl.h
- yql_yt_expr_nodes.defs.inl.h
- IN yql_expr_nodes_gen.jnj
- IN yql_yt_expr_nodes.json
- OUT yql_yt_expr_nodes.gen.h
- OUT yql_yt_expr_nodes.decl.inl.h
- OUT yql_yt_expr_nodes.defs.inl.h
- OUTPUT_INCLUDES
- ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h
- ${ARCADIA_ROOT}/util/generic/hash_set.h
- )
-ENDIF()
+RUN_PY3_PROGRAM(
+ yql/essentials/core/expr_nodes_gen/gen
+ yql_expr_nodes_gen.jnj
+ yql_yt_expr_nodes.json
+ yql_yt_expr_nodes.gen.h
+ yql_yt_expr_nodes.decl.inl.h
+ yql_yt_expr_nodes.defs.inl.h
+ IN yql_expr_nodes_gen.jnj
+ IN yql_yt_expr_nodes.json
+ OUT yql_yt_expr_nodes.gen.h
+ OUT yql_yt_expr_nodes.decl.inl.h
+ OUT yql_yt_expr_nodes.defs.inl.h
+ OUTPUT_INCLUDES
+ ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h
+ ${ARCADIA_ROOT}/util/generic/hash_set.h
+)
END()
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
index cb2ee1f19b..78813e529d 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
@@ -3,6 +3,7 @@ UNITTEST()
SRCS(
yql_yt_job_ut.cpp
yql_yt_output_stream_ut.cpp
+ yql_yt_raw_table_reader_ut.cpp
)
PEERDIR(
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
index 4b4399dcca..7be6348426 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -6,10 +6,18 @@
namespace NYql::NFmr {
-TString TableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+TString TableContent_1 = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+TString TableContent_2 = "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+TString TableContent_3 = "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST(DownloadTable) {
@@ -17,14 +25,15 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
TYtTableRef input = TYtTableRef("test_cluster", "test_path");
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
TDownloadTaskParams params = TDownloadTaskParams(input, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- ytUploadedTablesMock->AddTable(input, TableContent);
+ ytUploadedTablesMock->AddTable(input, TableContent_1);
auto res = job->Download(params);
@@ -35,57 +44,47 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
UNIT_ASSERT_EQUAL(statistics->OutputTables.at(output).Rows, 4);
auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
UNIT_ASSERT_C(resultTableContent, "Result table content is empty");
- UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent);
+ UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1);
}
Y_UNIT_TEST(UploadTable) {
- TString ytTableContent = TableContent;
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
- TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges};
auto params = TUploadTaskParams(input, output);
- tableDataServicePtr->Put(input.TableId, ytTableContent);
+ auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key, TableContent_1);
auto res = job->Upload(params);
auto err = std::get_if<TError>(&res);
UNIT_ASSERT_C(!err,err->ErrorMessage);
- UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent);
+ UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1);
}
Y_UNIT_TEST(MergeFmrTables) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
- TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2"};
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
+ TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2", .TableRanges = ranges};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -94,9 +93,12 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
- tableDataServicePtr->Put(input_2.TableId, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_2 = GetTableDataServiceKey(input_2.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
+ tableDataServicePtr->Put(key_2, TableContent_2);
+ tableDataServicePtr->Put(key_3, TableContent_3);
auto res = job->Merge(params);
@@ -109,31 +111,19 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
}
Y_UNIT_TEST(MergeMixedTables) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
+
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -142,9 +132,11 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
ytUploadedTablesMock->AddTable(input_2, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ tableDataServicePtr->Put(key_3, TableContent_3);
auto res = job->Merge(params);
@@ -159,11 +151,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST_SUITE(TaskRunTests) {
Y_UNIT_TEST(RunDownloadTask) {
- TString ytTableContent =
- "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
@@ -174,7 +161,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- ytUploadedTablesMock->AddTable(input, ytTableContent);
+ ytUploadedTablesMock->AddTable(input, TableContent_1);
TDownloadTaskParams params = TDownloadTaskParams(input, output);
TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id");
@@ -183,55 +170,50 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
UNIT_ASSERT_C(resultTableContent, "Result table content is empty");
- UNIT_ASSERT_NO_DIFF(*resultTableContent, ytTableContent);
+ UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1);
}
Y_UNIT_TEST(RunUploadTask) {
- TString ytTableContent =
- "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges};
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
- tableDataServicePtr->Put(input.TableId, ytTableContent);
+ auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
+
+ tableDataServicePtr->Put(key, TableContent_1);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
- UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent);
+ UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1);
}
Y_UNIT_TEST(RunUploadTaskWithNoTable) {
- TString ytTableContent =
- "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges};
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
// No table in tableDataServicePtr
- // tableDataServicePtr->Put(input.TableId, ytTableContent);
+ // auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
+ // tableDataServicePtr->Put(key, ytTableContent);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -240,30 +222,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
}
Y_UNIT_TEST(RunMergeTask) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -274,9 +242,11 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
ytUploadedTablesMock->AddTable(input_2, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ tableDataServicePtr->Put(key_3, TableContent_3);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -287,30 +257,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
}
Y_UNIT_TEST(RunMergeTaskWithNoTable) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -321,10 +277,12 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
// No table in Yt
// ytUploadedTablesMock->AddTable(input_2, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ tableDataServicePtr->Put(key_3, TableContent_3);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp
new file mode 100644
index 0000000000..4d45e054b6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp
@@ -0,0 +1,98 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h>
+
+namespace NYql::NFmr {
+
+TString originalTableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
+ Y_UNIT_TEST(ReadOneChunkSmallPart) {
+ size_t chunkSize = 1024;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{1};
+ std::vector<TTableRange> tableRanges = {{"partId", 0, 1}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ char buffer[10];
+ reader.Read(buffer, 10);
+ TString readTableContentPart = {buffer, 10};
+ auto originalTableContentPart = originalTableContent.substr(0, 10);
+ UNIT_ASSERT_NO_DIFF(readTableContentPart, originalTableContentPart);
+ }
+
+ Y_UNIT_TEST(ReadAllOneChunk) {
+ size_t chunkSize = 1024;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{1};
+ std::vector<TTableRange> tableRanges = {{"partId", 0, 1}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ auto readTableContent = reader.ReadAll();
+ UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent);
+ }
+
+ Y_UNIT_TEST(ReadAllMultipleChunks) {
+ size_t chunkSize = 32;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{1};
+
+ auto maxChunk = originalTableContent.size() / chunkSize + 1;
+ std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ auto readTableContent = reader.ReadAll();
+ UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent);
+ }
+
+ Y_UNIT_TEST(ReadAllMultipleChunksBigReadAhead) {
+ size_t chunkSize = 32;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{5};
+
+ auto maxChunk = originalTableContent.size() / chunkSize + 1;
+ std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ auto readTableContent = reader.ReadAll();
+ UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent);
+ }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make
index 89865c44a4..969c305ed1 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
yql_yt_job_impl.cpp
yql_yt_output_stream.cpp
+ yql_yt_raw_table_reader.cpp
)
PEERDIR(
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index 4e18a287a1..40d0a26760 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -4,6 +4,7 @@
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h>
#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
@@ -15,8 +16,8 @@ namespace NYql::NFmr {
class TFmrJob: public IFmrJob {
public:
- TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag)
- : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag)
+ TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings)
+ : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings)
{
}
@@ -57,60 +58,68 @@ public:
}
virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override {
- const auto ytTable = params.Output;
- const auto cluster = params.Output.Cluster;
- const auto path = params.Output.Path;
- const auto tableId = params.Input.TableId;
-
- YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
+ try {
+ const auto ytTable = params.Output;
+ const auto cluster = params.Output.Cluster;
+ const auto path = params.Output.Path;
+ const auto tableId = params.Input.TableId;
+ const auto tableRanges = params.Input.TableRanges;
- TMaybe<TString> getResult = TableDataService_->Get(tableId).GetValueSync();
+ YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
- if (!getResult) {
- YQL_CLOG(ERROR, FastMapReduce) << "Table " << tableId << " not found";
- return TError("Table not found");
- }
+ auto res = GetFmrTableStream(params.Input);
+ auto err = std::get_if<TError>(&res);
+ if (err) {
+ return *err;
+ }
+ auto inputStream = std::get_if<THolder<IInputStream>>(&res);
- TString tableContent = getResult.GetRef();
- TStringInput inputStream(tableContent);
+ // How to raise if not found
- YtService_->Upload(ytTable, inputStream, clusterConnection);
+ YtService_->Upload(ytTable, *inputStream->get(), clusterConnection);
- return TStatistics();
+ return TStatistics();
+ } catch (...) {
+ return TError(CurrentExceptionMessage());
+ }
}
virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override {
// расширить таск парамс. добавить туда мету
- const auto inputs = params.Input;
- const auto output = params.Output;
+ try {
+ const auto inputs = params.Input;
+ const auto output = params.Output;
- YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
+ YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
- TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_);
+ TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_);
- ui32 totalRowsCount = 0;
+ ui32 totalRowsCount = 0;
- for (const auto& inputTableRef : inputs) {
- if (CancelFlag_->load()) {
- return TError("Canceled");
- }
- ui64 rowsCount = 0; // TMP Todo get rows count from input stats
- auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection);
- totalRowsCount += rowsCount;
+ for (const auto& inputTableRef : inputs) {
+ if (CancelFlag_->load()) {
+ return TError("Canceled");
+ }
+ ui64 rowsCount = 0; // TMP Todo get rows count from input stats
+ auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection);
+ totalRowsCount += rowsCount;
- auto err = std::get_if<TError>(&res);
- if (err) {
- return *err;
+ auto err = std::get_if<TError>(&res);
+ if (err) {
+ return *err;
+ }
+ auto inputStream = std::get_if<THolder<IInputStream>>(&res);
+ TransferData(inputStream->get(), &outputStream);
}
- auto inputStream = std::get_if<THolder<IInputStream>>(&res);
- TransferData(inputStream->get(), &outputStream);
- }
- outputStream.Flush();
+ outputStream.Flush();
- TTableStats stats = outputStream.GetStats();
- stats.Rows = totalRowsCount;
+ TTableStats stats = outputStream.GetStats();
+ stats.Rows = totalRowsCount;
- return TStatistics({{output, stats}});
+ return TStatistics({{output, stats}});
+ } catch (...) {
+ return TError(CurrentExceptionMessage());
+ }
}
private:
@@ -141,33 +150,40 @@ private:
}
std::variant<THolder<IInputStream>, TError> GetFmrTableStream(const TFmrTableInputRef& fmrTable) {
- auto res = TableDataService_->Get(fmrTable.TableId).GetValueSync();
- if (!res) {
- return TError("Table not found");
- }
- auto tableContent = *res;
- TStringStream stream;
- stream << tableContent;
- return MakeHolder<TStringStream>(stream);
+
+ auto settings = TFmrRawTableReaderSettings(Settings_.ReadAheadChunks);
+ return MakeHolder<TFmrRawTableReader>(TFmrRawTableReader(
+ fmrTable.TableId,
+ fmrTable.TableRanges,
+ TableDataService_,
+ settings
+ ));
}
private:
ITableDataService::TPtr TableDataService_;
IYtService::TPtr YtService_;
std::shared_ptr<std::atomic<bool>> CancelFlag_;
+ const TFmrJobSettings Settings_;
};
-IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) {
- return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag);
+IFmrJob::TPtr MakeFmrJob(
+ ITableDataService::TPtr tableDataService,
+ IYtService::TPtr ytService,
+ std::shared_ptr<std::atomic<bool>> cancelFlag,
+ const TFmrJobSettings& settings
+) {
+ return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag, settings);
}
TJobResult RunJob(
TTask::TPtr task,
ITableDataService::TPtr tableDataService,
IYtService::TPtr ytService,
- std::shared_ptr<std::atomic<bool>> cancelFlag
+ std::shared_ptr<std::atomic<bool>> cancelFlag,
+ const TFmrJobSettings& settings
) {
- IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, settings);
auto processTask = [job, task] (auto&& taskParams) {
using T = std::decay_t<decltype(taskParams)>;
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
index 1da096ee23..284b68040d 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
@@ -6,8 +6,12 @@
namespace NYql::NFmr {
-IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
+struct TFmrJobSettings {
+ ui64 ReadAheadChunks = 1;
+};
-TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
+IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings);
+
+TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {});
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp
new file mode 100644
index 0000000000..c458be66b6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp
@@ -0,0 +1,83 @@
+#include "yql_yt_raw_table_reader.h"
+#include <yql/essentials/utils/log/log.h>
+#include <yt/yql/providers/yt/fmr/utils/table_data_service_key.h>
+
+namespace NYql::NFmr {
+
+TFmrRawTableReader::TFmrRawTableReader(
+ const TString& tableId,
+ const std::vector<TTableRange>& tableRanges,
+ ITableDataService::TPtr tableDataService,
+ const TFmrRawTableReaderSettings& settings
+)
+ : TableId_(tableId)
+ , TableRanges_(tableRanges)
+ , TableDataService_(tableDataService)
+ , Settings_(settings)
+{
+ ReadAhead();
+}
+
+size_t TFmrRawTableReader::DoRead(void* buf, size_t len) {
+ ui64 totalRead = 0;
+ char* output = static_cast<char*>(buf);
+
+ while (len > 0) {
+ ui64 available = DataBuffer_.size() - CurrentPosition_;
+ if (available > 0) {
+ ui64 toCopy = std::min(available, len);
+
+ auto start = DataBuffer_.Begin() + CurrentPosition_;
+ auto end = start + toCopy;
+ std::copy(start, end, output);
+
+ CurrentPosition_ += toCopy;
+ output += toCopy;
+ len -= toCopy;
+ totalRead += toCopy;
+ } else if (!PendingChunks_.empty()) {
+ auto chunk = PendingChunks_.front();
+ TMaybe<TString> data;
+ try {
+ data = chunk.Data.GetValueSync();
+ } catch (...) {
+ ythrow yexception() << "Error reading chunk:" << chunk.Meta << "Error: " << CurrentExceptionMessage();
+ }
+
+ if (data) {
+ DataBuffer_.Assign(data->data(), data->size());
+ } else {
+ ythrow yexception() << "No data for chunk:" << chunk.Meta;
+ }
+
+ PendingChunks_.pop();
+ CurrentPosition_ = 0;
+ available = DataBuffer_.size();
+
+ ReadAhead();
+ } else {
+ break;
+ }
+ }
+ return totalRead;
+}
+
+void TFmrRawTableReader::ReadAhead() {
+ while (PendingChunks_.size() < Settings_.ReadAheadChunks) {
+ if (CurrentRange_ < TableRanges_.size()) {
+ auto currentPartId = TableRanges_[CurrentRange_].PartId;
+ if (CurrentChunk_ < TableRanges_[CurrentRange_].MaxChunk) {
+ auto key = GetTableDataServiceKey(TableId_, currentPartId, CurrentChunk_);
+ PendingChunks_.push({TableDataService_->Get(key), {TableId_, currentPartId, CurrentChunk_}});
+ CurrentChunk_++;
+ } else {
+ CurrentRange_++;
+ }
+ }
+ else {
+ break;
+ }
+ }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h
new file mode 100644
index 0000000000..ccebc661b4
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <queue>
+#include <util/generic/buffer.h>
+#include <util/stream/input.h>
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h>
+
+namespace NYql::NFmr {
+
+struct TFmrRawTableReaderSettings {
+ ui64 ReadAheadChunks = 1;
+};
+
+struct TPendingFmrChunk {
+ NThreading::TFuture<TMaybe<TString>> Data;
+ TFmrChunkMeta Meta;
+};
+
+class TFmrRawTableReader: public IInputStream {
+ public:
+ TFmrRawTableReader(
+ const TString& tableId,
+ const std::vector<TTableRange>& tableRanges,
+ ITableDataService::TPtr tableDataService,
+ const TFmrRawTableReaderSettings& settings
+ );
+ protected:
+ size_t DoRead(void* buf, size_t len) override;
+ private:
+ void ReadAhead();
+ private:
+ const TString TableId_;
+ const std::vector<TTableRange> TableRanges_;
+ ITableDataService::TPtr TableDataService_;
+ const TFmrRawTableReaderSettings Settings_;
+
+ ui64 CurrentRange_ = 0;
+ ui64 CurrentChunk_ = 0;
+ ui64 CurrentPosition_ = 0;
+ TBuffer DataBuffer_;
+ std::queue<TPendingFmrChunk> PendingChunks_;
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index c3a182b486..319cf20801 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -10,6 +10,10 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co
return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage, stats);
}
+TString TFmrChunkMeta::ToString() const {
+ return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk);
+}
+
} // namespace NYql::NFmr
template<>
@@ -22,3 +26,8 @@ void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError&
}
out << error.ErrorMessage;
}
+
+template<>
+void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) {
+ out << meta.ToString();
+}
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index a4b1ebf37c..4c5bd1252f 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -3,6 +3,7 @@
#include <util/digest/numeric.h>
#include <util/generic/maybe.h>
#include <util/generic/string.h>
+#include <util/string/builder.h>
#include <vector>
namespace NYql::NFmr {
@@ -66,6 +67,14 @@ struct TTableRange {
ui64 MaxChunk = 1; // Пока такой дефолт
};
+struct TFmrChunkMeta {
+ TString TableId;
+ TString PartId;
+ ui64 Chunk = 0; // сделать out метод
+
+ TString ToString() const;
+};
+
struct TFmrTableInputRef {
TString TableId;
std::vector<TTableRange> TableRanges;
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index ec1edd10d9..330cfbbbad 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -262,7 +262,7 @@ void GetIntegerConstraints(const TExprNode::TPtr& column, bool& isSigned, ui64&
}
}
-void QuoteColumnForQL(const TStringBuf columnName, TStringBuilder& result) {
+void QuoteColumnForQL(const TStringBuf& columnName, TStringBuilder& result) {
result << '`';
if (!columnName.Contains('`')) {
result << columnName;
@@ -278,6 +278,14 @@ void QuoteColumnForQL(const TStringBuf columnName, TStringBuilder& result) {
result << '`';
}
+void ConvertComparisonForQL(const TStringBuf& opName, TStringBuilder& result) {
+ if (opName == "==") {
+ result << '=';
+ } else {
+ result << opName;
+ }
+}
+
void GenerateInputQueryIntegerComparison(const TStringBuf& opName, const TExprNode::TPtr& intColumn, const TExprNode::TPtr& intValue, TStringBuilder& result) {
bool columnsIsSigned;
ui64 minValueAbs;
@@ -310,7 +318,9 @@ void GenerateInputQueryIntegerComparison(const TStringBuf& opName, const TExprNo
const auto columnName = intColumn->ChildPtr(1)->Content();
const auto valueStr = maybeInt.Cast().Literal().Value();
QuoteColumnForQL(columnName, result);
- result << " " << opName << " " << valueStr;
+ result << " ";
+ ConvertComparisonForQL(opName, result);
+ result << " " << valueStr;
}
}
@@ -910,6 +920,8 @@ public:
execCtx->SetOutput(outputOp.Cast().Output());
}
+ ReportBlockStatus(opBase, execCtx);
+
TFuture<void> future;
if (auto op = opBase.Maybe<TYtSort>()) {
future = DoSort(op.Cast(), execCtx);
@@ -5734,6 +5746,49 @@ private:
}
}
+ static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) {
+ if (execCtx->Options_.PublicId().Empty()) {
+ return;
+ }
+
+ auto opPublicId = *execCtx->Options_.PublicId();
+
+ TOperationProgress::EOpBlockStatus status;
+ if (auto map = op.Maybe<TYtMap>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Mapper().Body().Ref());
+ } else if (auto map = op.Maybe<TYtReduce>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref());
+ } else if (auto map = op.Maybe<TYtMapReduce>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref());
+ if (auto mapLambda = map.Cast().Mapper().Maybe<TCoLambda>()) {
+ status = TOperationProgress::CombineBlockStatuses(status, DetermineProgramBlockStatus(mapLambda.Cast().Body().Ref()));
+ }
+ } else if (auto fill = op.Maybe<TYtFill>()) {
+ status = DetermineProgramBlockStatus(fill.Cast().Content().Body().Ref());
+ } else if (op.Maybe<TYtSort>()) {
+ return;
+ } else if (op.Maybe<TYtCopy>()) {
+ return;
+ } else if (op.Maybe<TYtMerge>()) {
+ return;
+ } else if (op.Maybe<TYtTouch>()) {
+ return;
+ } else if (op.Maybe<TYtDropTable>()) {
+ return;
+ } else if (op.Maybe<TYtStatOut>()) {
+ return;
+ } else if (op.Maybe<TYtDqProcessWrite>()) {
+ return;
+ } else {
+ YQL_ENSURE(false, "unknown operation: " << op.Ref().Content());
+ }
+
+ YQL_CLOG(INFO, ProviderYt) << "Reporting " << status << " block status for operation " << op.Ref().Content() << " with public id #" << opPublicId;
+ auto p = TOperationProgress(TString(YtProviderName), opPublicId, TOperationProgress::EState::InProgress);
+ p.BlockStatus = status;
+ execCtx->Session_->ProgressWriter_(p);
+ }
+
private:
const TYtNativeServices Services_;
const TConfigClusters::TPtr Clusters_;
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
index da0b5919ac..076237b9d7 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
@@ -347,7 +347,6 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const {
auto equiJoin = node.Cast<TYtEquiJoin>();
- auto cluster = equiJoin.DataSink().Cluster().StringValue();
const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable
&& equiJoin.Input().Size() > 2
@@ -369,12 +368,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
if (tryReorder) {
YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
- auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx);
+ auto collectStatus = CollectCboStats(*tree, State_, ctx);
if (collectStatus == TStatus::Repeat) {
return ExportYtEquiJoin(equiJoin, *tree, ctx, State_);
}
- const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx);
+ const auto optimizedTree = OrderJoins(tree, State_, ctx);
if (optimizedTree != tree) {
return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_);
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp
index 6cf823a7e1..70bb6fed87 100644
--- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp
@@ -21,7 +21,6 @@ void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const
IGraphTransformer::TStatus ExtractInMemorySize(
const TYtState::TPtr& state,
- TString cluster,
TExprContext& ctx,
TMaybe<ui64>& leftMemorySize,
TMaybe<ui64>& rightMemorySize,
@@ -46,7 +45,7 @@ IGraphTransformer::TStatus ExtractInMemorySize(
bool isCross = false;
auto status = CollectStatsAndMapJoinSettings(mode, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- leftLeaf, rightLeaf, *state, isCross, cluster, ctx);
+ leftLeaf, rightLeaf, *state, isCross, ctx);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
return status;
@@ -57,7 +56,7 @@ IGraphTransformer::TStatus ExtractInMemorySize(
TVector<TString> leftJoinKeyList(leftJoinKeys.begin(), leftJoinKeys.end());
const ui64 rows = mapSettings.LeftRows;
ui64 size = 0;
- auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables);
+ auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to calculate left join leaf size: " << status;
return status;
@@ -77,7 +76,7 @@ IGraphTransformer::TStatus ExtractInMemorySize(
const ui64 rows = mapSettings.RightRows;
ui64 size = 0;
- auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables);
+ auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to calculate right join leaf size: " << status;
return status;
@@ -94,7 +93,6 @@ IGraphTransformer::TStatus ExtractInMemorySize(
IGraphTransformer::TStatus CollectCboStatsLeaf(
const THashMap<TString, THashSet<TString>>& relJoinColumns,
- const TString& cluster,
TYtJoinNodeLeaf& leaf,
const TYtState::TPtr& state,
TExprContext& ctx)
@@ -115,36 +113,36 @@ IGraphTransformer::TStatus CollectCboStatsLeaf(
}
IYtGateway::TPathStatResult result;
- return TryEstimateDataSizeChecked(result, leaf.Section, cluster, tables, requestedColumnList, *state, ctx);
+ return TryEstimateDataSizeChecked(result, leaf.Section, tables, requestedColumnList, *state, ctx);
}
-IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get());
TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get());
AddJoinColumns(relJoinColumns, op);
TRelSizeInfo leftSizeInfo;
TRelSizeInfo rightSizeInfo;
- auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, cluster, ctx, &op);
+ auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, ctx, &op);
if (result != IGraphTransformer::TStatus::Ok) {
return result;
}
if (leftLeaf) {
- result = CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx);
+ result = CollectCboStatsLeaf(relJoinColumns, *leftLeaf, state, ctx);
} else {
auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get());
- result = CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx);
+ result = CollectCboStatsNode(relJoinColumns, leftOp, state, ctx);
}
if (result != IGraphTransformer::TStatus::Ok) {
return result;
}
if (rightLeaf) {
- result = CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx);
+ result = CollectCboStatsLeaf(relJoinColumns, *rightLeaf, state, ctx);
} else {
auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get());
- result = CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx);
+ result = CollectCboStatsNode(relJoinColumns, rightOp, state, ctx);
}
return result;
}
@@ -155,7 +153,6 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(
TRelSizeInfo& outLeft,
TRelSizeInfo& outRight,
const TYtState::TPtr& state,
- TString cluster,
TExprContext& ctx,
TYtJoinNodeOp* op) {
auto mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW);
@@ -217,22 +214,22 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(
return IGraphTransformer::TStatus::Ok;
}
- auto status = ExtractInMemorySize(state, cluster, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels,
+ auto status = ExtractInMemorySize(state, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels,
numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
if (status != IGraphTransformer::TStatus::Ok) {
return status;
}
- status = ExtractInMemorySize(state, cluster, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels,
+ status = ExtractInMemorySize(state, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels,
numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
return status;
}
-IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
THashMap<TString, THashSet<TString>> relJoinColumns;
- return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx);
+ return CollectCboStatsNode(relJoinColumns, op, state, ctx);
}
TVector<TString> JoinLeafLabels(TExprNode::TPtr label) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h
index 0d05beea7a..fd5b894bb2 100644
--- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h
+++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h
@@ -5,9 +5,9 @@
namespace NYql {
-IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
+IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
-IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TString cluster, TExprContext& ctx, TYtJoinNodeOp* op);
+IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TExprContext& ctx, TYtJoinNodeOp* op);
TVector<TString> JoinLeafLabels(TExprNode::TPtr label);
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 9b691e5554..5ffc4cf1ef 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -157,7 +157,7 @@ public:
{
}
- TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) {
+ TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) {
TVector<TVector<ui64>> groupIdColumnarStats;
groupIdColumnarStats.reserve(groupIdPathInfos.size());
TVector<bool> lookupsInfo;
@@ -175,7 +175,7 @@ public:
flattenPaths.push_back(pathInfo);
}
}
- auto result = EstimateDataSize(cluster, flattenPaths, Nothing(), *State_, ctx);
+ auto result = EstimateDataSize(flattenPaths, Nothing(), *State_, ctx);
size_t statIdx = 0;
size_t pathIdx = 0;
for (const auto& [idx, pathInfos]: Enumerate(groupIdPathInfos)) {
@@ -302,7 +302,7 @@ public:
} else {
TVector<TVector<std::tuple<ui64, ui64, NYT::TRichYPath>>> partitionTuplesArr;
ui64 sumAllTableSizes = 0;
- TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes);
+ TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, {groupIdPathInfos}, sumAllTableSizes);
ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob;
if (settings.CanFallback && hasErasure && parts > maxTasks) {
auto message = DqFallbackErrorMessageWrap("too big table with erasure codec");
@@ -634,7 +634,7 @@ public:
}
ui64 dataSize = 0;
for (auto& [cluster, info]: clusterToNodesAndErasure) {
- auto res = EstimateColumnStats(ctx, cluster, clusterToGroups[cluster], dataSize);
+ auto res = EstimateColumnStats(ctx, clusterToGroups[cluster], dataSize);
auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster);
if (!codecCpu) {
continue;
diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
index 32c1dc2359..b1868c0a4a 100644
--- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
@@ -188,7 +188,7 @@ bool IsYtIsolatedLambdaImpl(const TExprNode& lambdaBody, TSyncMap& syncList, TSt
}
IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns,
- const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+ const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx, bool sync)
{
result = IYtGateway::TPathStatResult{};
@@ -199,9 +199,10 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result,
const bool useColumnarStat = GetJoinCollectColumnarStatisticsMode(*state.Configuration) != EJoinCollectColumnarStatisticsMode::Disable
&& !state.Types->UseTableMetaFromGraph;
- TVector<size_t> reqMap;
TVector<IYtGateway::TPathStatReq> pathStatReqs;
- ui64 totalChunkCount = 0;
+ THashMap<TString, TVector<size_t>> reqMapByCluster;
+ TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster;
+ THashMap<TString, ui64> totalChunkCountByCluster;
for (size_t i: xrange(paths.size())) {
const TYtPathInfo::TPtr& pathInfo = paths[i];
YQL_ENSURE(pathInfo->Table->Stat);
@@ -222,62 +223,83 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result,
overrideColumns = columns;
}
- auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, overrideColumns, state, ctx);
+ auto ytPath = BuildYtPathForStatRequest(*pathInfo, overrideColumns, state, ctx);
if (!ytPath) {
return IGraphTransformer::TStatus::Error;
}
if (ytPath->Columns_) {
- pathStatReqs.push_back(
+ const TString cluster = pathInfo->Table->Cluster;
+ YQL_ENSURE(cluster);
+ pathStatReqsByCluster[cluster].push_back(
IYtGateway::TPathStatReq()
.Path(*ytPath)
.IsTemp(pathInfo->Table->IsTemp)
.IsAnonymous(pathInfo->Table->IsAnonymous)
.Epoch(pathInfo->Table->Epoch.GetOrElse(0))
);
- reqMap.push_back(i);
- totalChunkCount += pathInfo->Table->Stat->ChunkCount;
+ reqMapByCluster[cluster].push_back(i);
+ totalChunkCountByCluster[cluster] += pathInfo->Table->Stat->ChunkCount;
}
}
}
- if (!pathStatReqs.empty()) {
- for (auto& req : pathStatReqs) {
- YQL_ENSURE(req.Path().Columns_);
- requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end());
+ if (!pathStatReqsByCluster.empty()) {
+ const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get();
+ TMap<TString, IYtGateway::TPathStatResult> pathStatsByCluster;
+ TMap<TString, NThreading::TFuture<IYtGateway::TPathStatResult>> futuresByCluster;
+ THashSet<TString> extendedStatsRequested;
+ IGraphTransformer::TStatus resultStatus = IGraphTransformer::TStatus::Ok;
+ for (const auto& [cluster, reqs] : pathStatReqsByCluster) {
+ for (auto& req : reqs) {
+ YQL_ENSURE(req.Path().Columns_);
+ requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end());
+ }
+ const bool requestExtendedStats = !sync && maxChunkCountExtendedStats &&
+ (*maxChunkCountExtendedStats == 0 || totalChunkCountByCluster[cluster] <= *maxChunkCountExtendedStats);
+ IYtGateway::TPathStatOptions pathStatOptions =
+ IYtGateway::TPathStatOptions(state.SessionId)
+ .Cluster(cluster)
+ .Paths(reqs)
+ .Config(state.Configuration->Snapshot())
+ .Extended(requestExtendedStats);
+ if (requestExtendedStats) {
+ extendedStatsRequested.insert(cluster);
+ }
+ if (sync) {
+ futuresByCluster[cluster] = state.Gateway->PathStat(std::move(pathStatOptions));
+ } else {
+ auto& pathStats = pathStatsByCluster[cluster];
+ pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions));
+ if (!pathStats.Success()) {
+ resultStatus = resultStatus.Combine(IGraphTransformer::TStatus::Repeat);
+ }
+ }
}
- const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get();
- const bool requestExtendedStats = !sync && maxChunkCountExtendedStats &&
- (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats);
-
- IYtGateway::TPathStatResult pathStats;
- IYtGateway::TPathStatOptions pathStatOptions =
- IYtGateway::TPathStatOptions(state.SessionId)
- .Cluster(cluster)
- .Paths(pathStatReqs)
- .Config(state.Configuration->Snapshot())
- .Extended(requestExtendedStats);
- if (sync) {
- auto future = state.Gateway->PathStat(std::move(pathStatOptions));
+ for (auto& [cluster, future] : futuresByCluster) {
+ auto& pathStats = pathStatsByCluster[cluster];
pathStats = future.GetValueSync();
pathStats.ReportIssues(ctx.IssueManager);
if (!pathStats.Success()) {
- return IGraphTransformer::TStatus::Error;
- }
- } else {
- pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions));
- if (!pathStats.Success()) {
- return IGraphTransformer::TStatus::Repeat;
+ resultStatus = resultStatus.Combine(IGraphTransformer::TStatus::Error);
}
}
- YQL_ENSURE(pathStats.DataSize.size() == reqMap.size());
- YQL_ENSURE(!requestExtendedStats || pathStats.Extended.size() == reqMap.size());
- for (size_t i: xrange(pathStats.DataSize.size())) {
- result.DataSize[reqMap[i]] = pathStats.DataSize[i];
- if (requestExtendedStats) {
- result.Extended[reqMap[i]] = pathStats.Extended[i];
+ if (resultStatus != IGraphTransformer::TStatus::Ok) {
+ return resultStatus;
+ }
+
+ for (auto& [cluster, pathStats] : pathStatsByCluster) {
+ auto it = reqMapByCluster.find(cluster);
+ YQL_ENSURE(it != reqMapByCluster.end());
+ YQL_ENSURE(pathStats.DataSize.size() == it->second.size());
+ YQL_ENSURE(!extendedStatsRequested.contains(cluster) || pathStats.Extended.size() == it->second.size());
+ for (size_t i: xrange(pathStats.DataSize.size())) {
+ result.DataSize[it->second[i]] = pathStats.DataSize[i];
+ if (extendedStatsRequested.contains(cluster)) {
+ result.Extended[it->second[i]] = pathStats.Extended[i];
+ }
}
}
}
@@ -1847,7 +1869,7 @@ bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMa
return node == nullptr;
}
-TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo,
+TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo,
const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx)
{
auto ytPath = NYT::TRichYPath(pathInfo.Table->Name);
@@ -1858,6 +1880,8 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const
if (ytPath.Columns_ && dynamic_cast<TYtTableInfo*>(pathInfo.Table.Get()) && pathInfo.Table->IsAnonymous
&& !TYtTableInfo::HasSubstAnonymousLabel(pathInfo.Table->FromNode.Cast())) {
+ const TString cluster = pathInfo.Table->Cluster;
+ YQL_ENSURE(cluster);
TString realTableName = state.AnonymousLabels.Value(std::make_pair(cluster, pathInfo.Table->Name), TString());
if (!realTableName) {
TPositionHandle pos;
@@ -1873,7 +1897,7 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const
return ytPath;
}
-TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+TMaybe<TVector<ui64>> EstimateDataSize(const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
{
TVector<ui64> result;
@@ -1882,7 +1906,7 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt
bool sync = true;
IYtGateway::TPathStatResult res;
- auto status = EstimateDataSize(res, requestedColumns, cluster, paths, columns, state, ctx, sync);
+ auto status = EstimateDataSize(res, requestedColumns, paths, columns, state, ctx, sync);
if (status != IGraphTransformer::TStatus::Ok) {
return {};
}
@@ -1891,11 +1915,11 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt
}
IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns,
- const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+ const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
{
bool sync = false;
- return EstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx, sync);
+ return EstimateDataSize(result, requestedColumns, paths, columns, state, ctx, sync);
}
TYtSection UpdateInputFields(TYtSection section, TExprBase fields, TExprContext& ctx) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.h b/yt/yql/providers/yt/provider/yql_yt_helpers.h
index 61a1c4bab5..3a821b16d9 100644
--- a/yt/yql/providers/yt/provider/yql_yt_helpers.h
+++ b/yt/yql/providers/yt/provider/yql_yt_helpers.h
@@ -90,12 +90,12 @@ NNodes::TYtPath CopyOrTrivialMap(TPositionHandle pos, NNodes::TExprBase world, N
const TCopyOrTrivialMapOpts& opts);
bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMap);
-TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+TMaybe<TVector<ui64>> EstimateDataSize( const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx);
IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns,
- const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+ const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx);
-TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo,
+TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo,
const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx);
NNodes::TYtSection UpdateInputFields(NNodes::TYtSection section, NNodes::TExprBase fields, TExprContext& ctx);
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
index 9f663d2897..f4942ab18d 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -193,11 +193,11 @@ bool HasNonTrivialAny(const TEquiJoinLinkSettings& linkSettings, const TMapJoinS
TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputSection, const TJoinLabels& labels,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables, bool mapJoinUseFlow)
{
ui64 size = 0;
- auto status = CalculateJoinLeafSize(size, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables);
+ auto status = CalculateJoinLeafSize(size, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables);
if (status != TStatus::Ok) {
return status;
}
@@ -230,11 +230,11 @@ TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputS
TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSection& inputSection,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables)
{
ui64 dataSize = 0;
- auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables);
+ auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables);
if (status != TStatus::Ok) {
return status;
}
@@ -3050,7 +3050,7 @@ bool RewriteYtEmptyJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, TYtJoin
}
TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stats, TYtSection& inputSection,
- const TYtState& state, const TString& cluster,
+ const TYtState& state,
const TVector<TYtPathInfo::TPtr>& tableInfo, const THashSet<TString>& joinKeys,
bool isCross, TMaybeNode<TCoLambda> premap, TExprContext& ctx)
{
@@ -3103,7 +3103,7 @@ TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stat
}
IYtGateway::TPathStatResult pathStatResult;
- auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tableInfo, {}, state, ctx);
+ auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tableInfo, {}, state, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
@@ -3271,8 +3271,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
return TStatus::Repeat;
}
- auto cluster = TString{equiJoin.DataSink().Cluster().Value()};
-
TMapJoinSettings mapSettings;
TJoinSideStats leftStats;
TJoinSideStats rightStats;
@@ -3281,7 +3279,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
if (allowLookupJoin) {
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::RawSize, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx);
+ &leftLeaf, &rightLeaf, *state, isCross, ctx);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3352,7 +3350,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
{
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::ColumnarSize, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx);
+ &leftLeaf, &rightLeaf, *state, isCross, ctx);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3624,13 +3622,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks);
if (leftTablesReady) {
- auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow);
+ auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables, mapJoinUseFlow);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
if (mapJoinUseBlocks) {
- auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables);
+ auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3638,13 +3636,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
}
if (rightTablesReady) {
- auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables, mapJoinUseFlow);
+ auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables, mapJoinUseFlow);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
if (mapJoinUseBlocks) {
- auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables);
+ auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3947,9 +3945,6 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co
rightJoinKeyList = BuildJoinKeyList(labels.Inputs[leftLeaf ? 1 : 0], *op.RightLabel);
}
-
- auto cluster = TString{equiJoin.DataSink().Cluster().Value()};
-
TMapJoinSettings mapSettings;
TJoinSideStats leftStats;
TJoinSideStats rightStats;
@@ -3958,7 +3953,7 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co
bool isCross = false;
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::NoSize, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- leftLeaf, rightLeaf, *state, isCross, cluster, ctx);
+ leftLeaf, rightLeaf, *state, isCross, ctx);
switch (status.Level) {
case TStatus::Error:
@@ -4866,12 +4861,12 @@ EStarRewriteStatus RewriteYtEquiJoinStar(TYtEquiJoin equiJoin, TYtJoinNodeOp& op
} // namespace
-IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster,
+IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection,
const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
{
result = IYtGateway::TPathStatResult();
if (GetJoinCollectColumnarStatisticsMode(*state.Configuration) == EJoinCollectColumnarStatisticsMode::Sync) {
- auto syncResult = EstimateDataSize(cluster, paths, columns, state, ctx);
+ auto syncResult = EstimateDataSize(paths, columns, state, ctx);
if (!syncResult) {
return IGraphTransformer::TStatus::Error;
}
@@ -4881,7 +4876,7 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResul
}
TSet<TString> requestedColumns;
- auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx);
+ auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, state, ctx);
auto settings = inputSection.Settings().Ptr();
if (status == TStatus::Repeat) {
bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), EYtSettingType::StatColumns);
@@ -4935,7 +4930,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys,
bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys,
TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross,
- TString cluster, TExprContext& ctx)
+ TExprContext& ctx)
{
mapSettings = {};
leftStats = {};
@@ -4943,7 +4938,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
if (leftLeaf) {
auto premap = GetPremapLambda(*leftLeaf);
- auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state, cluster,
+ auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state,
leftTables, leftJoinKeys, isCross, premap, ctx);
if (joinSideStatus.Level != TStatus::Ok) {
return joinSideStatus;
@@ -4959,7 +4954,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
if (rightLeaf) {
auto premap = GetPremapLambda(*rightLeaf);
- auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state, cluster,
+ auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state,
rightTables, rightJoinKeys, isCross, premap, ctx);
if (joinSideStatus.Level != TStatus::Ok) {
return joinSideStatus;
@@ -4983,7 +4978,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables)
{
result = isLeft ? settings.LeftSize : settings.RightSize;
@@ -4992,7 +4987,7 @@ TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSecti
if (!needPayload && !op.JoinKind->IsAtom("Cross")) {
if (joinKeyList.size() < itemType->GetSize()) {
IYtGateway::TPathStatResult pathStatResult;
- auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tables, joinKeyList, *state, ctx);
+ auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tables, joinKeyList, *state, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.h b/yt/yql/providers/yt/provider/yql_yt_join_impl.h
index d8702fa43e..80f79f4d3b 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.h
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.h
@@ -72,12 +72,12 @@ TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx);
IGraphTransformer::TStatus RewriteYtEquiJoinLeaves(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp& op, TExprContext& ctx, const TYtState::TPtr& state);
-TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug = false);
+TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug = false);
struct IBaseOptimizerNode;
struct IProviderContext;
-void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx);
+void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx);
TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos);
bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2);
@@ -89,7 +89,7 @@ IGraphTransformer::TStatus CollectPathsAndLabelsReady(
IGraphTransformer::TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables);
enum class ESizeStatCollectMode {
@@ -115,9 +115,9 @@ IGraphTransformer::TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode s
bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys,
bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys,
TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross,
- TString cluster, TExprContext& ctx);
+ TExprContext& ctx);
-IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster,
+IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection,
const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx);
ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft,
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp
index a195a8f5d0..ca808b621b 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp
@@ -58,12 +58,10 @@ public:
TJoinReorderer(
TYtJoinNodeOp::TPtr op,
const TYtState::TPtr& state,
- const TString& cluster,
TExprContext& ctx,
bool debug = false)
: Root(op)
, State(state)
- , Cluster(cluster)
, Ctx(ctx)
, Debug(debug)
{
@@ -78,7 +76,7 @@ public:
std::shared_ptr<IBaseOptimizerNode> tree;
TOptimizerLinkSettings linkSettings;
std::shared_ptr<IProviderContext> providerCtx;
- BuildOptimizerJoinTree(State, Cluster, tree, providerCtx, linkSettings, Root, Ctx);
+ BuildOptimizerJoinTree(State, tree, providerCtx, linkSettings, Root, Ctx);
auto ytCtx = std::static_pointer_cast<TYtProviderContext>(providerCtx);
std::function<void(const TString& str)> log;
@@ -139,7 +137,6 @@ public:
private:
TYtJoinNodeOp::TPtr Root;
const TYtState::TPtr& State;
- TString Cluster;
TExprContext& Ctx;
bool Debug;
};
@@ -177,9 +174,8 @@ class TOptimizerTreeBuilder
{
public:
TOptimizerLinkSettings LinkSettings;
- TOptimizerTreeBuilder(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx)
+ TOptimizerTreeBuilder(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx)
: State(state)
- , Cluster(cluster)
, Tree(tree)
, OutProviderCtx(providerCtx)
, InputTree(inputTree)
@@ -258,7 +254,7 @@ private:
}
TRelSizeInfo leftSizeInfo;
TRelSizeInfo rightSizeInfo;
- PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Cluster, Ctx, op);
+ PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Ctx, op);
auto left = ProcessNode(op->Left, leftSizeInfo);
auto right = ProcessNode(op->Right, rightSizeInfo);
@@ -374,7 +370,7 @@ private:
TSet<TString> requestedColumns;
IYtGateway::TPathStatResult result;
- auto status = TryEstimateDataSize(result, requestedColumns, Cluster, paths, columns, *State, Ctx);
+ auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, *State, Ctx);
YQL_ENSURE(status != IGraphTransformer::TStatus::Error);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to read path stats that must be already present in cache";
@@ -434,7 +430,6 @@ private:
}
TYtState::TPtr State;
- const TString Cluster;
std::shared_ptr<IBaseOptimizerNode>& Tree;
std::shared_ptr<IProviderContext>& OutProviderCtx;
THashMap<TString, THashSet<TString>> RelJoinColumns;
@@ -517,9 +512,9 @@ bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2) {
}
}
-void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx)
+void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx)
{
- TOptimizerTreeBuilder builder(state, cluster, tree, providerCtx, op, ctx);
+ TOptimizerTreeBuilder builder(state, tree, providerCtx, op, ctx);
builder.Do();
linkSettings = builder.LinkSettings;
}
@@ -529,13 +524,13 @@ TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExp
return BuildYtJoinTree(node, scope, ctx, pos);
}
-TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug)
+TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug)
{
if (state->Types->CostBasedOptimizer == ECostBasedOptimizerType::Disable || op->CostBasedOptPassed) {
return op;
}
- auto result = TJoinReorderer(op, state, cluster, ctx, debug).Do();
+ auto result = TJoinReorderer(op, state, ctx, debug).Do();
if (!debug && AreSimilarTrees(result, op)) {
return op;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
index 789938e02f..525c967853 100644
--- a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
@@ -29,22 +29,21 @@ public:
YQL_ENSURE(PathStatResults.empty());
}
- TNodeMap<IYtGateway::TPathStatResult> PullPathStatResults() {
- TNodeMap<IYtGateway::TPathStatResult> results;
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> PullPathStatResults() {
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> results;
TGuard<TMutex> guard(Lock);
results.swap(PathStatResults);
return results;
}
- void MarkReady(TExprNode* node, const IYtGateway::TPathStatResult& result) {
+ void AddResult(TExprNode* node, const IYtGateway::TPathStatResult& result) {
TGuard<TMutex> guard(Lock);
- YQL_ENSURE(PathStatResults.count(node) == 0);
- PathStatResults[node] = result;
+ PathStatResults[node].push_back(result);
}
private:
mutable TMutex Lock;
- TNodeMap<IYtGateway::TPathStatResult> PathStatResults;
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> PathStatResults;
};
class TYtLoadColumnarStatsTransformer : public TGraphTransformerBase {
@@ -65,7 +64,7 @@ private:
output = input;
PathStatusState->EnsureNoInflightRequests();
- TVector<std::pair<IYtGateway::TPathStatOptions, TExprNode*>> pathStatArgs;
+ TVector<std::pair<TVector<IYtGateway::TPathStatOptions>, TExprNode*>> pathStatArgs;
bool hasError = false;
TNodeOnNodeOwnedMap sectionRewrites;
VisitExpr(input, [this, &pathStatArgs, &hasError, &sectionRewrites, &ctx](const TExprNode::TPtr& node) {
@@ -75,10 +74,9 @@ private:
if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::StatColumns)) {
auto columnList = NYql::GetSettingAsColumnList(section.Settings().Ref(), EYtSettingType::StatColumns);
- TMaybe<TString> cluster;
- TVector<IYtGateway::TPathStatReq> pathStatReqs;
+ TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster;
size_t idx = 0;
- ui64 totalChunkCount = 0;
+ THashMap<TString, ui64> totalChunkCountByCluster;
for (auto path: section.Paths()) {
bool hasStat = false;
if (path.Table().Maybe<TYtTable>().Stat().Maybe<TYtStat>()) {
@@ -108,30 +106,18 @@ private:
}
TYtPathInfo pathInfo(path);
+ const TString cluster = pathInfo.Table->Cluster;
+ YQL_ENSURE(cluster);
YQL_ENSURE(pathInfo.Table->Stat);
- totalChunkCount += pathInfo.Table->Stat->ChunkCount;
+ totalChunkCountByCluster[cluster] += pathInfo.Table->Stat->ChunkCount;
- TString currCluster;
- if (auto ytTable = path.Table().Maybe<TYtTable>()) {
- currCluster = TString{ytTable.Cast().Cluster().Value()};
- } else {
- currCluster = TString{GetOutputOp(path.Table().Cast<TYtOutput>()).DataSink().Cluster().Value()};
- }
- YQL_ENSURE(currCluster);
-
- if (cluster) {
- YQL_ENSURE(currCluster == *cluster);
- } else {
- cluster = currCluster;
- }
-
- auto ytPath = BuildYtPathForStatRequest(*cluster, pathInfo, columnList, *State_, ctx);
+ auto ytPath = BuildYtPathForStatRequest(pathInfo, columnList, *State_, ctx);
if (!ytPath) {
hasError = true;
return false;
}
- pathStatReqs.push_back(
+ pathStatReqsByCluster[cluster].push_back(
IYtGateway::TPathStatReq()
.Path(*ytPath)
.IsTemp(pathInfo.Table->IsTemp)
@@ -142,21 +128,28 @@ private:
++idx;
}
- bool requestExtendedStats = maxChunkCountExtendedStats &&
- (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats);
-
- if (pathStatReqs) {
- auto pathStatOptions = IYtGateway::TPathStatOptions(State_->SessionId)
- .Cluster(*cluster)
+ TVector<IYtGateway::TPathStatOptions> pathStatOptions;
+ for (auto& [cluster, pathStatReqs] : pathStatReqsByCluster) {
+ auto itCount = totalChunkCountByCluster.find(cluster);
+ YQL_ENSURE(itCount != totalChunkCountByCluster.end());
+ const ui64 totalChunkCount = itCount->second;
+ bool requestExtendedStats = maxChunkCountExtendedStats &&
+ (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats);
+ YQL_ENSURE(!pathStatReqs.empty());
+ auto options = IYtGateway::TPathStatOptions(State_->SessionId)
+ .Cluster(cluster)
.Paths(pathStatReqs)
.Config(State_->Configuration->Snapshot())
.Extended(requestExtendedStats);
-
- auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(pathStatOptions));
+ auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(options));
if (!tryResult.Success()) {
- pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get());
+ pathStatOptions.push_back(std::move(options));
}
}
+
+ if (pathStatOptions) {
+ pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get());
+ }
}
}
return !hasError;
@@ -177,16 +170,20 @@ private:
}
TVector<NThreading::TFuture<void>> futures;
- YQL_CLOG(INFO, ProviderYt) << "Starting " << pathStatArgs.size() << " requests for columnar stats";
+ size_t reqCount = 0;
+ for (const auto& arg : pathStatArgs) {
+ reqCount += arg.first.size();
+ }
+ YQL_CLOG(INFO, ProviderYt) << "Starting " << reqCount << " requests for columnar stats";
for (auto& arg : pathStatArgs) {
- IYtGateway::TPathStatOptions& options = arg.first;
+ TVector<IYtGateway::TPathStatOptions>& options = arg.first;
TExprNode* node = arg.second;
-
- auto future = State_->Gateway->PathStat(std::move(options));
-
- futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) {
- pathStatusState->MarkReady(node, result.GetValueSync());
- }));
+ for (auto& opt : options) {
+ auto future = State_->Gateway->PathStat(std::move(opt));
+ futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) {
+ pathStatusState->AddResult(node, result.GetValueSync());
+ }));
+ }
}
AsyncFuture = WaitExceptionOrAll(futures);
@@ -201,26 +198,32 @@ private:
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
output = input;
- TNodeMap<IYtGateway::TPathStatResult> results = PathStatusState->PullPathStatResults();
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> results = PathStatusState->PullPathStatResults();
YQL_ENSURE(!results.empty());
+ size_t applied = 0;
+ TStatus status = TStatus::Repeat;
for (auto& item : results) {
auto& node = item.first;
- auto& result = item.second;
- if (!result.Success()) {
- TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
- return MakeIntrusive<TIssue>(
- ctx.GetPosition(node->Pos()),
- TStringBuilder() << "Execution of node: " << node->Content()
- );
- });
- result.ReportIssues(ctx.IssueManager);
- return TStatus::Error;
+ auto& batch = item.second;
+ TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
+ return MakeIntrusive<TIssue>(
+ ctx.GetPosition(node->Pos()),
+ TStringBuilder() << "Execution of node: " << node->Content()
+ );
+ });
+ for (auto& result : batch) {
+ if (!result.Success()) {
+ result.ReportIssues(ctx.IssueManager);
+ status = status.Combine(TStatus::Error);
+ }
+ ++applied;
}
}
- YQL_CLOG(INFO, ProviderYt) << "Applied " << results.size() << " results of columnar stats";
- return TStatus::Repeat;
+ YQL_CLOG(INFO, ProviderYt) << "Applied " << applied << " results of columnar stats "
+ << (status == TStatus::Error ? "with errors" : "successfully");
+ return status;
}
TYtState::TPtr State_;
diff --git a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
index cd0b5e2ade..639307c92e 100644
--- a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
@@ -533,6 +533,9 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr&
if (info->Table->Meta->IsDynamic) {
useItemsCount = false;
}
+ if (!info->Table->Cluster) {
+ info->Table->Cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
+ }
records.push_back(tableRecord);
tableInfos.push_back(info);
}
@@ -553,7 +556,7 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr&
}
}
if (!hasNotCalculated && !tableInfos.empty()) {
- if (auto dataSizes = EstimateDataSize(TString{maybeRead.Cast().DataSource().Cluster().Value()}, tableInfos, Nothing(), *state, ctx)) {
+ if (auto dataSizes = EstimateDataSize(tableInfos, Nothing(), *state, ctx)) {
YQL_ENSURE(dataSizes->size() == records.size());
for (size_t i: xrange(records.size())) {
for (auto& factor: factors) {
diff --git a/yt/yql/providers/ytflow/expr_nodes/ya.make b/yt/yql/providers/ytflow/expr_nodes/ya.make
index ba1eccbb56..7331a46d81 100644
--- a/yt/yql/providers/ytflow/expr_nodes/ya.make
+++ b/yt/yql/providers/ytflow/expr_nodes/ya.make
@@ -13,40 +13,21 @@ SRCDIR(
yql/essentials/core/expr_nodes_gen
)
-IF(EXPORT_CMAKE)
- RUN_PYTHON3(
- ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/gen/__main__.py
- yql_expr_nodes_gen.jnj
- yql_ytflow_expr_nodes.json
- yql_ytflow_expr_nodes.gen.h
- yql_ytflow_expr_nodes.decl.inl.h
- yql_ytflow_expr_nodes.defs.inl.h
- IN yql_expr_nodes_gen.jnj
- IN yql_ytflow_expr_nodes.json
- OUT yql_ytflow_expr_nodes.gen.h
- OUT yql_ytflow_expr_nodes.decl.inl.h
- OUT yql_ytflow_expr_nodes.defs.inl.h
- OUTPUT_INCLUDES
- ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h
- ${ARCADIA_ROOT}/util/generic/hash_set.h
- )
-ELSE()
- RUN_PROGRAM(
- yql/essentials/core/expr_nodes_gen/gen
- yql_expr_nodes_gen.jnj
- yql_ytflow_expr_nodes.json
- yql_ytflow_expr_nodes.gen.h
- yql_ytflow_expr_nodes.decl.inl.h
- yql_ytflow_expr_nodes.defs.inl.h
- IN yql_expr_nodes_gen.jnj
- IN yql_ytflow_expr_nodes.json
- OUT yql_ytflow_expr_nodes.gen.h
- OUT yql_ytflow_expr_nodes.decl.inl.h
- OUT yql_ytflow_expr_nodes.defs.inl.h
- OUTPUT_INCLUDES
- ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h
- ${ARCADIA_ROOT}/util/generic/hash_set.h
- )
-ENDIF()
+RUN_PY3_PROGRAM(
+ yql/essentials/core/expr_nodes_gen/gen
+ yql_expr_nodes_gen.jnj
+ yql_ytflow_expr_nodes.json
+ yql_ytflow_expr_nodes.gen.h
+ yql_ytflow_expr_nodes.decl.inl.h
+ yql_ytflow_expr_nodes.defs.inl.h
+ IN yql_expr_nodes_gen.jnj
+ IN yql_ytflow_expr_nodes.json
+ OUT yql_ytflow_expr_nodes.gen.h
+ OUT yql_ytflow_expr_nodes.decl.inl.h
+ OUT yql_ytflow_expr_nodes.defs.inl.h
+ OUTPUT_INCLUDES
+ ${ARCADIA_ROOT}/yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h
+ ${ARCADIA_ROOT}/util/generic/hash_set.h
+)
END()
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.cfg
new file mode 100644
index 0000000000..d0ce4581d7
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.sql b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.sql
new file mode 100644
index 0000000000..de3063e888
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_equals.sql
@@ -0,0 +1,5 @@
+pragma yt.UseQLFilter;
+
+select a
+from plato.Input
+where a = 1;
diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h
index 9a2ccfc976..65154c26d9 100644
--- a/yt/yt/client/api/operation_client.h
+++ b/yt/yt/client/api/operation_client.h
@@ -209,6 +209,7 @@ struct TListJobsOptions
std::optional<bool> WithSpec;
std::optional<bool> WithCompetitors;
std::optional<bool> WithMonitoringDescriptor;
+ std::optional<bool> WithInterruptionInfo;
std::optional<TString> TaskName;
std::optional<std::string> OperationIncarnation;
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 33bad4dcfc..f8a5f26d26 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -1452,6 +1452,9 @@ TFuture<TListJobsResult> TClient::ListJobs(
if (options.WithMonitoringDescriptor) {
req->set_with_monitoring_descriptor(*options.WithMonitoringDescriptor);
}
+ if (options.WithInterruptionInfo) {
+ req->set_with_interruption_info(*options.WithInterruptionInfo);
+ }
if (options.TaskName) {
req->set_task_name(*options.TaskName);
}
diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp
index 5694b13e46..6710ee43e2 100644
--- a/yt/yt/client/driver/scheduler_commands.cpp
+++ b/yt/yt/client/driver/scheduler_commands.cpp
@@ -490,6 +490,11 @@ void TListJobsCommand::Register(TRegistrar registrar)
[] (TThis* command) -> auto& { return command->Options.WithMonitoringDescriptor; })
.Optional(/*init*/ false);
+ registrar.ParameterWithUniversalAccessor<std::optional<bool>>(
+ "with_interruption_info",
+ [] (TThis* command) -> auto& { return command->Options.WithInterruptionInfo; })
+ .Optional(/*init*/ false);
+
registrar.ParameterWithUniversalAccessor<std::optional<std::string>>(
"operation_incarnation",
[] (TThis* command) -> auto& { return command->Options.OperationIncarnation; })
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp
index 43ca58c795..feaad321da 100644
--- a/yt/yt/core/http/server.cpp
+++ b/yt/yt/core/http/server.cpp
@@ -537,13 +537,30 @@ IServerPtr CreateServer(
////////////////////////////////////////////////////////////////////////////////
+/*!
+ * Path matching semantic is copied from go standard library.
+ * See https://golang.org/pkg/net/http/#ServeMux
+ *
+ * Supported features:
+ * - matching path exactly: "/path/name"
+ * - matching path prefix: "/path/" matches all with prefix "/path/"
+ * - trailing-slash redirection: matching "/path/" implies "/path"
+ * - end of path wildcard: "/path/{$}" matches only "/path/" and "/path"
+ */
void TRequestPathMatcher::Add(const TString& pattern, const IHttpHandlerPtr& handler)
{
if (pattern.empty()) {
THROW_ERROR_EXCEPTION("Empty pattern is invalid");
}
- if (pattern.back() == '/') {
+ if (pattern.EndsWith("/{$}")) {
+ auto withoutWildcard = pattern.substr(0, pattern.size() - 3);
+
+ Exact_[withoutWildcard] = handler;
+ if (withoutWildcard.size() > 1) {
+ Exact_[withoutWildcard.substr(0, withoutWildcard.size() - 1)] = handler;
+ }
+ } else if (pattern.back() == '/') {
Subtrees_[pattern] = handler;
auto withoutSlash = pattern.substr(0, pattern.size() - 1);
diff --git a/yt/yt/core/http/unittests/http_ut.cpp b/yt/yt/core/http/unittests/http_ut.cpp
index 7030a2350f..4849ed0333 100644
--- a/yt/yt/core/http/unittests/http_ut.cpp
+++ b/yt/yt/core/http/unittests/http_ut.cpp
@@ -1377,6 +1377,21 @@ TEST(THttpHandlerMatchingTest, Simple)
EXPECT_EQ(h3.Get(), handlers3->Match(TStringBuf("/a")).Get());
EXPECT_EQ(h2.Get(), handlers3->Match(TStringBuf("/a/")).Get());
EXPECT_EQ(h2.Get(), handlers3->Match(TStringBuf("/a/b")).Get());
+
+ {
+ auto handlers = New<TRequestPathMatcher>();
+ handlers->Add("/{$}", h1);
+ handlers->Add("/a/{$}", h2);
+ handlers->Add("/a/b", h3);
+
+ EXPECT_EQ(h1.Get(), handlers->Match(TStringBuf("/")).Get());
+ EXPECT_EQ(h2.Get(), handlers->Match(TStringBuf("/a")).Get());
+ EXPECT_EQ(h2.Get(), handlers->Match(TStringBuf("/a/")).Get());
+ EXPECT_EQ(h3.Get(), handlers->Match(TStringBuf("/a/b")).Get());
+ EXPECT_FALSE(handlers->Match(TStringBuf("/a/b/")).Get());
+ EXPECT_FALSE(handlers->Match(TStringBuf("/a/c")).Get());
+ EXPECT_FALSE(handlers->Match(TStringBuf("/d")).Get());
+ }
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/collection_helpers-inl.h b/yt/yt/core/misc/collection_helpers-inl.h
index fa299a05ca..85105406dc 100644
--- a/yt/yt/core/misc/collection_helpers-inl.h
+++ b/yt/yt/core/misc/collection_helpers-inl.h
@@ -208,6 +208,15 @@ auto EmplaceOrCrash(TContainer&& container, TArgs&&... args)
return it;
}
+template <class TMap, class TKey>
+auto EmplaceDefault(TMap&& map, TKey&& key)
+{
+ return map.emplace(
+ std::piecewise_construct_t{},
+ std::tuple<TKey&&>{std::forward<TKey>(key)},
+ std::tuple{});
+}
+
template <class T, class... TVariantArgs>
T& GetOrCrash(std::variant<TVariantArgs...>& variant)
{
diff --git a/yt/yt/core/misc/collection_helpers.h b/yt/yt/core/misc/collection_helpers.h
index f06740e245..6614e3a397 100644
--- a/yt/yt/core/misc/collection_helpers.h
+++ b/yt/yt/core/misc/collection_helpers.h
@@ -98,6 +98,12 @@ template <class TContainer, class... TArgs>
auto EmplaceOrCrash(TContainer&& container, TArgs&&... args);
/*!
+ * This function emplaces default value at the given key.
+ */
+template <class TMap, class TKey>
+auto EmplaceDefault(TMap&& map, TKey&& key);
+
+/*!
* This function is supposed to replace std::get<T>(variant)
* for those cases when exception should not be thrown.
*/
diff --git a/yt/yt/core/misc/range_helpers-inl.h b/yt/yt/core/misc/range_helpers-inl.h
new file mode 100644
index 0000000000..6ce1d498f1
--- /dev/null
+++ b/yt/yt/core/misc/range_helpers-inl.h
@@ -0,0 +1,59 @@
+#ifndef RANGE_HELPERS_INL_H_
+#error "Direct inclusion of this file is not allowed, include range_helpers.h"
+// For the sake of sane code completion.
+#include "range_helpers.h"
+#endif
+
+namespace NYT {
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TContainer, std::ranges::input_range TRange>
+struct TRangeTo
+{ };
+
+template <class TContainer, std::ranges::input_range TRange>
+ requires requires (TContainer container, size_t size) {
+ container.reserve(size);
+ container.push_back(std::declval<typename TContainer::value_type>());
+ }
+struct TRangeTo<TContainer, TRange>
+{
+ static auto ToContainer(TRange&& range)
+ {
+ TContainer container;
+ if constexpr (requires { std::ranges::size(range); }) {
+ container.reserve(std::ranges::size(range));
+ }
+
+ for (auto&& element : range) {
+ container.push_back(std::forward<decltype(element)>(element));
+ }
+
+ return container;
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <std::ranges::range... TContainers>
+auto ZipMutable(TContainers&&... containers) {
+ return Zip(std::ranges::views::transform(containers, [] <class T> (T&& t) {
+ return &t;
+ })...);
+}
+
+template <class TContainer, std::ranges::input_range TRange>
+auto RangeTo(TRange&& range)
+{
+ return NDetail::TRangeTo<TContainer, TRange>::ToContainer(std::forward<TRange>(range));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/core/misc/range_helpers.h b/yt/yt/core/misc/range_helpers.h
new file mode 100644
index 0000000000..2de49738d2
--- /dev/null
+++ b/yt/yt/core/misc/range_helpers.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "common.h"
+
+#include <library/cpp/iterator/zip.h>
+
+#include <ranges>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! An equivalent of Python's `zip()`, but resulting range consists of tuples
+//! of pointers and has length equal to the length of the shortest container.
+//! Implementation with mutable references depends on "lifetime extension in
+//! range-based for loops" from C++23.
+template <std::ranges::range... TRanges>
+auto ZipMutable(TRanges&&... ranges);
+
+//! Converts the provided range to the specified container.
+//! This is a simplified equivalent of std::ranges::to from range-v3.
+template <class TContainer, std::ranges::input_range TRange>
+auto RangeTo(TRange&& range);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define RANGE_HELPERS_INL_H_
+#include "range_helpers-inl.h"
+#undef RANGE_HELPERS_INL_H_
diff --git a/yt/yt/core/misc/unittests/collection_helpers_ut.cpp b/yt/yt/core/misc/unittests/collection_helpers_ut.cpp
new file mode 100644
index 0000000000..82dfb5f1df
--- /dev/null
+++ b/yt/yt/core/misc/unittests/collection_helpers_ut.cpp
@@ -0,0 +1,43 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/misc/collection_helpers.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TMoveCounter
+{
+public:
+ TMoveCounter() = default;
+ TMoveCounter(const TMoveCounter&) = delete;
+ TMoveCounter& operator=(const TMoveCounter&) = delete;
+
+ TMoveCounter(TMoveCounter&&)
+ {
+ ++Moves_;
+ }
+
+ TMoveCounter& operator=(TMoveCounter&&)
+ {
+ ++Moves_;
+ return *this;
+ }
+
+ DEFINE_BYREF_RW_PROPERTY(int, Moves);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TCollectionHelpersTest, EmplaceDefault)
+{
+ THashMap<int, TMoveCounter> map;
+ EmplaceDefault(map, 0);
+ EXPECT_EQ(map[0].Moves(), 0);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/yt/yt/core/misc/unittests/range_helpers_ut.cpp b/yt/yt/core/misc/unittests/range_helpers_ut.cpp
new file mode 100644
index 0000000000..19a2031034
--- /dev/null
+++ b/yt/yt/core/misc/unittests/range_helpers_ut.cpp
@@ -0,0 +1,54 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/misc/range_helpers.h>
+
+#include <library/cpp/yt/compact_containers/compact_vector.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TRangeHelpersTest, ZipMutable)
+{
+ std::vector<int> vectorA(4);
+ std::vector<int> vectorB = {1, 2, 3};
+ for (auto [a, b] : ZipMutable(vectorA, vectorB)) {
+ *a = *b + 1;
+ }
+
+ auto expectedA = std::vector<int>{2, 3, 4, 0};
+ EXPECT_EQ(expectedA, vectorA);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TRangeHelpersTest, RangeToVector)
+{
+ auto data = std::vector<std::string>{"A", "B", "C", "D"};
+ auto range = std::ranges::views::transform(data, [] (std::string x) {
+ return "_" + x;
+ });
+
+ std::initializer_list<std::string> expectedValues{"_A", "_B", "_C", "_D"};
+ EXPECT_EQ(std::vector<std::string>(expectedValues), RangeTo<std::vector<std::string>>(range));
+ using TSomeCompactVector = TCompactVector<std::string, 4>;
+ EXPECT_EQ(TSomeCompactVector(expectedValues), RangeTo<TSomeCompactVector>(range));
+}
+
+TEST(TRangeHelpersTest, RangeToString)
+{
+ auto data = "_sample_"sv;
+ auto range = std::ranges::views::filter(data, [] (char x) {
+ return x != '_';
+ });
+ auto expectedData = "sample"sv;
+
+ EXPECT_EQ(std::string(expectedData), RangeTo<std::string>(range));
+ EXPECT_EQ(TString(expectedData), RangeTo<TString>(range));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/yt/yt/core/misc/unittests/ya.make b/yt/yt/core/misc/unittests/ya.make
index aaae16b1df..35d367ba2c 100644
--- a/yt/yt/core/misc/unittests/ya.make
+++ b/yt/yt/core/misc/unittests/ya.make
@@ -18,6 +18,7 @@ SRCS(
callback_ut.cpp
checksum_ut.cpp
codicil_ut.cpp
+ collection_helpers_ut.cpp
concurrent_cache_ut.cpp
consistent_hashing_ut.cpp
default_map_ut.cpp
@@ -52,6 +53,7 @@ SRCS(
pool_allocator_ut.cpp
proc_ut.cpp
random_ut.cpp
+ range_helpers_ut.cpp
ref_counted_tracker_ut.cpp
ring_queue_ut.cpp
skip_list_ut.cpp
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 92b94e0274..8bb3739bd5 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -2443,6 +2443,8 @@ message TReqListJobs
optional string continuation_token = 24;
+ optional bool with_interruption_info = 26;
+
optional TMasterReadOptions master_read_options = 102;
}