From 9ea558ee3ea55df6daa3059cd641b80c6ee29c2c Mon Sep 17 00:00:00 2001 From: robot-piglet Date: Wed, 6 May 2026 15:06:33 +0300 Subject: Intermediate changes commit_hash:3e37e70e0c7a87457a3fa32235a0f54a3b1dee70 --- .../impl/default_operation_settings.yson | 5 + .../providers/yt/fmr/coordinator/impl/ut/ya.make | 2 + .../impl/ut/yql_yt_reduce_partitioner_ut.cpp | 178 +++++++ .../impl/ut/yql_yt_sorted_partitioner_base_ut.cpp | 177 +++++++ .../impl/ut/yql_yt_sorted_partitioner_base_ut.h | 51 ++ .../impl/ut/yql_yt_sorted_partitioner_ut.cpp | 185 +------ .../map/yql_yt_map_stage_operation_manager.cpp | 6 +- .../operation_manager/impl/reduce/ya.make | 11 + .../yql_yt_reduce_stage_operation_manager.cpp | 145 ++++++ .../reduce/yql_yt_reduce_stage_operation_manager.h | 9 + .../sort/yql_yt_sort_stage_operation_manager.cpp | 2 +- ...yql_yt_sorted_merge_stage_operation_manager.cpp | 2 +- .../fmr/coordinator/operation_manager/impl/ya.make | 1 + .../yql_yt_default_stage_operation_manager.cpp | 3 + .../impl/yql_yt_partition_settings_helpers.h | 9 + .../yt/fmr/coordinator/partitioner/ya.make | 2 + .../partitioner/yql_yt_reduce_partitioner.cpp | 168 +++++++ .../partitioner/yql_yt_reduce_partitioner.h | 62 +++ .../partitioner/yql_yt_sorted_partitioner.cpp | 558 ++------------------- .../partitioner/yql_yt_sorted_partitioner.h | 119 +---- .../partitioner/yql_yt_sorted_partitioner_base.cpp | 530 +++++++++++++++++++ .../partitioner/yql_yt_sorted_partitioner_base.h | 143 ++++++ .../yt_coordinator_service/file/ut/ya.make | 2 + yt/yql/providers/yt/fmr/job/impl/ya.make | 1 + .../providers/yt/fmr/job/impl/yql_yt_job_impl.cpp | 51 +- yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h | 10 + .../yt/fmr/job/impl/yql_yt_reduce_reader.cpp | 133 +++++ .../yt/fmr/job/impl/yql_yt_reduce_reader.h | 33 ++ yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h | 10 + .../providers/yt/fmr/job_factory/impl/ut/ya.make | 2 + .../yt/fmr/job_launcher/yql_yt_job_launcher.cpp | 8 +- yt/yql/providers/yt/fmr/process/ya.make | 1 + yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp | 57 ++- yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h | 13 +- .../providers/yt/fmr/proto/request_options.proto | 42 +- .../proto_helpers/yql_yt_request_proto_helpers.cpp | 102 +++- .../proto_helpers/yql_yt_request_proto_helpers.h | 12 + .../fmr/request_options/yql_yt_request_options.cpp | 20 + .../fmr/request_options/yql_yt_request_options.h | 49 +- .../yql_yt_sorted_partitioner_test_tools.cpp | 1 + yt/yql/providers/yt/fmr/utils/comparator/ya.make | 5 +- .../comparator/yql_yt_binary_yson_comparator.cpp | 16 + .../comparator/yql_yt_binary_yson_comparator.h | 3 + yt/yql/providers/yt/fmr/utils/ya.make | 1 + .../yt/fmr/utils/yql_yt_parse_records.cpp | 2 +- .../providers/yt/fmr/utils/yql_yt_parse_records.h | 3 +- .../impl/yql_yt_yson_tds_block_iterator.cpp | 5 + .../impl/yql_yt_yson_tds_block_iterator.h | 2 + .../impl/yql_yt_yson_yt_block_iterator.cpp | 5 + .../impl/yql_yt_yson_yt_block_iterator.h | 2 + .../yt/fmr/yt_job_service/file/ut/ya.make | 2 + yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp | 339 +++++++++---- 52 files changed, 2362 insertions(+), 938 deletions(-) create mode 100644 yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp create mode 100644 yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp create mode 100644 yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h create mode 100644 yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make create mode 100644 yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp create mode 100644 yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h create mode 100644 yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp create mode 100644 yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h create mode 100644 yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp create mode 100644 yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h create mode 100644 yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp create mode 100644 yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson b/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson index 750b26ac23d..ef51e954180 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson +++ b/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson @@ -71,4 +71,9 @@ "retries_before_throw" = 5; "timeout_sec" = 300; }; + "reduce" = { + "max_data_weight_per_part" = 204288000; + "max_parts" = 64; + "max_key_size_per_part" = 10000000; + } } diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make index feb0219d42d..8ca30f57288 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make @@ -6,6 +6,8 @@ SRCS( yql_yt_ordered_partitioner_ut.cpp yql_yt_fmr_partitioner_ut.cpp yql_yt_sorted_partitioner_ut.cpp + yql_yt_sorted_partitioner_base_ut.cpp + yql_yt_reduce_partitioner_ut.cpp yql_yt_fmr_boundary_keys_ut.cpp ) diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp new file mode 100644 index 00000000000..63ea2c6298c --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp @@ -0,0 +1,178 @@ +#include "yql_yt_sorted_partitioner_base_ut.h" + +namespace NYql::NFmr::NPartitionerTest { + +Y_UNIT_TEST_SUITE(ReducePartitionerTests) { + Y_UNIT_TEST(ReducePartitioningCases) { + const TVector cases = { + { + .Input = { + {{"A","B"}, {"C","D"}}, + }, + .Expected = { + {"TAB0-[A:B)"}, + {"TAB0-[B:D]"} + }, + .MaxWeight = 1, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1 + }, + { + .Input = { + {{"A","B"}, {"B","B"}, {"B","C"}, {"C","D"}}, + }, + .Expected = { + {"TAB0-[A:D]"}, + }, + .MaxWeight = 1000, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1 + }, + { + .Input = { + {{"A","B"}, {"B","B"}, {"B","C"}, {"C","D"}}, + }, + .Expected = { + {"TAB0-[A:B)"}, + {"TAB0-[B:D]"} + }, + .MaxWeight = 2, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1 + }, + { + .Input = { + {{"A","B"}, {"B","B"}, {"B", "B"}, {"B","C"}, {"C","D"}}, + }, + .MaxWeight = 2, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1, + .ExpectedError = "cannot partition" + }, + { + .Input = { + {{"A","B"}, {"B","C"}, {"C","D"}}, + {{"A","D"}}, + }, + .Expected = { + {"TAB0-[A:B)", "TAB1-[A:B)"}, + {"TAB0-[B:C)", "TAB1-[B:C)"}, + {"TAB0-[C:D]", "TAB1-[C:D]"}, + }, + .MaxWeight = 1, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1 + }, + { + .Input = { + {{"A","B"}, {"B","B"}, {"B","B"}, {"B","C"}}, + {{"A","D"}}, + }, + .MaxWeight = 1, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1, + .ExpectedError = "cannot partition" + }, + { + .Input = { + {{"A","B"}, {"B","B"}, {"B","B"}, {"B","C"}}, + {{"A","D"}}, + }, + .Expected = { + {"TAB0-[A:C)", "TAB1-[A:C)"}, + {"TAB0-[C:D]", "TAB1-[C:D]"}, + }, + .MaxWeight = 4, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 2, + }, + { + .Input = { + {{"A","B"}, {"B","C"}}, + {{"X","Y"}}, + }, + .Expected = { + {"TAB0-[A:Y]", "TAB1-[A:Y]"}, + }, + .MaxWeight = 1000, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1 + }, + { + .Input = { + {{"A","B"}}, + {{"X","Y"}}, + }, + .Expected = { + {"TAB0-[A:B)"}, + {"TAB0-[B:Y]", "TAB1-[B:Y]"}, + }, + .MaxWeight = 1, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1, + }, + { + .Input = { + {{"A","B"}}, + {{"A","B"}}, + }, + .Expected = { + {"TAB0-[A:B]", "TAB1-[A:B]"}, + }, + .MaxWeight = 1, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1 + }, + { + .Input = { + {{"B","B"}, {"B","B"}}, + {{"A","D"}}, + }, + .Expected = { + {"TAB0-[A:D]", "TAB1-[A:D]"}, + }, + .MaxWeight = 1000, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 3 + }, + { + .Input = { + {{"B","B"}, {"B","B"}}, + {{"A","D"}}, + }, + .Expected = { + {"TAB0-[A:B)", "TAB1-[A:B)"}, + {"TAB0-[B:D]", "TAB1-[B:D]"}, + }, + .MaxWeight = 2, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 2 + }, + { + .Input = { + {{"A","B"}, {"B","C"}}, + {{"A","C"}, {"C","D"}}, + {{"A","D"}, {"D","E"}}, + {{"A","E"}, {"E","F"}, {"F","G"}}, + {{"A","F"}, {"F","G"}, {"G","H"}}, + }, + .Expected = { + {"TAB0-[A:B)", "TAB1-[A:B)", "TAB2-[A:B)", "TAB3-[A:B)", "TAB4-[A:B)"}, + {"TAB0-[B:C)", "TAB1-[B:C)", "TAB2-[B:C)", "TAB3-[B:C)", "TAB4-[B:C)"}, + {"TAB0-[C:D)", "TAB1-[C:D)", "TAB2-[C:D)", "TAB3-[C:D)", "TAB4-[C:D)"}, + {"TAB1-[D:E)", "TAB2-[D:E)", "TAB3-[D:E)", "TAB4-[D:E)"}, + {"TAB2-[E:F)", "TAB3-[E:F)", "TAB4-[E:F)"}, + {"TAB3-[F:G)", "TAB4-[F:G)"}, + {"TAB3-[G:H]", "TAB4-[G:H]"} + }, + .MaxWeight = 1, + .PartitionerType = EPartitionerType::Reduce, + .MaxKeySizePerPart = 1 + }, + }; + + CheckPartitionCorrectness(cases); + } +} + +} // namespace NYql::NFmr::NPartitionerTest diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp new file mode 100644 index 00000000000..583766a7de7 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp @@ -0,0 +1,177 @@ +#include "yql_yt_sorted_partitioner_base_ut.h" + +namespace NYql::NFmr::NPartitionerTest { + +NYT::TNode Key(ui64 v) { + NYT::TNode m = NYT::TNode::CreateMap(); + m["k"] = static_cast(v); + return m; +} + +TChunkStats MakeSortedChunk(ui64 weight, ui64 firstK, ui64 lastK) { + TSortedChunkStats s; + s.IsSorted = true; + s.FirstRowKeys = Key(firstK); + s.LastRowKeys = Key(lastK); + return TChunkStats{.Rows = 1, .DataWeight = weight, .SortedChunkStats = s}; +} + +NYT::TNode Key(const TString& v) { + NYT::TNode m = NYT::TNode::CreateMap(); + m["k"] = v; + return m; +} + +TChunkStats MakeSortedChunk(ui64 weight, const TString& firstK, const TString& lastK) { + TSortedChunkStats s; + s.IsSorted = true; + s.FirstRowKeys = Key(firstK); + s.LastRowKeys = Key(lastK); + return TChunkStats{.Rows = 1, .DataWeight = weight, .SortedChunkStats = s}; +} + +void AssertTaskHasRangesForTable( + const TTaskTableInputRef& task, + const TString& tableId, + size_t expectedRangesCount, + bool expectLowerKey, + bool expectUpperKey +) { + for (const auto& input : task.Inputs) { + if (auto* fmrInput = std::get_if(&input)) { + if (fmrInput->TableId == tableId) { + UNIT_ASSERT_VALUES_EQUAL(fmrInput->TableRanges.size(), expectedRangesCount); + UNIT_ASSERT_VALUES_EQUAL(fmrInput->FirstRowKeys.Defined(), expectLowerKey); + UNIT_ASSERT_VALUES_EQUAL(fmrInput->LastRowKeys.Defined(), expectUpperKey); + if (expectLowerKey) { + UNIT_ASSERT_C(fmrInput->IsFirstRowInclusive.Defined(), "IsFirstRowInclusive must be set when FirstRowKeys is set"); + } + return; + } + } + } + UNIT_FAIL("Table not found in task inputs: " + tableId); +} + +TString ExtractKeyValueK(const TString& ysonRow) { + const NYT::TNode n = NYT::NodeFromYsonString(ysonRow); + return n["k"].AsString(); +} + +TVector> NormalizeTasksAsProtoStrings(const std::vector& tasks, size_t tablesCount) { + TVector> out; + out.reserve(tasks.size()); + + for (const auto& task : tasks) { + TVector parts; + parts.reserve(tablesCount); + + for (size_t tableIdx = 0; tableIdx < tablesCount; ++tableIdx) { + const TString tableId = TStringBuilder() << "c.t" << tableIdx; + + const TFmrTableInputRef* ref = nullptr; + for (const auto& input : task.Inputs) { + const auto* fmr = std::get_if(&input); + if (!fmr) { + continue; + } + if (fmr->TableId == tableId) { + ref = fmr; + break; + } + } + if (!ref) { + continue; + } + + const bool leftInclusive = ref->IsFirstRowInclusive.Defined() ? *ref->IsFirstRowInclusive : true; + const bool rightInclusive = ref->IsLastRowInclusive.Defined() ? *ref->IsLastRowInclusive : true; + const TString left = ref->FirstRowKeys ? ExtractKeyValueK(*ref->FirstRowKeys) : TString(); + const TString right = ref->LastRowKeys ? ExtractKeyValueK(*ref->LastRowKeys) : TString(); + + TStringBuilder s; + s << "TAB" << tableIdx << "-" << (leftInclusive ? "[" : "(") << left << ":" << right << (rightInclusive ? "]" : ")"); + parts.push_back(s); + } + + out.push_back(std::move(parts)); + } + + return out; +} + +TString DumpTasks(const TVector>& tasks) { + TStringBuilder b; + b << "["; + for (size_t i = 0; i < tasks.size(); ++i) { + if (i) { + b << ", "; + } + b << "["; + for (size_t j = 0; j < tasks[i].size(); ++j) { + if (j) { + b << ", "; + } + b << tasks[i][j]; + } + b << "]"; + } + b << "]"; + return b; +} + +void CheckPartitionCorrectness(const std::vector& cases) { + for (size_t caseIdx = 0; caseIdx < cases.size(); ++caseIdx) { + const auto& c = cases[caseIdx]; + + std::unordered_map> partIdsForTables; + std::unordered_map> partIdStats; + TVector inputTables; + inputTables.reserve(c.Input.size()); + + for (size_t tableIdx = 0; tableIdx < c.Input.size(); ++tableIdx) { + TFmrTableId tableId("c", TStringBuilder() << "t" << tableIdx); + TFmrTableRef ref{.FmrTableId = tableId}; + inputTables.push_back(ref); + + const TString partId = TStringBuilder() << "p" << tableIdx; + partIdsForTables[tableId] = {partId}; + + auto& stats = partIdStats[partId]; + stats.reserve(c.Input[tableIdx].size()); + for (const auto& interval : c.Input[tableIdx]) { + stats.push_back(MakeSortedChunk(1, interval.first, interval.second)); + } + } + + TFmrPartitionerSettings fmrPartitionSettings {.MaxDataWeightPerPart = c.MaxWeight, .MaxParts = 1000}; + TSortingColumns keyColumns{.Columns = {"k"}, .SortOrders = {ESortOrder::Ascending}}; + + TSortedPartitionerBase::TPtr partitioner; + if (c.PartitionerType == EPartitionerType::Sort) { + TSortedPartitionSettings settings; + settings.FmrPartitionSettings = fmrPartitionSettings; + partitioner = MakeIntrusive(partIdsForTables, partIdStats, keyColumns, settings); + } else { + TReducePartitionSettings settings; + settings.FmrPartitionSettings = fmrPartitionSettings; + settings.MaxKeySizePerPart = c.MaxKeySizePerPart; + partitioner = MakeIntrusive(partIdsForTables, partIdStats, keyColumns, settings); + } + + auto [tasks, error] = partitioner->PartitionTablesIntoTasks(inputTables); + if (error) { + YQL_ENSURE(!c.ExpectedError.empty(), "Partitioner fell with unexpected error " << error->ErrorMessage); + UNIT_ASSERT(error->ErrorMessage.Contains(c.ExpectedError)); + } else { + const auto actual = NormalizeTasksAsProtoStrings(tasks, c.Input.size()); + UNIT_ASSERT_VALUES_EQUAL_C( + DumpTasks(actual), + DumpTasks(c.Expected), + TStringBuilder() << "caseIdx=" << caseIdx + ); + } + } +} + +} // namespace NYql::NFmr::NPartitionerTest diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h new file mode 100644 index 00000000000..1f96f3e253d --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h @@ -0,0 +1,51 @@ +#pragma once + +#include + +#include +#include +#include + +#include + +namespace NYql::NFmr::NPartitionerTest { + +NYT::TNode Key(ui64 v); + +TChunkStats MakeSortedChunk(ui64 weight, ui64 firstK, ui64 lastK); + +NYT::TNode Key(const TString& v); + +TChunkStats MakeSortedChunk(ui64 weight, const TString& firstK, const TString& lastK); + +void AssertTaskHasRangesForTable( + const TTaskTableInputRef& task, + const TString& tableId, + size_t expectedRangesCount, + bool expectLowerKey, + bool expectUpperKey +); + +TString ExtractKeyValueK(const TString& ysonRow); + +TVector> NormalizeTasksAsProtoStrings(const std::vector& tasks, size_t tablesCount); + +TString DumpTasks(const TVector>& tasks); + +enum EPartitionerType { + Sort, + Reduce +}; + +struct TCase { + TVector>> Input; + TVector> Expected; + ui64 MaxWeight = 1; + EPartitionerType PartitionerType = EPartitionerType::Sort; + ui64 MaxKeySizePerPart = 1; + TString ExpectedError; +}; + +void CheckPartitionCorrectness(const std::vector& cases); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp index 07b95d19cbf..a5a08201f3e 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp @@ -1,132 +1,6 @@ -#include +#include "yql_yt_sorted_partitioner_base_ut.h" -#include -#include - -#include - -namespace NYql::NFmr { - -namespace { - -NYT::TNode Key(ui64 v) { - NYT::TNode m = NYT::TNode::CreateMap(); - m["k"] = static_cast(v); - return m; -} - -TChunkStats MakeSortedChunk(ui64 weight, ui64 firstK, ui64 lastK) { - TSortedChunkStats s; - s.IsSorted = true; - s.FirstRowKeys = Key(firstK); - s.LastRowKeys = Key(lastK); - return TChunkStats{.Rows = 1, .DataWeight = weight, .SortedChunkStats = s}; -} - -NYT::TNode Key(const TString& v) { - NYT::TNode m = NYT::TNode::CreateMap(); - m["k"] = v; - return m; -} - -TChunkStats MakeSortedChunk(ui64 weight, const TString& firstK, const TString& lastK) { - TSortedChunkStats s; - s.IsSorted = true; - s.FirstRowKeys = Key(firstK); - s.LastRowKeys = Key(lastK); - return TChunkStats{.Rows = 1, .DataWeight = weight, .SortedChunkStats = s}; -} - -void AssertTaskHasRangesForTable( - const TTaskTableInputRef& task, - const TString& tableId, - size_t expectedRangesCount, - bool expectLowerKey, - bool expectUpperKey -) { - for (const auto& input : task.Inputs) { - if (auto* fmrInput = std::get_if(&input)) { - if (fmrInput->TableId == tableId) { - UNIT_ASSERT_VALUES_EQUAL(fmrInput->TableRanges.size(), expectedRangesCount); - UNIT_ASSERT_VALUES_EQUAL(fmrInput->FirstRowKeys.Defined(), expectLowerKey); - UNIT_ASSERT_VALUES_EQUAL(fmrInput->LastRowKeys.Defined(), expectUpperKey); - if (expectLowerKey) { - UNIT_ASSERT_C(fmrInput->IsFirstRowInclusive.Defined(), "IsFirstRowInclusive must be set when FirstRowKeys is set"); - } - return; - } - } - } - UNIT_FAIL("Table not found in task inputs: " + tableId); -} - -TString ExtractKeyValueK(const TString& ysonRow) { - const NYT::TNode n = NYT::NodeFromYsonString(ysonRow); - return n["k"].AsString(); -} - -TVector> NormalizeTasksAsProtoStrings(const std::vector& tasks, size_t tablesCount) { - TVector> out; - out.reserve(tasks.size()); - - for (const auto& task : tasks) { - TVector parts; - parts.reserve(tablesCount); - - for (size_t tableIdx = 0; tableIdx < tablesCount; ++tableIdx) { - const TString tableId = TStringBuilder() << "c.t" << tableIdx; - - const TFmrTableInputRef* ref = nullptr; - for (const auto& input : task.Inputs) { - const auto* fmr = std::get_if(&input); - if (!fmr) { - continue; - } - if (fmr->TableId == tableId) { - ref = fmr; - break; - } - } - if (!ref) { - continue; - } - - const bool inclusive = ref->IsFirstRowInclusive.Defined() ? *ref->IsFirstRowInclusive : true; - const TString left = ref->FirstRowKeys ? ExtractKeyValueK(*ref->FirstRowKeys) : TString(); - const TString right = ref->LastRowKeys ? ExtractKeyValueK(*ref->LastRowKeys) : TString(); - - TStringBuilder s; - s << "TAB" << tableIdx << "-" << (inclusive ? "[" : "(") << left << ":" << right << "]"; - parts.push_back(s); - } - - out.push_back(std::move(parts)); - } - - return out; -} - -TString DumpTasks(const TVector>& tasks) { - TStringBuilder b; - b << "["; - for (size_t i = 0; i < tasks.size(); ++i) { - if (i) { - b << ", "; - } - b << "["; - for (size_t j = 0; j < tasks[i].size(); ++j) { - if (j) { - b << ", "; - } - b << tasks[i][j]; - } - b << "]"; - } - b << "]"; - return b; -} - -} // namespace +namespace NYql::NFmr::NPartitionerTest { Y_UNIT_TEST_SUITE(SortedPartitionerTests) { Y_UNIT_TEST(SplitsIntoKeyAlignedTasksAndSetsBounds) { @@ -151,7 +25,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) { TSortingColumns keyColumns{.Columns = {"k"}, .SortOrders = {ESortOrder::Ascending}}; TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings); - auto [tasks, error] = partitioner.PartitionTablesIntoTasksSorted({r1, r2}); + auto [tasks, error] = partitioner.PartitionTablesIntoTasks({r1, r2}); UNIT_ASSERT(!error); UNIT_ASSERT(!tasks.empty()); @@ -177,7 +51,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) { TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings); UNIT_ASSERT_EXCEPTION_CONTAINS( - partitioner.PartitionTablesIntoTasksSorted({r1}), + partitioner.PartitionTablesIntoTasks({r1}), yexception, "at least one partition"); } @@ -199,7 +73,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) { TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings); UNIT_ASSERT_EXCEPTION_CONTAINS( - partitioner.PartitionTablesIntoTasksSorted({r1}), + partitioner.PartitionTablesIntoTasks({r1}), yexception, "at least one chunk"); } @@ -235,7 +109,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) { TSortingColumns sortingColumns{.Columns = keyColumns, .SortOrders = {ESortOrder::Ascending}}; TSortedPartitioner partitioner(partIdsForTables, partIdStats, sortingColumns, settings); - auto [tasks, error] = partitioner.PartitionTablesIntoTasksSorted({refTable1, refTable2}); + auto [tasks, error] = partitioner.PartitionTablesIntoTasks({refTable1, refTable2}); UNIT_ASSERT_C(!error, "Partitioner returned non-ok"); UNIT_ASSERT_C(!tasks.empty(), "Partitioner returned no tasks"); @@ -251,12 +125,6 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) { } Y_UNIT_TEST(PrototypeLikePartitioningCases) { - struct TCase { - TVector>> Input; - TVector> Expected; - ui64 MaxWeight = 1; - }; - const TVector cases = { { .Input = { @@ -380,45 +248,8 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) { }, }; - for (size_t caseIdx = 0; caseIdx < cases.size(); ++caseIdx) { - const auto& c = cases[caseIdx]; - - std::unordered_map> partIdsForTables; - std::unordered_map> partIdStats; - TVector inputTables; - inputTables.reserve(c.Input.size()); - - for (size_t tableIdx = 0; tableIdx < c.Input.size(); ++tableIdx) { - TFmrTableId tableId("c", TStringBuilder() << "t" << tableIdx); - TFmrTableRef ref{.FmrTableId = tableId}; - inputTables.push_back(ref); - - const TString partId = TStringBuilder() << "p" << tableIdx; - partIdsForTables[tableId] = {partId}; - - auto& stats = partIdStats[partId]; - stats.reserve(c.Input[tableIdx].size()); - for (const auto& interval : c.Input[tableIdx]) { - stats.push_back(MakeSortedChunk(1, interval.first, interval.second)); - } - } - - TSortedPartitionSettings settings; - settings.FmrPartitionSettings = {.MaxDataWeightPerPart = c.MaxWeight, .MaxParts = 1000}; - TSortingColumns keyColumns{.Columns = {"k"}, .SortOrders = {ESortOrder::Ascending}}; - - TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings); - auto [tasks, error] = partitioner.PartitionTablesIntoTasksSorted(inputTables); - UNIT_ASSERT_C(!error, "Partitioner returned non-ok"); - - const auto actual = NormalizeTasksAsProtoStrings(tasks, c.Input.size()); - UNIT_ASSERT_VALUES_EQUAL_C( - DumpTasks(actual), - DumpTasks(c.Expected), - TStringBuilder() << "caseIdx=" << caseIdx - ); - } + CheckPartitionCorrectness(cases); } } -} // namespace NYql::NFmr +} // namespace NYql::NFmr::NPartitionerTest diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp index f35c76b4dc3..80bfe0894d0 100644 --- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp @@ -24,7 +24,7 @@ public: const auto& partIdStats = context.PartIdStats; auto ytCoordinatorService = context.YtCoordinatorService; - if (operationParams.IsOrdered) { + if (operationParams.MapJobType == EFmrJobType::OrderedMap) { // Ordered map -> ordered partition auto orderedPartitionerSettings = GetOrderedPartitionerSettings(fmrOperationSpec); auto orderedPartitioner = TOrderedPartitioner(partIdsForTables, partIdStats, orderedPartitionerSettings); @@ -56,7 +56,7 @@ public: ) override { const auto& mapOperationParams = std::get(context.OperationParams); - if (mapOperationParams.IsOrdered) { + if (mapOperationParams.MapJobType == EFmrJobType::OrderedMap) { YQL_CLOG(INFO, FastMapReduce) << "Starting Ordered Map operation"; } else { YQL_CLOG(INFO, FastMapReduce) << "Starting Map operation"; @@ -80,7 +80,7 @@ public: mapTaskParams.Output = fmrTableOutputRefs; mapTaskParams.SerializedMapJobState = mapOperationParams.SerializedMapJobState; - mapTaskParams.IsOrdered = mapOperationParams.IsOrdered; + mapTaskParams.MapJobType = mapOperationParams.MapJobType; generatedTasks.push_back(TGeneratedTaskInfo{ .TaskType = ETaskType::Map, diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make new file mode 100644 index 00000000000..53d3ad22d36 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + yql_yt_reduce_stage_operation_manager.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/base +) + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp new file mode 100644 index 00000000000..70922eb6747 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp @@ -0,0 +1,145 @@ + +#include "yql_yt_reduce_stage_operation_manager.h" + +#include +#include +#include + +namespace NYql::NFmr { + +namespace { + +class TReduceStageOperationManager: public TFmrStageOperationManagerBase { +public: + TReduceStageOperationManager(TIntrusivePtr randomProvider) + : TFmrStageOperationManagerBase(randomProvider) + { + } + + TPartitionResult PartitionOperationImpl(const TPrepareOperationStageContext& context) final { + const auto& operationParams = std::get(context.OperationParams); + const auto& fmrOperationSpec = context.FmrOperationSpec; + const auto& partIdsForTables = context.PartIdsForTables; + const auto& partIdStats = context.PartIdStats; + + TSortingColumns reduceBy = operationParams.ReduceOperationSpec.ReduceBy; + TSortingColumns sortBy = operationParams.ReduceOperationSpec.SortBy; + + + auto reduceParitionSettings = GetReducePartitionSettings(fmrOperationSpec); + auto reducePartitioner = TReducePartitioner(partIdsForTables, partIdStats, reduceBy, reduceParitionSettings); + + std::vector inputTables = operationParams.Input; + + return reducePartitioner.PartitionTablesIntoTasks(inputTables); + } + + TGenerateTasksResult GenerateTasksImpl(const TGenerateTasksContext& context) final { + const auto& reduceOperationParams = std::get(context.OperationParams); + + YQL_CLOG(INFO, FastMapReduce) << "Starting Reduce operation"; + + TGenerateTasksResult result; + std::vector generatedTasks; + for (auto& task: context.PartitionResult.TaskInputs) { + TReduceTaskParams reduceTaskParams; + reduceTaskParams.Input = task; + + std::vector fmrTableOutputRefs; + std::transform(reduceOperationParams.Output.begin(), reduceOperationParams.Output.end(), std::back_inserter(fmrTableOutputRefs), [] (const TFmrTableRef& fmrTableRef) { + return TFmrTableOutputRef(fmrTableRef); + }); + + TString newPartId = GenerateId(); + for (auto& fmrTableOutputRef: fmrTableOutputRefs) { + fmrTableOutputRef.PartId = newPartId; + result.PartIdsToUpdate[fmrTableOutputRef.TableId].emplace_back(newPartId); + } + + reduceTaskParams.Output = fmrTableOutputRefs; + reduceTaskParams.SerializedReduceJobState = reduceOperationParams.SerializedReduceJobState; + reduceTaskParams.ReduceOperationSpec = reduceOperationParams.ReduceOperationSpec; + + generatedTasks.push_back(TGeneratedTaskInfo{ + .TaskType = ETaskType::Reduce, + .TaskParams = std::move(reduceTaskParams) + }); + } + result.Tasks = std::move(generatedTasks); + + return result; + } + + TGetNewPartIdsForTaskResult GetNewPartIdsForTask(const TGetNewPartIdsForTaskContext& context) final { + TGetNewPartIdsForTaskResult result; + TReduceTaskParams& reduceTaskParams = std::get(context.Task->TaskParams); + + for (auto& fmrTableOutputRef: reduceTaskParams.Output) { + if (fmrTableOutputRef.PartId.empty()) { + return {.Error = TFmrError{ + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::RestartQuery, + .ErrorMessage = "Reduce task has empty output PartId", + .TaskId = context.TaskId, + .OperationId = context.OperationId + }}; + } + TString tableId = fmrTableOutputRef.TableId; + TString partId = fmrTableOutputRef.PartId; + + const auto& partIdsIter = context.PartIdsForTables.find(tableId); + if (partIdsIter == context.PartIdsForTables.end()) { + return {.Error = TFmrError{ + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::RestartQuery, + .ErrorMessage = "Reduce task output PartId is missing in coordinator part list", + .TaskId = context.TaskId, + .OperationId = context.OperationId + }}; + } + + const auto& partIds = partIdsIter->second; + if (std::find(partIds.begin(), partIds.end(), partId) == partIds.end()) { + return {.Error = TFmrError{ + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::RestartQuery, + .ErrorMessage = "Reduce task output PartId is missing in coordinator part list", + .TaskId = context.TaskId, + .OperationId = context.OperationId + }}; + } + } + + return result; + } + + std::vector GetPartIdsForTask(const GetPartIdsForTaskContext& context) final { + std::vector groupsToClear; + TReduceTaskParams& reduceTaskParams = std::get(context.Task->TaskParams); + for (auto& fmrTableOutputRef: reduceTaskParams.Output) { + TString tableId = fmrTableOutputRef.TableId; + if (!fmrTableOutputRef.PartId.empty() && context.PartIdStats.contains(fmrTableOutputRef.PartId)) { + auto prevPartId = fmrTableOutputRef.PartId; + groupsToClear.emplace_back(tableId, prevPartId); + } + } + return groupsToClear; + } + + std::vector GetExpectedOutputTableIds(const TOperationParams& params) const override { + const auto& reduceParams = std::get(params); + std::vector ids; + for (const auto& output : reduceParams.Output) { + ids.emplace_back(output.FmrTableId.Id); + } + return ids; + } +}; + +} // namespace + +IFmrStageOperationManager::TPtr MakeReduceStageOperationManager(TIntrusivePtr randomProvider) { + return MakeIntrusive(randomProvider); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h new file mode 100644 index 00000000000..b4055192897 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace NYql::NFmr { + +IFmrStageOperationManager::TPtr MakeReduceStageOperationManager(TIntrusivePtr randomProvider); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp index 280225a7527..1d8c3c3a85d 100644 --- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp @@ -271,7 +271,7 @@ private: auto sortedPartitioner = TSortedPartitioner(partIdsForTables, partIdStats, sortingColumns, sortedPartitionerSettings); - return PartitionInputTablesIntoTasksSorted(inputTables, sortedPartitioner); + return sortedPartitioner.PartitionTablesIntoTasks(inputTables); } TPartitionResult DoUnorderedPartition(const TPrepareOperationStageContext& context) { diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp index dd1b0c8ab1f..bf53e953e0f 100644 --- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp @@ -31,7 +31,7 @@ public: auto sortedPartitioner = TSortedPartitioner(partIdsForTables, partIdStats, sortingColumns, sortedPartitionerSettings); std::vector inputTables = operationParams.Input; - return PartitionInputTablesIntoTasksSorted(inputTables, sortedPartitioner); + return sortedPartitioner.PartitionTablesIntoTasks(inputTables); } TGenerateTasksResult GenerateTasksImpl(const TGenerateTasksContext& context) final { diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make index cdfe66eb89c..0538e9e90dd 100644 --- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make @@ -12,6 +12,7 @@ PEERDIR( yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/merge yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map + yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_upload yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort yql/essentials/utils diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp index 8a95c23e4c5..b6a0af0537c 100644 --- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -29,6 +30,8 @@ IFmrStageOperationManager::TPtr MakeStageOperationManager(EOperationType operati return MakeSortedUploadStageOperationManager(randomProvider); case EOperationType::Sort: return MakeSortStageOperationManager(randomProvider); + case EOperationType::Reduce: + return MakeReduceStageOperationManager(randomProvider); default: ythrow yexception() << "Unknown operation type for stage operation manager"; } diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h index 5fcb6358529..598a24649ca 100644 --- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h +++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace NYql::NFmr { @@ -55,6 +56,14 @@ inline TSortedPartitionSettings GetSortedPartitionerSettings(const NYT::TNode& f return settings; } +inline TReducePartitionSettings GetReducePartitionSettings(const NYT::TNode& fmrOperationSpec) { + TReducePartitionSettings settings; + settings.FmrPartitionSettings.MaxDataWeightPerPart = fmrOperationSpec["reduce"]["max_data_weight_per_part"].AsInt64(); + settings.FmrPartitionSettings.MaxParts = fmrOperationSpec["reduce"]["max_parts"].AsInt64(); + settings.MaxKeySizePerPart = fmrOperationSpec["reduce"]["max_key_size_per_part"].AsInt64(); + return settings; +} + inline TFmrResourceTasksResult PartitionFmrResourcesIntoTasks( const std::vector& fmrResources, const NYT::TNode& fmrOperationSpec, diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make b/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make index db85f67241e..399f68d520f 100644 --- a/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make @@ -3,7 +3,9 @@ LIBRARY() SRCS( yql_yt_fmr_partitioner.cpp yql_yt_ordered_partitioner.cpp + yql_yt_reduce_partitioner.cpp yql_yt_sorted_partitioner.cpp + yql_yt_sorted_partitioner_base.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp new file mode 100644 index 00000000000..49dbafecd7b --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp @@ -0,0 +1,168 @@ +#include "yql_yt_reduce_partitioner.h" + +#include "yql_yt_sorted_partitioner_base.h" + +#include + +namespace NYql::NFmr { + +TReducePartitioner::TReducePartitioner( + const std::unordered_map>& partIdsForTables, + const std::unordered_map>& partIdStats, + const TSortingColumns& reduceBy, + const TReducePartitionSettings& settings +) + : TSortedPartitionerBase(partIdsForTables, partIdStats, reduceBy, settings.FmrPartitionSettings) + , ReduceBy_(reduceBy) + , Settings_(settings) +{ + YQL_ENSURE(Settings_.MaxKeySizePerPart <= Settings_.FmrPartitionSettings.MaxDataWeightPerPart); +} + +TFmrTableKeysRange TReducePartitioner::GetReadRangeFromSlices(const std::vector& slices, bool isLastRange) { + YQL_ENSURE(!slices.empty()); + TFmrTableKeysRange taskRange{.IsEmpty = false}; + auto minSliceReadRange = slices[0].RangeForRead, maxSliceReadRange = slices[slices.size() - 1].RangeForRead; + YQL_ENSURE(minSliceReadRange.FirstKeysBound.Defined()); + YQL_ENSURE(maxSliceReadRange.LastKeysBound.Defined()); + + auto taskRangeLeftBorder = *minSliceReadRange.FirstKeysBound; + bool taskRangeLeftBorderInclusive = minSliceReadRange.IsFirstBoundInclusive; + if (LeftBoundary_.Defined()) { + taskRangeLeftBorder = *LeftBoundary_; + taskRangeLeftBorderInclusive = true; + } + + auto taskRangeRightBorder = *maxSliceReadRange.LastKeysBound; + bool taskRangeRightBorderInclusive = maxSliceReadRange.IsLastBoundInclusive; + LeftBoundary_ = Nothing(); + if (maxSliceReadRange.IsLastBoundInclusive) { + // don't take last key if inclusive, because we need to guarantee that all reduce keys are in the same job. + LeftBoundary_ = *maxSliceReadRange.LastKeysBound; + if (!isLastRange) { + taskRangeRightBorderInclusive = false; + } + } + + YQL_ENSURE(taskRangeLeftBorderInclusive, " task range left border should always be true for reduce paritioner"); + + if (taskRangeLeftBorder == taskRangeRightBorder && !taskRangeRightBorderInclusive) { + ythrow yexception() << "Key " << taskRangeLeftBorder.Row << " takes all slice and is too large, cannot partition"; + } + + taskRange.SetFirstKeysBound(taskRangeLeftBorder, taskRangeLeftBorderInclusive); + taskRange.SetLastKeysBound(taskRangeRightBorder, taskRangeRightBorderInclusive); + + std::unordered_map> chunksByTable; + std::unordered_map> seenChunksByTable; + + for (const auto& slice : slices) { + for (const auto& [tableId, sliceChunks] : slice.ChunksByTable) { + auto& out = chunksByTable[tableId]; + auto& seen = seenChunksByTable[tableId]; + for (const auto& chunk : sliceChunks) { + if (chunk.KeyRange.LastKeysBound != taskRangeRightBorder || !chunk.KeyRange.IsLastBoundInclusive) { + continue; + } + + TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex; + if (!seen.insert(chunkKey).second) { + continue; + } + out.push_back(chunk); + } + } + } + LeftBoundaryChunks_.push(chunksByTable); + + return taskRange; +} + +void TReducePartitioner::CheckMaxKeySizePerSlices(const std::vector& slices) { + ui64 maxKeySizePerPart = Settings_.MaxKeySizePerPart; + YQL_ENSURE(!slices.empty()); + TMaybe curLeftBorder; + ui64 currentFilledSlicesWeight = CurrentLastKeyWeight_; + for (const auto& slice: slices) { + if (!curLeftBorder.Defined()) { + curLeftBorder = slice.RangeForRead.FirstKeysBound; + } + YQL_ENSURE(curLeftBorder.Defined()); + YQL_ENSURE(slice.RangeForRead.LastKeysBound.Defined()); + auto curRightBorder = *slice.RangeForRead.LastKeysBound; + + if (*curLeftBorder != curRightBorder) { + if (currentFilledSlicesWeight > maxKeySizePerPart) { + ythrow yexception() << "Key " << curLeftBorder->Row << " weighs at least " << currentFilledSlicesWeight << " bytes which is larget than maxKeySizePerPart - " << maxKeySizePerPart << " - cannot partition"; + } + curLeftBorder = Nothing(); + currentFilledSlicesWeight = 0; + } else { + currentFilledSlicesWeight += slice.Weight; + } + } + if (currentFilledSlicesWeight > maxKeySizePerPart) { + ythrow yexception() << "Key " << curLeftBorder->Row << " weighs at least " << currentFilledSlicesWeight << " bytes which is larget than maxKeySizePerPart - " << maxKeySizePerPart; + } + CurrentLastKeyWeight_ = currentFilledSlicesWeight; +} + +void TReducePartitioner::ChangeLeftKeyBoundaryIfNeeded( + TFmrTableKeysBoundary&, + bool& isLeftInclusive, + const TPartitionerFilterBoundary& +) { + isLeftInclusive = true; +} + +void TReducePartitioner::ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) { + rightKey = taskRangeLastKey; +} + +void TReducePartitioner::ExtendChunksPerTable(std::unordered_map>& chunksByTable) { + if (LeftBoundaryChunks_.size() < 2) { + return; + } + + auto leftBoundaryChunks = LeftBoundaryChunks_.front(); + LeftBoundaryChunks_.pop(); + if (leftBoundaryChunks.empty()) { + return; + } + + for (auto& [tableId, chunks]: leftBoundaryChunks) { + if (!chunksByTable.contains(tableId)) { + chunksByTable[tableId] = chunks; + } else { + // extending current chunks for table with all other non-intersecting elements from LeftBoundaryChunks_. + auto currentChunksByTable = chunksByTable[tableId]; + std::unordered_set chunkKeys; + std::vector neededChunks; + for (auto& chunk: currentChunksByTable) { + TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex; + chunkKeys.emplace(chunkKey); + } + for (auto& chunk: chunks) { + auto curChunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex; + if (!chunkKeys.contains(curChunkKey)) { + neededChunks.emplace_back(chunk); + chunkKeys.emplace(curChunkKey); + } + } + // since all needed chunks are from left boundary, write them to beginning of existing chunks. + neededChunks.insert(neededChunks.end(), currentChunksByTable.begin(), currentChunksByTable.end()); + chunksByTable[tableId] = neededChunks; + } + } +} + +TTaskTableInputRef TReducePartitioner::CreateTaskInputFromSlices( + const std::vector& slices, + const std::vector& inputTables, + bool isLastRange +) { + CheckMaxKeySizePerSlices(slices); + return CreateTaskInputFromSlicesImpl(slices, inputTables, isLastRange); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h new file mode 100644 index 00000000000..d717d4dc0de --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h @@ -0,0 +1,62 @@ +#pragma once + +#include "yql_yt_sorted_partitioner_base.h" + +#include +#include +#include +#include +#include +#include + +#include + +namespace NYql::NFmr { + +struct TReducePartitionSettings { + TFmrPartitionerSettings FmrPartitionSettings; + ui64 MaxKeySizePerPart = 0; +}; + +class TReducePartitioner: public TSortedPartitionerBase { +public: + TReducePartitioner( + const std::unordered_map>& partIdsForTables, + const std::unordered_map>& partIdStats, + const TSortingColumns& reduceBy, + const TReducePartitionSettings& settings + ); + +private: + TPartitionResult PartitionFmrTables(const std::vector& inputTables); + + TTaskTableInputRef CreateTaskInputFromSlices( + const std::vector& slices, + const std::vector& inputTables, + bool isLastRange + ) override; + + TFmrTableKeysRange GetReadRangeFromSlices(const std::vector& slices, bool isLastRange) override; + + void CheckMaxKeySizePerSlices(const std::vector& slices); + + void ChangeLeftKeyBoundaryIfNeeded( + TFmrTableKeysBoundary& leftKey, + bool& isLeftInclusive, + const TPartitionerFilterBoundary& filterBoundary + ) override; + + void ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) override; + + + void ExtendChunksPerTable(std::unordered_map>& chunksByTable) override; + +private: + const TSortingColumns ReduceBy_; + const TReducePartitionSettings Settings_; + TMaybe LeftBoundary_; // key right boundary with which we non-inclusively chopped previous reduce job. + std::queue>> LeftBoundaryChunks_; + ui64 CurrentLastKeyWeight_ = 0 ; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp index 2facecde3e4..78f78c0a856 100644 --- a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp @@ -6,548 +6,56 @@ namespace NYql::NFmr { -void TSortedPartitioner::TChunkContainer::Push(TChunkUnit chunk) { - Chunks_.push_back(std::move(chunk)); -} - -bool TSortedPartitioner::TChunkContainer::IsEmpty() const { - return Chunks_.empty(); -} - -const std::vector& TSortedPartitioner::TChunkContainer::GetChunks() const { - return Chunks_; -} - -void TSortedPartitioner::TChunkContainer::Clear() { - Chunks_.clear(); - KeysRange_ = TFmrTableKeysRange{.IsEmpty = true}; -} - -const TFmrTableKeysRange& TSortedPartitioner::TChunkContainer::GetKeysRange() const { - return KeysRange_; -} - -void TSortedPartitioner::TChunkContainer::UpdateKeyRange(const TFmrTableKeysRange& KeyRange) { - if (KeyRange.IsEmpty) { - return; - } - - if (KeysRange_.IsEmpty) { - KeysRange_.IsEmpty = false; - KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive); - KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive); - } - - if (!KeysRange_.IsFirstKeySet()) { - KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive); - } else { - bool isInclusive = true; - if (*KeysRange_.FirstKeysBound < *KeyRange.FirstKeysBound) { - isInclusive = KeysRange_.IsFirstBoundInclusive; - } else if (*KeysRange_.FirstKeysBound > *KeyRange.FirstKeysBound) { - isInclusive = KeyRange.IsFirstBoundInclusive; - } else { - isInclusive = KeysRange_.IsFirstBoundInclusive || KeyRange.IsFirstBoundInclusive; - } - KeysRange_.SetFirstKeysBound(std::min(*KeysRange_.FirstKeysBound, *KeyRange.FirstKeysBound), isInclusive); - } - - if (!KeysRange_.IsLastKeySet()) { - KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive); - } else { - bool isInclusive = true; - if (*KeysRange_.LastKeysBound < *KeyRange.LastKeysBound) { - isInclusive = KeysRange_.IsLastBoundInclusive; - } else if (*KeysRange_.LastKeysBound > *KeyRange.LastKeysBound) { - isInclusive = KeyRange.IsLastBoundInclusive; - } else { - isInclusive = KeysRange_.IsLastBoundInclusive && KeyRange.IsLastBoundInclusive; - } - KeysRange_.SetLastKeysBound(std::min(*KeysRange_.LastKeysBound, *KeyRange.LastKeysBound), isInclusive); - } -} - -TSortedPartitioner::TFmrTablesChunkPool::TFmrTablesChunkPool( +TFmrTableKeysRange TSortedPartitioner::GetReadRangeFromSlices(const std::vector& slices, bool /*isLastRange*/) { + YQL_ENSURE(!slices.empty()); + TFmrTableKeysRange taskRange{.IsEmpty = false}; + auto minSliceReadRange = slices[0].RangeForRead, maxSliceReadRange = slices[slices.size() - 1].RangeForRead; + YQL_ENSURE(minSliceReadRange.FirstKeysBound.Defined()); + YQL_ENSURE(maxSliceReadRange.LastKeysBound.Defined()); + taskRange.SetFirstKeysBound(*minSliceReadRange.FirstKeysBound, minSliceReadRange.IsFirstBoundInclusive); + taskRange.SetLastKeysBound(*maxSliceReadRange.LastKeysBound, maxSliceReadRange.IsLastBoundInclusive); + return taskRange; +} + +TTaskTableInputRef TSortedPartitioner::CreateTaskInputFromSlices( + const std::vector& slices, const std::vector& inputTables, - const std::unordered_map>& partIdsForTables, - const std::unordered_map>& partIdStats, - const TSortingColumns& KeyColumns -) - : PartIdsForTables_(partIdsForTables) - , PartIdStats_(partIdStats) - , KeyColumns_(KeyColumns) -{ - InitTableInputs(inputTables); -} - -void TSortedPartitioner::TFmrTablesChunkPool::InitTableInputs(const std::vector& inputTables) { - TableOrder_.clear(); - TableOrder_.reserve(inputTables.size()); - - for (const auto& table : inputTables) { - const TString& tableId = table.FmrTableId.Id; - Y_ENSURE( - PartIdsForTables_.contains(tableId), - "No partitions metadata found for input FMR table: " << tableId); - - const auto& partIds = PartIdsForTables_.at(tableId); - Y_ENSURE( - !partIds.empty(), - "SortedPartitioner requires at least one partition for input table: " << tableId); - TableOrder_.push_back(tableId); - - std::vector chunks; - ui64 tableChunkCount = 0; - for (const auto& partId : partIds) { - Y_ENSURE( - PartIdStats_.contains(partId), - "No chunk stats found for partId: " << partId << " (table: " << tableId << ")"); - - const auto& stats = PartIdStats_.at(partId); - tableChunkCount += stats.size(); - for (ui64 chunkIdx = 0; chunkIdx < stats.size(); ++chunkIdx) { - const auto& chunk = stats[chunkIdx]; - Y_ENSURE(chunk.SortedChunkStats.IsSorted, "Every FMR chunk inside SortedPartitioner must be sorted"); - if (chunk.SortedChunkStats.FirstRowKeys.IsUndefined() || chunk.SortedChunkStats.LastRowKeys.IsUndefined()) { - Error_ = TFmrError{ - .Component = EFmrComponent::Coordinator, - .Reason = EFmrErrorReason::RestartQuery, - .ErrorMessage = "Every FMR chunk inside SortedPartitioner must have first and last key bounds" - }; - return; - } - - auto firstBound = MakeKeyBound(chunk.SortedChunkStats.FirstRowKeys, KeyColumns_); - auto lastBound = MakeKeyBound(chunk.SortedChunkStats.LastRowKeys, KeyColumns_); - TFmrTableKeysRange KeyRange{.IsEmpty = false}; - KeyRange.SetFirstKeysBound(std::move(firstBound), true); - KeyRange.SetLastKeysBound(std::move(lastBound), true); - - chunks.push_back(TChunkUnit{ - .TableId = tableId, - .PartId = partId, - .ChunkIndex = chunkIdx, - .DataWeight = chunk.DataWeight, - .KeyRange = KeyRange, - }); - } - } - Y_ENSURE( - tableChunkCount > 0, - "SortedPartitioner requires at least one chunk for input table: " << tableId); - - std::stable_sort(chunks.begin(), chunks.end(), [](const TChunkUnit& a, const TChunkUnit& b) { - if (*a.KeyRange.FirstKeysBound != *b.KeyRange.FirstKeysBound) { - return *a.KeyRange.FirstKeysBound < *b.KeyRange.FirstKeysBound; - } - return *a.KeyRange.LastKeysBound < *b.KeyRange.LastKeysBound; - }); - - TableInputs_[tableId] = std::deque(chunks.begin(), chunks.end()); - - if (!chunks.empty()) { - TSortedPartitionerFilterBoundary FilterBoundary{.FilterBoundary = *chunks.front().KeyRange.FirstKeysBound, .IsInclusive = true}; - FilterBoundaries_[tableId] = FilterBoundary; - } - } -} - -void TSortedPartitioner::TFmrTablesChunkPool::SetError(TFmrError error) { - Error_ = std::move(error); -} - -TMaybe TSortedPartitioner::TFmrTablesChunkPool::GetError() const { - return Error_; -} - -bool TSortedPartitioner::TFmrTablesChunkPool::IsNotEmpty() const { - return std::any_of(TableInputs_.begin(), TableInputs_.end(), - [](const auto& pair) { return !pair.second.empty(); }); -} - -void TSortedPartitioner::TFmrTablesChunkPool::PutBack(TChunkUnit chunk) { - TableInputs_[chunk.TableId].push_front(std::move(chunk)); -} - -void TSortedPartitioner::TFmrTablesChunkPool::UpdateFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary) { - auto [it, inserted] = FilterBoundaries_.try_emplace(tableId, FilterBoundary); - if (!inserted) { - it->second = GetMaxFilterBoundary(tableId, FilterBoundary); - } -} - -TMaybe TSortedPartitioner::TFmrTablesChunkPool::GetFilterBoundary(const TString& tableId) const { - auto it = FilterBoundaries_.find(tableId); - if (it == FilterBoundaries_.end()) { - return Nothing(); - } - return TMaybe(it->second); -} - -TFmrTableKeysRange TSortedPartitioner::TFmrTablesChunkPool::GetEffectiveKeysRange(const TChunkUnit& chunk) const { - auto FilterBoundary = GetFilterBoundary(chunk.TableId); - if (FilterBoundary.Defined()) { - auto tmpFilterBoundary = TSortedPartitionerFilterBoundary{.FilterBoundary=*chunk.KeyRange.FirstKeysBound, .IsInclusive=true}; - const auto maxKey = GetMaxFilterBoundary(chunk.TableId, tmpFilterBoundary); - TFmrTableKeysRange out{.IsEmpty = false}; - out.SetFirstKeysBound(maxKey.FilterBoundary, maxKey.IsInclusive); - out.SetLastKeysBound(*chunk.KeyRange.LastKeysBound, chunk.KeyRange.IsLastBoundInclusive); - return out; - } - return chunk.KeyRange; -} - -const std::vector& TSortedPartitioner::TFmrTablesChunkPool::GetTableOrder() const { - return TableOrder_; -} - -TMaybe TSortedPartitioner::TFmrTablesChunkPool::ReadNextChunk(const TString& tableId) { - auto it = TableInputs_.find(tableId); - if (it == TableInputs_.end()) { - return Nothing(); - } - auto& chunkQueue = it->second; - if (chunkQueue.empty()) { - return Nothing(); - } - - TChunkUnit chunk = chunkQueue.front(); - chunkQueue.pop_front(); - return chunk; + bool isLastRange +) { + return CreateTaskInputFromSlicesImpl(slices, inputTables, isLastRange); } -TSortedPartitionerFilterBoundary TSortedPartitioner::TFmrTablesChunkPool::GetMaxFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary) const { - auto it = FilterBoundaries_.find(tableId); - if (it == FilterBoundaries_.end() || it->second.FilterBoundary < FilterBoundary.FilterBoundary) { - return FilterBoundary; - } else if (it->second.FilterBoundary == FilterBoundary.FilterBoundary) { - if (!FilterBoundary.IsInclusive) { - return FilterBoundary; - } else { - return it->second; - } +void TSortedPartitioner::ChangeLeftKeyBoundaryIfNeeded( + TFmrTableKeysBoundary& leftKey, + bool& leftInclusive, + const TPartitionerFilterBoundary& filterBoundary +) { + if (filterBoundary.FilterBoundary > leftKey) { + leftKey = filterBoundary.FilterBoundary; + leftInclusive = filterBoundary.IsInclusive; + } else if (filterBoundary.FilterBoundary == leftKey) { + leftInclusive = leftInclusive && filterBoundary.IsInclusive; } - return it->second; } -TSortedPartitioner::TReadSliceResult TSortedPartitioner::ReadSlice(TFmrTablesChunkPool& chunkPool) { - TChunkContainer container; - std::vector effectiveRanges; - effectiveRanges.reserve(chunkPool.GetTableOrder().size()); - - for (const auto& tableId : chunkPool.GetTableOrder()) { - auto chunk = chunkPool.ReadNextChunk(tableId); - if (!chunk.Defined()) { - continue; - } - auto effectiveRange = chunkPool.GetEffectiveKeysRange(*chunk); - container.Push(std::move(*chunk)); - container.UpdateKeyRange(effectiveRange); - effectiveRanges.push_back(std::move(effectiveRange)); - } - - if (container.IsEmpty()) { - return TReadSliceResult{}; +void TSortedPartitioner::ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) { + if (taskRangeLastKey < rightKey) { + rightKey = taskRangeLastKey; } - const TFmrTableKeysRange& rangeForRead = container.GetKeysRange(); - - const TFmrTableKeysBoundary& sepKey = *rangeForRead.LastKeysBound; - - TSlice slice; - slice.RangeForRead = rangeForRead; - - const auto& chunks = container.GetChunks(); - - for (size_t i = 0; i < chunks.size(); ++i) { - const auto& chunk = chunks[i]; - const auto& effectiveRange = effectiveRanges[i]; - - slice.PerTableLeft[chunk.TableId] = TSortedPartitionerFilterBoundary{ - .FilterBoundary = *effectiveRange.FirstKeysBound, - .IsInclusive = effectiveRange.IsFirstBoundInclusive - }; - - const TFmrTableKeysRange intersection = rangeForRead.GetIntersection(effectiveRange); - if (intersection.IsEmpty) { - chunkPool.PutBack(chunk); - continue; - } - - slice.ChunksByTable[chunk.TableId].push_back(chunk); - - const TFmrTableKeysBoundary& interFirst = *intersection.FirstKeysBound; - const TFmrTableKeysBoundary& effLast = *effectiveRange.LastKeysBound; - - if (interFirst <= sepKey && sepKey < effLast) { - chunkPool.UpdateFilterBoundary(chunk.TableId, TSortedPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = false}); - chunkPool.PutBack(chunk); - } else if (interFirst <= sepKey && sepKey == effLast) { - slice.Weight += chunk.DataWeight; - chunkPool.UpdateFilterBoundary(chunk.TableId, TSortedPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = true}); - } else { - YQL_CLOG(WARN, FastMapReduce) << "FMR fallback to YT: undefined behaviour in ReadSlice, intersection doesn't reach separator key"; - return TReadSliceResult{.Error = TFmrError{ - .Component = EFmrComponent::Coordinator, - .Reason = EFmrErrorReason::FallbackOperation, - .ErrorMessage = "Undefined behaviour in ReadSlice: intersection doesn't reach separator key" - }}; - } - } - - return TReadSliceResult{.Slice = std::move(slice)}; } -TTaskTableInputRef TSortedPartitioner::CreateTaskInputFromSlices(const std::vector& slices, const std::vector& inputTables) const { - TFmrTableKeysRange taskRange{.IsEmpty = true}; - std::unordered_map> chunksByTable; - std::unordered_map perTableLeft; - std::unordered_map> seenChunksByTable; - - for (const auto& slice : slices) { - if (!slice.RangeForRead.IsEmpty) { - if (taskRange.IsEmpty) { - taskRange = slice.RangeForRead; - } else { - const auto& firstKey = *slice.RangeForRead.FirstKeysBound; - if (!taskRange.IsFirstKeySet() || firstKey < *taskRange.FirstKeysBound) { - taskRange.SetFirstKeysBound(firstKey, slice.RangeForRead.IsFirstBoundInclusive); - } else if (taskRange.IsFirstKeySet() && firstKey == *taskRange.FirstKeysBound) { - taskRange.IsFirstBoundInclusive = taskRange.IsFirstBoundInclusive || slice.RangeForRead.IsFirstBoundInclusive; - } - - const auto& lastKey = *slice.RangeForRead.LastKeysBound; - if (!taskRange.IsLastKeySet() || lastKey > *taskRange.LastKeysBound) { - taskRange.SetLastKeysBound(lastKey, true); - } else if (taskRange.IsLastKeySet() && lastKey == *taskRange.LastKeysBound) { - taskRange.IsLastBoundInclusive = true; - } - taskRange.IsEmpty = false; - } - } - - for (const auto& [tableId, sliceChunks] : slice.ChunksByTable) { - auto& out = chunksByTable[tableId]; - auto& seen = seenChunksByTable[tableId]; - for (const auto& chunk : sliceChunks) { - TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex; - if (!seen.insert(chunkKey).second) { - continue; - } - out.push_back(chunk); - } - } - - for (const auto& [tableId, sliceFilterBoundary] : slice.PerTableLeft) { - auto it = perTableLeft.find(tableId); - if (it == perTableLeft.end()) { - perTableLeft.emplace(tableId, sliceFilterBoundary); - continue; - } - auto& currentFilterBoundary = it->second; - if (sliceFilterBoundary.FilterBoundary < currentFilterBoundary.FilterBoundary) { - currentFilterBoundary = sliceFilterBoundary; - } else if (sliceFilterBoundary.FilterBoundary == currentFilterBoundary.FilterBoundary) { - currentFilterBoundary.IsInclusive = currentFilterBoundary.IsInclusive || sliceFilterBoundary.IsInclusive; - } - } - } - - TTaskTableInputRef taskInput; - taskInput.Inputs.reserve(inputTables.size()); - - const TFmrTableKeysBoundary& taskLastKey = *taskRange.LastKeysBound; - - for (const auto& t : inputTables) { - const TString& tableId = t.FmrTableId.Id; - auto it = chunksByTable.find(tableId); - if (it == chunksByTable.end() || it->second.empty()) { - continue; - } - const auto& chunks = it->second; - - std::vector tableRanges; - tableRanges.reserve(chunks.size()); - std::unordered_map byPartIndex; - byPartIndex.reserve(chunks.size()); - std::unordered_map> chunkIndexesByPart; - chunkIndexesByPart.reserve(chunks.size()); - for (const auto& c : chunks) { - auto [it, inserted] = byPartIndex.try_emplace(c.PartId, tableRanges.size()); - if (inserted) { - tableRanges.push_back(TTableRange{ - .PartId = c.PartId, - .MinChunk = c.ChunkIndex, - .MaxChunk = c.ChunkIndex + 1 - }); - } else { - auto& range = tableRanges[it->second]; - range.MinChunk = std::min(range.MinChunk, c.ChunkIndex); - range.MaxChunk = std::max(range.MaxChunk, c.ChunkIndex + 1); - } - chunkIndexesByPart[c.PartId].push_back(c.ChunkIndex); - } - - TFmrTableKeysBoundary leftKey = *taskRange.FirstKeysBound; - bool leftInclusive = taskRange.IsFirstBoundInclusive; - auto FilterBoundaryIt = perTableLeft.find(tableId); - if (FilterBoundaryIt != perTableLeft.end()) { - const auto& FilterBoundary = FilterBoundaryIt->second; - if (FilterBoundary.FilterBoundary > leftKey) { - leftKey = FilterBoundary.FilterBoundary; - leftInclusive = FilterBoundary.IsInclusive; - } else if (FilterBoundary.FilterBoundary == leftKey) { - leftInclusive = leftInclusive && FilterBoundary.IsInclusive; - } - } - - TFmrTableKeysBoundary rightKey = *chunks.back().KeyRange.LastKeysBound; - if (taskLastKey < rightKey) { - rightKey = taskLastKey; - } - - TFmrTableInputRef inputRef{ - .TableId = tableId, - .TableRanges = std::move(tableRanges), - .Columns = t.Columns, - .SerializedColumnGroups = t.SerializedColumnGroups, - .IsFirstRowInclusive = leftInclusive, - .FirstRowKeys = TString(leftKey.Row), - .LastRowKeys = TString(rightKey.Row), - }; - taskInput.Inputs.emplace_back(std::move(inputRef)); - } - - return taskInput; +void TSortedPartitioner::ExtendChunksPerTable(std::unordered_map>&) { + return; } TSortedPartitioner::TSortedPartitioner( const std::unordered_map>& partIdsForTables, const std::unordered_map>& partIdStats, - TSortingColumns KeyColumns, + TSortingColumns keyColumns, const TSortedPartitionSettings& settings ) - : PartIdsForTables_(partIdsForTables) - , PartIdStats_(partIdStats) - , KeyColumns_(std::move(KeyColumns)) - , Settings_(settings) + : TSortedPartitionerBase(partIdsForTables, partIdStats, keyColumns, settings.FmrPartitionSettings) { - Y_ENSURE(Settings_.FmrPartitionSettings.MaxParts > 0); - Y_ENSURE(Settings_.FmrPartitionSettings.MaxDataWeightPerPart > 0); -} - -TPartitionResult TSortedPartitioner::PartitionTablesIntoTasksSorted( - const std::vector& inputTables -) { - if (inputTables.empty()) { - return TPartitionResult{}; - } - - std::vector inputFmrTables; - inputFmrTables.reserve(inputTables.size()); - for (const auto& table : inputTables) { - if (auto fmrTable = std::get_if(&table)) { - inputFmrTables.push_back(*fmrTable); - } else { - return TPartitionResult{.Error = TFmrError{ - .Component = EFmrComponent::Coordinator, - .Reason = EFmrErrorReason::RestartQuery, - .ErrorMessage = "Unsupported table type for SortedPartitioner: only FMR tables are supported" - }}; - } - } - - return PartitionFmrTables(inputFmrTables); -} - -TPartitionResult TSortedPartitioner::PartitionFmrTables(const std::vector& inputTables) { - Y_ENSURE(Settings_.FmrPartitionSettings.MaxDataWeightPerPart > 0, "MaxDataWeightPerPart must be > 0"); - Y_ENSURE(!KeyColumns_.Columns.empty(), "KeyColumns must be set for SortedPartitioner"); - - std::vector tasks; - const ui64 maxParts = Settings_.FmrPartitionSettings.MaxParts; - - TFmrTablesChunkPool chunkPool(inputTables, PartIdsForTables_, PartIdStats_, KeyColumns_); - if (auto error = chunkPool.GetError()) { - return TPartitionResult{.Error = *error}; - } - ui64 maxWeight = Settings_.FmrPartitionSettings.MaxDataWeightPerPart; - if (Settings_.FmrPartitionSettings.AdjustDataWeightPerPartition) { - ui64 totalWeight = CollectFmrTotalWeight(inputTables); - ui64 estimatedParts = (totalWeight + maxWeight - 1) / maxWeight; - if (estimatedParts > maxParts && maxParts > 0) { - maxWeight = totalWeight / maxParts; - YQL_CLOG(INFO, FastMapReduce) << "AdjustDataWeightPerPartition (sorted): adjusted MaxDataWeightPerPart from " - << Settings_.FmrPartitionSettings.MaxDataWeightPerPart << " to " << maxWeight - << " (totalWeight=" << totalWeight << ", maxParts=" << maxParts << ")"; - } - } - - std::vector currentSlices; - ui64 currentWeight = 0; - - while (chunkPool.IsNotEmpty()) { - auto sliceResult = ReadSlice(chunkPool); - if (sliceResult.Error) { - return TPartitionResult{.Error = *sliceResult.Error}; - } - if (!sliceResult.Slice.Defined()) { - break; - } - auto& slice = *sliceResult.Slice; - - ui64 newWeight = slice.Weight; - - if (!currentSlices.empty() && currentWeight + newWeight > maxWeight) { - tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables)); - currentSlices.clear(); - currentWeight = 0; - } - currentWeight += newWeight; - currentSlices.push_back(std::move(slice)); - } - if (!currentSlices.empty()) { - tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables)); - } - - if (maxParts < tasks.size()) { - return TPartitionResult{.Error = TFmrError{ - .Component = EFmrComponent::Coordinator, - .Reason = EFmrErrorReason::RestartQuery, - .ErrorMessage = TStringBuilder() << "SortedPartitioner produced " << tasks.size() - << " tasks which exceeds max_parts=" << maxParts - }}; - } - - return TPartitionResult{.TaskInputs = tasks}; -} - -ui64 TSortedPartitioner::CollectFmrTotalWeight(const std::vector& inputTables) { - ui64 totalWeight = 0; - - for (const auto& table : inputTables) { - const TString& tableId = table.FmrTableId.Id; - const auto& partIds = PartIdsForTables_.at(tableId); - for (const auto& partId : partIds) { - const auto& stats = PartIdStats_.at(partId); - for (const auto& chunkStat : stats) { - totalWeight += chunkStat.DataWeight; - } - } - } - - return totalWeight; -} - -TPartitionResult PartitionInputTablesIntoTasksSorted( - const std::vector& inputTables, - TSortedPartitioner& partitioner -) { - auto result = partitioner.PartitionTablesIntoTasksSorted(inputTables); - if (!result.Error) { - YQL_CLOG(DEBUG, FastMapReduce) << "Successfully partitioned " << inputTables.size() - << " input tables (sorted) into " << result.TaskInputs.size() << " tasks"; - } - return result; } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h index 471fcbd21f4..c537b3add50 100644 --- a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h +++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h @@ -1,5 +1,7 @@ #pragma once +#include "yql_yt_sorted_partitioner_base.h" + #include #include #include @@ -14,124 +16,29 @@ struct TSortedPartitionSettings { TFmrPartitionerSettings FmrPartitionSettings; }; -struct TSortedPartitionerFilterBoundary { - TFmrTableKeysBoundary FilterBoundary; - bool IsInclusive; -}; - -class TSortedPartitioner { +class TSortedPartitioner: public TSortedPartitionerBase { public: TSortedPartitioner( const std::unordered_map>& partIdsForTables, const std::unordered_map>& partIdStats, - TSortingColumns KeyColumns, + TSortingColumns keyColumns, const TSortedPartitionSettings& settings ); - TPartitionResult PartitionTablesIntoTasksSorted( - const std::vector& inputTables - ); - private: - struct TChunkUnit { - TString TableId; - TString PartId; - ui64 ChunkIndex = 0; - ui64 DataWeight = 0; - TFmrTableKeysRange KeyRange; - }; - - class TChunkContainer { - public: - TChunkContainer() = default; - - void Push(TChunkUnit chunk); - bool IsEmpty() const; - const std::vector& GetChunks() const; - void Clear(); - - const TFmrTableKeysRange& GetKeysRange() const; - void UpdateKeyRange(const TFmrTableKeysRange& KeyRange); - - private: - std::vector Chunks_; - TFmrTableKeysRange KeysRange_; - }; - - class TFmrTablesChunkPool { - public: - TFmrTablesChunkPool( - const std::vector& inputTables, - const std::unordered_map>& partIdsForTables, - const std::unordered_map>& partIdStats, - const TSortingColumns& KeyColumns - ); - - void PutBack(TChunkUnit chunk); - void UpdateFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary); - bool IsNotEmpty() const; - - const std::vector& GetTableOrder() const; - TMaybe ReadNextChunk(const TString& tableId); + TTaskTableInputRef CreateTaskInputFromSlices(const std::vector& slices, const std::vector& inputTables, bool isLastRange) override; - TMaybe GetFilterBoundary(const TString& tableId) const; - TSortedPartitionerFilterBoundary GetMaxFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary) const; - TFmrTableKeysRange GetEffectiveKeysRange(const TChunkUnit& chunk) const; + TFmrTableKeysRange GetReadRangeFromSlices(const std::vector& slices, bool isLastRange) override; - void SetError(TFmrError error); - TMaybe GetError() const; + void ChangeLeftKeyBoundaryIfNeeded( + TFmrTableKeysBoundary& leftKey, + bool& isLeftInclusive, + const TPartitionerFilterBoundary& filterBoundary + ) override; - private: - void InitTableInputs(const std::vector& inputTables); + void ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) override; - std::vector TableOrder_; - std::unordered_map> TableInputs_; - std::unordered_map FilterBoundaries_; - const std::unordered_map>& PartIdsForTables_; - const std::unordered_map>& PartIdStats_; - const TSortingColumns& KeyColumns_; - TMaybe Error_; - }; - - TPartitionResult PartitionFmrTables( - const std::vector& inputTables - ); - - struct TSlice { - std::unordered_map> ChunksByTable; - TFmrTableKeysRange RangeForRead; - std::unordered_map PerTableLeft; - ui64 Weight = 0; - }; - - struct TReadSliceResult { - TMaybe Slice; - TMaybe Error; - }; - - TReadSliceResult ReadSlice(TFmrTablesChunkPool& chunkPool); - TTaskTableInputRef CreateTaskInputFromSlices(const std::vector& slices, const std::vector& inputTables) const; - - std::unordered_map> ProcessChunkForSlice( - TFmrTablesChunkPool& chunkPool, - const TChunkContainer& container, - const TFmrTableKeysRange& taskRange - ); - - ui64 CollectFmrTotalWeight( - const std::vector& inputTables - ); - -private: - const std::unordered_map> PartIdsForTables_; - const std::unordered_map> PartIdStats_; - const TSortingColumns KeyColumns_; - const TSortedPartitionSettings Settings_; + void ExtendChunksPerTable(std::unordered_map>& chunksByTable) override; }; -TPartitionResult PartitionInputTablesIntoTasksSorted( - const std::vector& inputTables, - TSortedPartitioner& partitioner -); - } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp new file mode 100644 index 00000000000..4882972f2c6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp @@ -0,0 +1,530 @@ +#include "yql_yt_sorted_partitioner_base.h" + +#include + +namespace NYql::NFmr { + + +void TChunkContainer::Push(TChunkUnit chunk) { + Chunks_.push_back(std::move(chunk)); +} + +bool TChunkContainer::IsEmpty() const { + return Chunks_.empty(); +} + +const std::vector& TChunkContainer::GetChunks() const { + return Chunks_; +} + +void TChunkContainer::Clear() { + Chunks_.clear(); + KeysRange_ = TFmrTableKeysRange{.IsEmpty = true}; +} + +const TFmrTableKeysRange& TChunkContainer::GetKeysRange() const { + return KeysRange_; +} + +void TChunkContainer::UpdateKeyRange(const TFmrTableKeysRange& KeyRange) { + if (KeyRange.IsEmpty) { + return; + } + + if (KeysRange_.IsEmpty) { + KeysRange_.IsEmpty = false; + KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive); + KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive); + } + + if (!KeysRange_.IsFirstKeySet()) { + KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive); + } else { + bool isInclusive = true; + if (*KeysRange_.FirstKeysBound < *KeyRange.FirstKeysBound) { + isInclusive = KeysRange_.IsFirstBoundInclusive; + } else if (*KeysRange_.FirstKeysBound > *KeyRange.FirstKeysBound) { + isInclusive = KeyRange.IsFirstBoundInclusive; + } else { + isInclusive = KeysRange_.IsFirstBoundInclusive || KeyRange.IsFirstBoundInclusive; + } + KeysRange_.SetFirstKeysBound(std::min(*KeysRange_.FirstKeysBound, *KeyRange.FirstKeysBound), isInclusive); + } + + if (!KeysRange_.IsLastKeySet()) { + KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive); + } else { + bool isInclusive = true; + if (*KeysRange_.LastKeysBound < *KeyRange.LastKeysBound) { + isInclusive = KeysRange_.IsLastBoundInclusive; + } else if (*KeysRange_.LastKeysBound > *KeyRange.LastKeysBound) { + isInclusive = KeyRange.IsLastBoundInclusive; + } else { + isInclusive = KeysRange_.IsLastBoundInclusive && KeyRange.IsLastBoundInclusive; + } + KeysRange_.SetLastKeysBound(std::min(*KeysRange_.LastKeysBound, *KeyRange.LastKeysBound), isInclusive); + } +} + +TFmrTablesChunkPool::TFmrTablesChunkPool( + const std::vector& inputTables, + const std::unordered_map>& partIdsForTables, + const std::unordered_map>& partIdStats, + const TSortingColumns& KeyColumns +) + : PartIdsForTables_(partIdsForTables) + , PartIdStats_(partIdStats) + , KeyColumns_(KeyColumns) +{ + InitTableInputs(inputTables); +} + +void TFmrTablesChunkPool::InitTableInputs(const std::vector& inputTables) { + TableOrder_.clear(); + TableOrder_.reserve(inputTables.size()); + + for (const auto& table : inputTables) { + const TString& tableId = table.FmrTableId.Id; + Y_ENSURE( + PartIdsForTables_.contains(tableId), + "No partitions metadata found for input FMR table: " << tableId); + + const auto& partIds = PartIdsForTables_.at(tableId); + Y_ENSURE( + !partIds.empty(), + "SortedPartitioner requires at least one partition for input table: " << tableId); + TableOrder_.push_back(tableId); + + std::vector chunks; + ui64 tableChunkCount = 0; + for (const auto& partId : partIds) { + Y_ENSURE( + PartIdStats_.contains(partId), + "No chunk stats found for partId: " << partId << " (table: " << tableId << ")"); + + const auto& stats = PartIdStats_.at(partId); + tableChunkCount += stats.size(); + for (ui64 chunkIdx = 0; chunkIdx < stats.size(); ++chunkIdx) { + const auto& chunk = stats[chunkIdx]; + Y_ENSURE(chunk.SortedChunkStats.IsSorted, "Every FMR chunk inside SortedPartitioner must be sorted"); + if (chunk.SortedChunkStats.FirstRowKeys.IsUndefined() || chunk.SortedChunkStats.LastRowKeys.IsUndefined()) { + Error_ = TFmrError{ + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::RestartQuery, + .ErrorMessage = "Every FMR chunk inside SortedPartitioner must have first and last key bounds" + }; + return; + } + + auto firstBound = MakeKeyBound(chunk.SortedChunkStats.FirstRowKeys, KeyColumns_); + auto lastBound = MakeKeyBound(chunk.SortedChunkStats.LastRowKeys, KeyColumns_); + TFmrTableKeysRange KeyRange{.IsEmpty = false}; + KeyRange.SetFirstKeysBound(std::move(firstBound), true); + KeyRange.SetLastKeysBound(std::move(lastBound), true); + + chunks.push_back(TChunkUnit{ + .TableId = tableId, + .PartId = partId, + .ChunkIndex = chunkIdx, + .DataWeight = chunk.DataWeight, + .KeyRange = KeyRange, + }); + } + } + Y_ENSURE( + tableChunkCount > 0, + "SortedPartitioner requires at least one chunk for input table: " << tableId); + + std::stable_sort(chunks.begin(), chunks.end(), [](const TChunkUnit& a, const TChunkUnit& b) { + if (*a.KeyRange.FirstKeysBound != *b.KeyRange.FirstKeysBound) { + return *a.KeyRange.FirstKeysBound < *b.KeyRange.FirstKeysBound; + } + return *a.KeyRange.LastKeysBound < *b.KeyRange.LastKeysBound; + }); + + TableInputs_[tableId] = std::deque(chunks.begin(), chunks.end()); + + if (!chunks.empty()) { + TPartitionerFilterBoundary FilterBoundary{.FilterBoundary = *chunks.front().KeyRange.FirstKeysBound, .IsInclusive = true}; + FilterBoundaries_[tableId] = FilterBoundary; + } + } +} + +void TFmrTablesChunkPool::SetError(TFmrError error) { + Error_ = std::move(error); +} + +TMaybe TFmrTablesChunkPool::GetError() const { + return Error_; +} + +bool TFmrTablesChunkPool::IsNotEmpty() const { + return std::any_of(TableInputs_.begin(), TableInputs_.end(), + [](const auto& pair) { return !pair.second.empty(); }); +} + +void TFmrTablesChunkPool::PutBack(TChunkUnit chunk) { + TableInputs_[chunk.TableId].push_front(std::move(chunk)); +} + +void TFmrTablesChunkPool::UpdateFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary) { + auto [it, inserted] = FilterBoundaries_.try_emplace(tableId, filterBoundary); + if (!inserted) { + it->second = GetMaxFilterBoundary(tableId, filterBoundary); + } +} + +TMaybe TFmrTablesChunkPool::GetFilterBoundary(const TString& tableId) const { + auto it = FilterBoundaries_.find(tableId); + if (it == FilterBoundaries_.end()) { + return Nothing(); + } + return TMaybe(it->second); +} + +TFmrTableKeysRange TFmrTablesChunkPool::GetEffectiveKeysRange(const TChunkUnit& chunk) const { + auto FilterBoundary = GetFilterBoundary(chunk.TableId); + if (FilterBoundary.Defined()) { + auto tmpFilterBoundary = TPartitionerFilterBoundary{.FilterBoundary=*chunk.KeyRange.FirstKeysBound, .IsInclusive=true}; + const auto maxKey = GetMaxFilterBoundary(chunk.TableId, tmpFilterBoundary); + TFmrTableKeysRange out{.IsEmpty = false}; + out.SetFirstKeysBound(maxKey.FilterBoundary, maxKey.IsInclusive); + out.SetLastKeysBound(*chunk.KeyRange.LastKeysBound, chunk.KeyRange.IsLastBoundInclusive); + return out; + } + return chunk.KeyRange; +} + +const std::vector& TFmrTablesChunkPool::GetTableOrder() const { + return TableOrder_; +} + +TMaybe TFmrTablesChunkPool::ReadNextChunk(const TString& tableId) { + auto it = TableInputs_.find(tableId); + if (it == TableInputs_.end()) { + return Nothing(); + } + auto& chunkQueue = it->second; + if (chunkQueue.empty()) { + return Nothing(); + } + + TChunkUnit chunk = chunkQueue.front(); + chunkQueue.pop_front(); + return chunk; +} + +TPartitionerFilterBoundary TFmrTablesChunkPool::GetMaxFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary) const { + auto it = FilterBoundaries_.find(tableId); + if (it == FilterBoundaries_.end() || it->second.FilterBoundary < filterBoundary.FilterBoundary) { + return filterBoundary; + } else if (it->second.FilterBoundary == filterBoundary.FilterBoundary) { + if (!filterBoundary.IsInclusive) { + return filterBoundary; + } else { + return it->second; + } + } + return it->second; +} + +TSortedPartitionerBase::TSortedPartitionerBase( + const std::unordered_map>& partIdsForTables, + const std::unordered_map>& partIdStats, + const TSortingColumns& keyColumns, + const TFmrPartitionerSettings& fmrPartitionSettings +) + : PartIdsForTables_(partIdsForTables) + , PartIdStats_(partIdStats) + , KeyColumns_(keyColumns) + , FmrPartitionSettings_(fmrPartitionSettings) +{ +} + +TReadSliceResult TSortedPartitionerBase::ReadSlice(TFmrTablesChunkPool& chunkPool) { + TChunkContainer container; + std::vector effectiveRanges; + effectiveRanges.reserve(chunkPool.GetTableOrder().size()); + + for (const auto& tableId : chunkPool.GetTableOrder()) { + auto chunk = chunkPool.ReadNextChunk(tableId); + if (!chunk.Defined()) { + continue; + } + auto effectiveRange = chunkPool.GetEffectiveKeysRange(*chunk); + container.Push(std::move(*chunk)); + container.UpdateKeyRange(effectiveRange); + effectiveRanges.push_back(std::move(effectiveRange)); + } + + if (container.IsEmpty()) { + return TReadSliceResult{}; + } + const TFmrTableKeysRange& rangeForRead = container.GetKeysRange(); + + const TFmrTableKeysBoundary& sepKey = *rangeForRead.LastKeysBound; + + TSlice slice; + slice.RangeForRead = rangeForRead; + + const auto& chunks = container.GetChunks(); + + for (size_t i = 0; i < chunks.size(); ++i) { + const auto& chunk = chunks[i]; + const auto& effectiveRange = effectiveRanges[i]; + + slice.PerTableLeft[chunk.TableId] = TPartitionerFilterBoundary{ + .FilterBoundary = *effectiveRange.FirstKeysBound, + .IsInclusive = effectiveRange.IsFirstBoundInclusive + }; + + const TFmrTableKeysRange intersection = rangeForRead.GetIntersection(effectiveRange); + if (intersection.IsEmpty) { + chunkPool.PutBack(chunk); + continue; + } + + slice.ChunksByTable[chunk.TableId].push_back(chunk); + + const TFmrTableKeysBoundary& interFirst = *intersection.FirstKeysBound; + const TFmrTableKeysBoundary& effLast = *effectiveRange.LastKeysBound; + + if (interFirst <= sepKey && sepKey < effLast) { + chunkPool.UpdateFilterBoundary(chunk.TableId, TPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = false}); + chunkPool.PutBack(chunk); + } else if (interFirst <= sepKey && sepKey == effLast) { + slice.Weight += chunk.DataWeight; + chunkPool.UpdateFilterBoundary(chunk.TableId, TPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = true}); + } else { + YQL_CLOG(WARN, FastMapReduce) << "FMR fallback to YT: undefined behaviour in ReadSlice, intersection doesn't reach separator key"; + return TReadSliceResult{.Error = TFmrError{ + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::FallbackOperation, + .ErrorMessage = "Undefined behaviour in ReadSlice: intersection doesn't reach separator key" + }}; + } + } + + return TReadSliceResult{.Slice = std::move(slice)}; +} + +ui64 TSortedPartitionerBase::CollectFmrTotalWeight(const std::vector& inputTables) { + ui64 totalWeight = 0; + + for (const auto& table : inputTables) { + const TString& tableId = table.FmrTableId.Id; + const auto& partIds = PartIdsForTables_.at(tableId); + for (const auto& partId : partIds) { + const auto& stats = PartIdStats_.at(partId); + for (const auto& chunkStat : stats) { + totalWeight += chunkStat.DataWeight; + } + } + } + + return totalWeight; +} + +TTaskTableInputRef TSortedPartitionerBase::CreateTaskInputFromSlicesImpl( + const std::vector& slices, + const std::vector& inputTables, + bool isLastRange +) { + TFmrTableKeysRange taskRange = GetReadRangeFromSlices(slices, isLastRange); + std::unordered_map> chunksByTable; + std::unordered_map perTableLeft; + std::unordered_map> seenChunksByTable; + + for (const auto& slice : slices) { + for (const auto& [tableId, sliceChunks] : slice.ChunksByTable) { + auto& out = chunksByTable[tableId]; + auto& seen = seenChunksByTable[tableId]; + for (const auto& chunk : sliceChunks) { + TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex; + if (!seen.insert(chunkKey).second) { + continue; + } + out.push_back(chunk); + } + } + + for (const auto& [tableId, sliceFilterBoundary] : slice.PerTableLeft) { + auto it = perTableLeft.find(tableId); + if (it == perTableLeft.end()) { + perTableLeft.emplace(tableId, sliceFilterBoundary); + continue; + } + auto& currentFilterBoundary = it->second; + if (sliceFilterBoundary.FilterBoundary < currentFilterBoundary.FilterBoundary) { + currentFilterBoundary = sliceFilterBoundary; + } else if (sliceFilterBoundary.FilterBoundary == currentFilterBoundary.FilterBoundary) { + currentFilterBoundary.IsInclusive = currentFilterBoundary.IsInclusive || sliceFilterBoundary.IsInclusive; + } + } + } + + ExtendChunksPerTable(chunksByTable); + + TTaskTableInputRef taskInput; + taskInput.Inputs.reserve(inputTables.size()); + + const TFmrTableKeysBoundary& taskLastKey = *taskRange.LastKeysBound; + + for (const auto& t : inputTables) { + const TString& tableId = t.FmrTableId.Id; + auto it = chunksByTable.find(tableId); + if (it == chunksByTable.end() || it->second.empty()) { + continue; + } + const auto& chunks = it->second; + + std::vector tableRanges; + tableRanges.reserve(chunks.size()); + std::unordered_map byPartIndex; + byPartIndex.reserve(chunks.size()); + std::unordered_map> chunkIndexesByPart; + chunkIndexesByPart.reserve(chunks.size()); + for (const auto& c : chunks) { + auto [it, inserted] = byPartIndex.try_emplace(c.PartId, tableRanges.size()); + if (inserted) { + tableRanges.push_back(TTableRange{ + .PartId = c.PartId, + .MinChunk = c.ChunkIndex, + .MaxChunk = c.ChunkIndex + 1 + }); + } else { + auto& range = tableRanges[it->second]; + range.MinChunk = std::min(range.MinChunk, c.ChunkIndex); + range.MaxChunk = std::max(range.MaxChunk, c.ChunkIndex + 1); + } + chunkIndexesByPart[c.PartId].push_back(c.ChunkIndex); + } + + TFmrTableKeysBoundary leftKey = *taskRange.FirstKeysBound; + bool leftInclusive = taskRange.IsFirstBoundInclusive; + auto FilterBoundaryIt = perTableLeft.find(tableId); + if (FilterBoundaryIt != perTableLeft.end()) { + ChangeLeftKeyBoundaryIfNeeded(leftKey, leftInclusive, FilterBoundaryIt->second); + } + + TFmrTableKeysBoundary rightKey = *chunks.back().KeyRange.LastKeysBound; + bool rightInclusive = taskRange.IsLastBoundInclusive; + ChangeRightKeyBoundaryIfNeeded(rightKey, taskLastKey); + + TFmrTableInputRef inputRef{ + .TableId = tableId, + .TableRanges = std::move(tableRanges), + .Columns = t.Columns, + .SerializedColumnGroups = t.SerializedColumnGroups, + .IsFirstRowInclusive = leftInclusive, + .IsLastRowInclusive = rightInclusive, + .FirstRowKeys = TString(leftKey.Row), + .LastRowKeys = TString(rightKey.Row), + }; + taskInput.Inputs.emplace_back(std::move(inputRef)); + } + + return taskInput; +} + +TPartitionResult TSortedPartitionerBase::PartitionTablesIntoTasks( + const std::vector& inputTables +) { + if (inputTables.empty()) { + return TPartitionResult{}; + } + + std::vector inputFmrTables; + inputFmrTables.reserve(inputTables.size()); + for (const auto& table : inputTables) { + if (auto fmrTable = std::get_if(&table)) { + inputFmrTables.push_back(*fmrTable); + } else { + return TPartitionResult{.Error = TFmrError{ + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::RestartQuery, + .ErrorMessage = "Unsupported table type for partitioner: only FMR tables are supported" + }}; + } + } + + auto result = PartitionFmrTables(inputFmrTables); + if (!result.Error) { + YQL_CLOG(DEBUG, FastMapReduce) << "Successfully partitioned " << inputTables.size() + << " input tables (sorted) into " << result.TaskInputs.size() << " tasks"; + } + return result; +} + +TPartitionResult TSortedPartitionerBase::PartitionFmrTables(const std::vector& inputTables) { + Y_ENSURE(FmrPartitionSettings_.MaxDataWeightPerPart > 0, "MaxDataWeightPerPart must be > 0"); + Y_ENSURE(!KeyColumns_.Columns.empty(), "KeyColumns must be set"); + + std::vector tasks; + const ui64 maxParts = FmrPartitionSettings_.MaxParts; + + TFmrTablesChunkPool chunkPool(inputTables, PartIdsForTables_, PartIdStats_, KeyColumns_); + if (auto error = chunkPool.GetError()) { + return TPartitionResult{.Error = *error}; + } + + ui64 maxWeight = FmrPartitionSettings_.MaxDataWeightPerPart; + if (FmrPartitionSettings_.AdjustDataWeightPerPartition) { + ui64 totalWeight = CollectFmrTotalWeight(inputTables); + ui64 estimatedParts = (totalWeight + maxWeight - 1) / maxWeight; + if (estimatedParts > maxParts && maxParts > 0) { + maxWeight = totalWeight / maxParts; + YQL_CLOG(INFO, FastMapReduce) << "AdjustDataWeightPerPartition (sorted): adjusted MaxDataWeightPerPart from " + << FmrPartitionSettings_.MaxDataWeightPerPart << " to " << maxWeight + << " (totalWeight=" << totalWeight << ", maxParts=" << maxParts << ")"; + } + } + + std::vector currentSlices; + ui64 currentWeight = 0; + + try { + while (chunkPool.IsNotEmpty()) { + auto sliceResult = ReadSlice(chunkPool); + if (sliceResult.Error) { + return TPartitionResult{.Error = *sliceResult.Error}; + } + if (!sliceResult.Slice.Defined()) { + break; + } + auto& slice = *sliceResult.Slice; + if (!currentSlices.empty() && currentWeight + slice.Weight > maxWeight) { + tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables, false)); + currentSlices.clear(); + currentWeight = 0; + } + currentWeight += slice.Weight; + currentSlices.push_back(std::move(slice)); + } + if (!currentSlices.empty()) { + tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables, true)); + } + } catch (...) { + return TPartitionResult{ + .Error = TFmrError { + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::FallbackOperation, + .ErrorMessage = CurrentExceptionMessage() + } + }; + } + + if (maxParts < tasks.size()) { + return TPartitionResult{.Error = TFmrError{ + .Component = EFmrComponent::Coordinator, + .Reason = EFmrErrorReason::RestartQuery, + .ErrorMessage = TStringBuilder() << "Partitioner produced " << tasks.size() + << " tasks which exceeds max_parts=" << maxParts + }}; + } + + return TPartitionResult{.TaskInputs = tasks}; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h new file mode 100644 index 00000000000..494bf4041b2 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h @@ -0,0 +1,143 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace NYql::NFmr { + +struct TPartitionerFilterBoundary { + TFmrTableKeysBoundary FilterBoundary; + bool IsInclusive; +}; + +struct TChunkUnit { + TString TableId; + TString PartId; + ui64 ChunkIndex = 0; + ui64 DataWeight = 0; + TFmrTableKeysRange KeyRange; +}; + +class TChunkContainer { +public: + TChunkContainer() = default; + + void Push(TChunkUnit chunk); + bool IsEmpty() const; + const std::vector& GetChunks() const; + void Clear(); + + const TFmrTableKeysRange& GetKeysRange() const; + void UpdateKeyRange(const TFmrTableKeysRange& KeyRange); + +private: + std::vector Chunks_; + TFmrTableKeysRange KeysRange_; +}; + +class TFmrTablesChunkPool { +public: + TFmrTablesChunkPool( + const std::vector& inputTables, + const std::unordered_map>& partIdsForTables, + const std::unordered_map>& partIdStats, + const TSortingColumns& KeyColumns + ); + + void PutBack(TChunkUnit chunk); + void UpdateFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary); + bool IsNotEmpty() const; + + const std::vector& GetTableOrder() const; + TMaybe ReadNextChunk(const TString& tableId); + + TMaybe GetFilterBoundary(const TString& tableId) const; + TPartitionerFilterBoundary GetMaxFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary) const; + TFmrTableKeysRange GetEffectiveKeysRange(const TChunkUnit& chunk) const; + + void SetError(TFmrError error); + TMaybe GetError() const; + +private: + void InitTableInputs(const std::vector& inputTables); + + std::vector TableOrder_; + std::unordered_map> TableInputs_; + std::unordered_map FilterBoundaries_; + const std::unordered_map>& PartIdsForTables_; + const std::unordered_map>& PartIdStats_; + const TSortingColumns& KeyColumns_; + TMaybe Error_; +}; + +struct TSlice { + std::unordered_map> ChunksByTable; + TFmrTableKeysRange RangeForRead; + std::unordered_map PerTableLeft; + ui64 Weight = 0; +}; + +struct TReadSliceResult { + TMaybe Slice; + TMaybe Error; +}; + +class TSortedPartitionerBase: public TThrRefBase { +public: + using TPtr = TIntrusivePtr; + + virtual ~TSortedPartitionerBase() = default; + + TSortedPartitionerBase( + const std::unordered_map>& partIdsForTables, + const std::unordered_map>& partIdStats, + const TSortingColumns& keyColumns, + const TFmrPartitionerSettings& fmrPartitionSettings + ); + + TPartitionResult PartitionTablesIntoTasks(const std::vector& inputTables); + +protected: + ui64 CollectFmrTotalWeight(const std::vector& inputTables); + + TReadSliceResult ReadSlice(TFmrTablesChunkPool& chunkPool); + + virtual TTaskTableInputRef CreateTaskInputFromSlices( + const std::vector& slices, + const std::vector& inputTables, + bool isLastRange + ) = 0; + + TPartitionResult PartitionFmrTables( + const std::vector& inputTables + ); + + virtual TFmrTableKeysRange GetReadRangeFromSlices(const std::vector& slices, bool isLastRange) = 0; + + virtual void ChangeLeftKeyBoundaryIfNeeded( + TFmrTableKeysBoundary& leftKey, + bool& isLeftInclusive, + const TPartitionerFilterBoundary& filterBoundary + ) = 0; + + virtual void ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) = 0; + + TTaskTableInputRef CreateTaskInputFromSlicesImpl( + const std::vector& slices, + const std::vector& inputTables, + bool isLastRange + ); + + virtual void ExtendChunksPerTable(std::unordered_map>& chunksByTable) = 0; + +private: + const std::unordered_map> PartIdsForTables_; + const std::unordered_map> PartIdStats_; + TSortingColumns KeyColumns_; + TFmrPartitionerSettings FmrPartitionSettings_; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make index 0f2075ad1b3..e22b59f1b73 100644 --- a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make @@ -6,6 +6,8 @@ SRCS( PEERDIR( yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file + yt/yql/providers/yt/fmr/utils + yt/yql/providers/yt/fmr/utils/comparator ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make index c2c9aef24c7..dd113355f5b 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ya.make @@ -6,6 +6,7 @@ SRCS( yql_yt_job_impl.cpp yql_yt_raw_table_queue_reader.cpp yql_yt_raw_table_queue_writer.cpp + yql_yt_reduce_reader.cpp yql_yt_sorted_merge_reader.cpp yql_yt_table_data_service_reader.cpp yql_yt_table_data_service_base_writer.cpp diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index 8339056abe8..56fac989de3 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -133,6 +133,7 @@ public: neededColumns, columnGroups, input.IsFirstRowInclusive, + input.IsLastRowInclusive, input.FirstRowKeys, input.LastRowKeys, Settings_.FmrReaderSettings.ReadAheadChunks @@ -240,6 +241,7 @@ public: fmrInput->Columns, fmrInput->SerializedColumnGroups, fmrInput->IsFirstRowInclusive, + fmrInput->IsLastRowInclusive, fmrInput->FirstRowKeys, fmrInput->LastRowKeys, Settings_.FmrReaderSettings.ReadAheadChunks @@ -316,6 +318,7 @@ public: fmrInput->Columns, fmrInput->SerializedColumnGroups, fmrInput->IsFirstRowInclusive, + fmrInput->IsLastRowInclusive, fmrInput->FirstRowKeys, fmrInput->LastRowKeys, Settings_.FmrReaderSettings.ReadAheadChunks @@ -341,6 +344,28 @@ public: return HandleFmrJob(localSortJobFunc, ETaskType::LocalSort); } + std::variant Reduce( + const TReduceTaskParams& params, + const std::unordered_map& clusterConnections, + std::shared_ptr> /* cancelFlag */, + const TMaybe& jobEnvironmentDir, + const std::vector& jobFiles, + const std::vector& jobYtResources, + const std::vector& jobFmrResources + ) override { + auto reduceFunc = [&, this] () { + TFmrUserJobSettings userJobSettings = Settings_.FmrUserJobSettings; + TFmrUserJob reduceJob; + // deserialize reduce job and fill params + TStringStream serializedJobStateStream(params.SerializedReduceJobState); + reduceJob.Load(serializedJobStateStream); + FillReduceFmrJob(reduceJob, params, clusterConnections, Discovery_, VanillaInfo_, userJobSettings, YtJobService_); + reduceJob.SetTvmSettings(TvmSettings_); + return JobLauncher_->LaunchJob(reduceJob, jobEnvironmentDir, jobFiles, jobYtResources, jobFmrResources); + }; + return HandleFmrJob(reduceFunc, ETaskType::Reduce); + } + private: std::variant HandleFmrJob(auto fmrJobFunc, ETaskType fmrJobType) { TString errorLogMessage; @@ -419,6 +444,8 @@ TJobResult RunJob( return job->SortedMerge(taskParams, task->ClusterConnections, cancelFlag); } else if constexpr (std::is_same_v) { return job->LocalSort(taskParams, task->ClusterConnections, cancelFlag); + } else if constexpr (std::is_same_v) { + return job->Reduce(taskParams, task->ClusterConnections, cancelFlag, task->JobEnvironmentDir, task->Files, task->YtResources, task->FmrResources); } else { ythrow yexception() << "Unsupported task type"; } @@ -451,7 +478,29 @@ void FillMapFmrJob( mapJob.SetTaskFmrOutputTables(mapTaskParams.Output); mapJob.SetClusterConnections(clusterConnections); mapJob.SetYtJobService(jobService); - mapJob.SetIsOrdered(mapTaskParams.IsOrdered); + mapJob.SetFmrJobType(mapTaskParams.MapJobType); +} + +void FillReduceFmrJob( + TFmrUserJob& reduceJob, + const TReduceTaskParams& reduceTaskParams, + const std::unordered_map& clusterConnections, + ITableDataServiceDiscovery::TPtr discovery, + TMaybe vanillaInfo, + const TFmrUserJobSettings& userJobSettings, + IYtJobService::TPtr jobService +) { + reduceJob.SetSettings(userJobSettings); + if (vanillaInfo.Defined()) { + reduceJob.SetVanillaInfo(*vanillaInfo); + } + reduceJob.SetTableDataServiceDiscovery(std::move(discovery)); + reduceJob.SetTaskInputTables(reduceTaskParams.Input); + reduceJob.SetTaskFmrOutputTables(reduceTaskParams.Output); + reduceJob.SetClusterConnections(clusterConnections); + reduceJob.SetYtJobService(jobService); + reduceJob.SetFmrJobType(EFmrJobType::Reduce); + reduceJob.SetReduceOperationSpec(reduceTaskParams.ReduceOperationSpec); } TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) { diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h index b72bb54ecc7..f99dfa7b552 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -69,4 +69,14 @@ void FillMapFmrJob( IYtJobService::TPtr jobService ); +void FillReduceFmrJob( + TFmrUserJob& reduceJob, + const TReduceTaskParams& reduceTaskParams, + const std::unordered_map& clusterConnections, + ITableDataServiceDiscovery::TPtr discovery, + TMaybe vanillaInfo, + const TFmrUserJobSettings& userJobSettings, + IYtJobService::TPtr jobService +); + } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp new file mode 100644 index 00000000000..465a86fdef9 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp @@ -0,0 +1,133 @@ +#include "yql_yt_reduce_reader.h" + +#include +#include + +namespace NYql::NFmr { + +TReduceReader::TReduceReader( + const std::vector& inputs, + const TSortingColumns& reduceBy +) + : TFmrIndexedBlockReader(inputs) + , ReduceBy_(reduceBy) +{ + for (ui32 i = 0; i < Sources_.size(); ++i) { + Sources_[i].EnsureRow(); + if (Sources_[i].Valid()) { + Heap_.push_back(i); + } + } + + HeapComparator_ = [this](ui32 a, ui32 b) { + return Sources_[b] < Sources_[a]; + }; + + std::make_heap(Heap_.begin(), Heap_.end(), HeapComparator_); +} + +TString TReduceReader::GetReduceKeyFromRow() { + auto& s = Sources_[ActiveSource_]; + TStringBuf rowData = s.Block.GetRowBytes(s.RowIndex); + TParserFragmentListIndex parser(rowData); + parser.Parse(); + auto rowMarkup = parser.MoveAllColumnsRows()[0]; + + TString result; + result.append(1, NYson::NDetail::BeginMapSymbol); + bool firstColumn = true; + THashSet columnsToKeep(ReduceBy_.Columns.begin(), ReduceBy_.Columns.end()); + + for (const auto& col : rowMarkup.Columns) { + if (!columnsToKeep.contains(col.Name)) { + continue; + } + + if (!firstColumn) { + result.append(1, NYson::NDetail::KeyedItemSeparatorSymbol); + } + firstColumn = false; + + const auto& kvRange = col.KeyValueRange; + result.append(rowData.data() + kvRange.StartOffset, kvRange.EndOffset - kvRange.StartOffset); + } + result.append(1, NYson::NDetail::EndMapSymbol); + result.append(1, NYson::NDetail::ListItemSeparatorSymbol); + return result; +} + + +size_t TReduceReader::DoRead(void* buf, size_t len) { + auto out = MakeArrayRef(static_cast(buf), len); + size_t total = 0; + size_t remaining = len; + + while (remaining > 0) { + if (IsKeyMarkerActive_) { + auto keySwitchMarker = TStringBuf(KeySwitchMarker); + Y_ENSURE(KeyMarkerOffset_ <= keySwitchMarker.size(), "key marker offset out of bounds"); + const size_t avail = keySwitchMarker.size() - KeyMarkerOffset_; + const size_t toCopy = std::min(avail, remaining); + + std::copy_n(keySwitchMarker.begin() + KeyMarkerOffset_, toCopy, out.begin() + total); + remaining -= toCopy; + total += toCopy; + KeyMarkerOffset_ += toCopy; + + if (KeyMarkerOffset_ == keySwitchMarker.size()) { + // finished writing marker, should write rows with fixed reduce key. + IsKeyMarkerActive_ = false; + KeyMarkerOffset_ = 0; + continue; + } + } + + if (HasActive_) { + auto& s = Sources_[ActiveSource_]; + TStringBuf row = s.Block.GetRowBytes(s.RowIndex); + + auto reduceKey = GetReduceKeyFromRow(); + + if (reduceKey != CurrentReduceKey_) { + CurrentReduceKey_ = reduceKey; + IsKeyMarkerActive_ = true; + ActiveOffset_ = 0; + continue; + } + + Y_ENSURE(ActiveOffset_ <= row.size(), "Active offset out of bounds"); + + const size_t avail = row.size() - ActiveOffset_; + const size_t toCopy = std::min(avail, remaining); + + std::copy_n(row.begin() + ActiveOffset_, toCopy, out.begin() + total); + remaining -= toCopy; + total += toCopy; + ActiveOffset_ += toCopy; + + if (ActiveOffset_ == row.size()) { + s.Next(); + if (s.Valid()) { + Heap_.push_back(ActiveSource_); + std::push_heap(Heap_.begin(), Heap_.end(), HeapComparator_); + } + HasActive_ = false; + } + + continue; + } + + if (Heap_.empty()) { + break; + } + + std::pop_heap(Heap_.begin(), Heap_.end(), HeapComparator_); + ActiveSource_ = Heap_.back(); + Heap_.pop_back(); + ActiveOffset_ = 0; + HasActive_ = true; + } + return total; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h new file mode 100644 index 00000000000..81ac1ff1331 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h @@ -0,0 +1,33 @@ +#pragma once + +#include "yql_yt_fmr_indexed_block_reader.h" +#include + +namespace NYql::NFmr { + +class TReduceReader final: public TFmrIndexedBlockReader { +public: + TReduceReader( + const std::vector& inputs, + const TSortingColumns& reduceBy + ); + +private: + size_t DoRead(void* buf, size_t len) final; + + TString GetReduceKeyFromRow(); + +private: + std::vector Heap_; // SortBy as comparator. + std::function HeapComparator_; + bool HasActive_ = false; + ui64 ActiveSource_ = 0; + TSortingColumns ReduceBy_; + TString CurrentReduceKey_; + + const TString KeySwitchMarker = GetBinaryYson("<\"key_switch\"=%true;>#;\n"); + bool IsKeyMarkerActive_ = false; + ui64 KeyMarkerOffset_ = 0; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h index bab6367a676..54e5d54bcf8 100644 --- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h +++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h @@ -31,6 +31,16 @@ public: virtual std::variant SortedUpload(const TSortedUploadTaskParams& params, const std::unordered_map& clusterConnections = {}, std::shared_ptr> cancelFlag = nullptr) = 0; virtual std::variant LocalSort(const TLocalSortTaskParams& params, const std::unordered_map& clusterConnections = {}, std::shared_ptr> cancelFlag = nullptr) = 0; + + virtual std::variant Reduce( + const TReduceTaskParams& params, + const std::unordered_map& clusterConnections = {}, + std::shared_ptr> cancelFlag = nullptr, + const TMaybe& jobEnvironmentDir = Nothing(), + const std::vector& jobFiles = {}, + const std::vector& jobYtResources = {}, + const std::vector& jobFmrResources = {} + ) = 0; }; } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make index 3beaf40ddfc..600c60efa05 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make @@ -7,6 +7,8 @@ SRCS( PEERDIR( yt/yql/providers/yt/fmr/job_factory/interface yt/yql/providers/yt/fmr/job_factory/impl + yt/yql/providers/yt/fmr/utils/comparator + yt/yql/providers/yt/fmr/utils ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp b/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp index 47b152e39c0..1e7a3c4d959 100644 --- a/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp +++ b/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp @@ -68,8 +68,8 @@ std::variant TFmrUserJobLauncher::LaunchJob( InitializeJobEnvironment(*jobEnvironmentDir, jobFiles, jobYtResources, jobFmrResources); - TFile jobStateFile(jobtmpDir.Child("fmrjob.bin"), CreateNew | RdWr); - TFile mapResultStatsFile(jobtmpDir.Child("stats.bin"), CreateNew | RdWr); + TFile jobStateFile(jobtmpDir.Child("fmrjob.bin"), CreateAlways | RdWr); + TFile jobResultStatsFile(jobtmpDir.Child("stats.bin"), CreateAlways | RdWr); if (!TableDataServiceDiscoveryFilePath_.empty()) { TString tmpDirTableDataServiceDiscoveryPath = "tds_discovery.txt"; @@ -85,7 +85,7 @@ std::variant TFmrUserJobLauncher::LaunchJob( job.Save(jobStateFileOutputStream); jobStateFileOutputStream.Flush(); - // execute map in separate process + // execute fmrJob in separate process TShellCommandOptions opts; TStringStream fmrJobOutputStream, fmrJobErrorStream; opts.SetUseShell(false).SetDetachSession(false).SetOutputStream(&fmrJobOutputStream).SetErrorStream(&fmrJobErrorStream); @@ -107,7 +107,7 @@ std::variant TFmrUserJobLauncher::LaunchJob( YQL_CLOG(DEBUG, FastMapReduce) << "Process cerr: " << fmrJobErrorStream.Str(); - TFileInput statsStream(mapResultStatsFile); + TFileInput statsStream(jobResultStatsFile); auto serializedProtoStats = statsStream.ReadAll(); NProto::TStatistics protoStats; protoStats.ParseFromStringOrThrow(serializedProtoStats); diff --git a/yt/yql/providers/yt/fmr/process/ya.make b/yt/yql/providers/yt/fmr/process/ya.make index 92a55453f9a..b05d7bb930e 100644 --- a/yt/yql/providers/yt/fmr/process/ya.make +++ b/yt/yql/providers/yt/fmr/process/ya.make @@ -15,6 +15,7 @@ PEERDIR( yt/yql/providers/yt/fmr/yt_job_service/file yt/yql/providers/yt/fmr/yt_job_service/impl yt/yql/providers/yt/fmr/utils + yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl yt/yql/providers/yt/job ) diff --git a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp index e452acb5997..6fe4ccb1fde 100644 --- a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp +++ b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp @@ -1,10 +1,12 @@ #include "yql_yt_job_fmr.h" #include #include +#include #include #include #include #include +#include #include #include #include @@ -24,10 +26,11 @@ void TFmrUserJob::Save(IOutputStream& s) const { ClusterConnections_, TableDataServiceDiscoveryFilePath_, YtJobServiceType_, - IsOrdered_, + FmrJobType_, Settings_, TvmSettings_, - VanillaInfo_ + VanillaInfo_, + ReduceOperationSpec_ ); } @@ -39,10 +42,11 @@ void TFmrUserJob::Load(IInputStream& s) { ClusterConnections_, TableDataServiceDiscoveryFilePath_, YtJobServiceType_, - IsOrdered_, + FmrJobType_, Settings_, TvmSettings_, - VanillaInfo_ + VanillaInfo_, + ReduceOperationSpec_ ); } @@ -132,6 +136,45 @@ void TFmrUserJob::FillQueueFromInputTablesUnordered() { } } +void TFmrUserJob::FillQueueFromReduceInput() { + std::vector blockIterators; + YQL_ENSURE(ReduceOperationSpec_.Defined()); + auto reduceBy = ReduceOperationSpec_->ReduceBy; + auto sortBy = ReduceOperationSpec_->SortBy; + for (const auto& inputTableRef : InputTables_.Inputs) { + if (auto fmrInput = std::get_if(&inputTableRef)) { + blockIterators.push_back(MakeIntrusive( + fmrInput->TableId, + fmrInput->TableRanges, + TableDataService_, + sortBy.Columns, + sortBy.SortOrders, + fmrInput->Columns, + fmrInput->SerializedColumnGroups, + fmrInput->IsFirstRowInclusive, + fmrInput->IsLastRowInclusive, + fmrInput->FirstRowKeys, + fmrInput->LastRowKeys + )); + } else { + ythrow TFmrNonRetryableJobException() << "YtTables unsupported inside Reduce task for now"; + } + } + ThreadPool_->SafeAddFunc([this, blockIterators, reduceBy]() mutable { + try { + NYT::TRawTableReaderPtr reduceReader = MakeIntrusive(blockIterators, reduceBy); + auto queueTableWriter = MakeIntrusive(UnionInputTablesQueue_); + ParseRecords(reduceReader, queueTableWriter, 1, 1000000, CancelFlag_); + queueTableWriter->Flush(); + for (ui64 i = 0; i < InputTables_.Inputs.size(); ++i) { + UnionInputTablesQueue_->NotifyInputFinished(i); + } + } catch (...) { + UnionInputTablesQueue_->SetException(CurrentExceptionMessage()); + } + }); +} + void TFmrUserJob::InitializeFmrUserJob() { if (!YtJobService_) { YQL_ENSURE(YtJobServiceType_ == "native" || YtJobServiceType_ == "file"); @@ -214,10 +257,12 @@ TStatistics TFmrUserJob::GetStatistics(const TFmrUserJobOptions& options) { TStatistics TFmrUserJob::DoFmrJob(const TFmrUserJobOptions& options) { InitializeFmrUserJob(); - if (IsOrdered_) { + if (FmrJobType_ == EFmrJobType::OrderedMap) { FillQueueFromInputTablesOrdered(); - } else { + } else if (FmrJobType_ == EFmrJobType::Map) { FillQueueFromInputTablesUnordered(); + } else if (FmrJobType_ == EFmrJobType::Reduce) { + FillQueueFromReduceInput(); } TYqlUserJobBase::Do(); return GetStatistics(options); diff --git a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h index 1956f99b0cd..c4c60c1ece4 100644 --- a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h +++ b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h @@ -40,7 +40,6 @@ struct TFmrUserJobOptions { bool WriteStatsToFile = false; }; - class TFmrUserJob: public TYqlUserJobBase { public: TFmrUserJob(); @@ -88,8 +87,8 @@ public: VanillaInfo_ = std::move(info); } - void SetIsOrdered(bool isOrdered) { - IsOrdered_ = isOrdered; + void SetFmrJobType(EFmrJobType jobType) { + FmrJobType_ = jobType; } void SetSettings(const TFmrUserJobSettings& settings) { @@ -100,6 +99,10 @@ public: TvmSettings_ = tvmSettings; } + void SetReduceOperationSpec(const TReduceOperationSpec& reduceOperationSpec) { + ReduceOperationSpec_ = reduceOperationSpec; + } + void Save(IOutputStream& s) const override; void Load(IInputStream& s) override; @@ -116,6 +119,7 @@ private: void FillQueueFromSingleInputTable(ui64 tableIndex); void FillQueueFromInputTablesUnordered(); void FillQueueFromInputTablesOrdered(); + void FillQueueFromReduceInput(); void InitializeFmrUserJob(); @@ -127,10 +131,11 @@ private: std::unordered_map ClusterConnections_; TString TableDataServiceDiscoveryFilePath_; TString YtJobServiceType_; // file or native - bool IsOrdered_ = false; + EFmrJobType FmrJobType_ = EFmrJobType::Map; TFmrUserJobSettings Settings_ = TFmrUserJobSettings(); TMaybe TvmSettings_ = Nothing(); TMaybe VanillaInfo_ = Nothing(); + TMaybe ReduceOperationSpec_; // End of serializable part // Non-serialized: set only for in-process execution via SetTableDataServiceDiscovery. diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto index d670ad03e3f..9f4712adcb5 100644 --- a/yt/yql/providers/yt/fmr/proto/request_options.proto +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -107,8 +107,9 @@ message TFmrTableInputRef { repeated string Columns = 3; optional string ColumnGroups = 4; optional bool IsFirstRowInclusive = 5; - optional bytes FirstRowKeys = 6; - optional bytes LastRowKeys = 7; + optional bool IsLastRowInclusive = 6; + optional bytes FirstRowKeys = 7; + optional bytes LastRowKeys = 8; } message TSortingColumns { @@ -233,18 +234,49 @@ message TMergeTaskParams { TFmrTableOutputRef Output = 2; } +enum EFmrJobType { + MAP = 0; + ORDERED_MAP = 1; + REDUCE = 2; +} + +enum EReduceType { + SORTED_REDUCE = 0; + JOIN_REDUCE = 1; +} + message TMapOperationParams { repeated TOperationTableRef Input = 1; repeated TFmrTableRef Output = 2; bytes SerializedMapJobState = 3; - bool IsOrdered = 4; + EFmrJobType MapJobType = 4; } message TMapTaskParams { TTaskTableInputRef Input = 1; repeated TFmrTableOutputRef Output = 2; bytes SerializedMapJobState = 3; - bool IsOrdered = 4; + EFmrJobType MapJobType = 4; +} + +message TReduceOperationSpec { + TSortingColumns ReduceBy = 1; + TSortingColumns SortBy = 2; + EReduceType ReduceType = 3; +} + +message TReduceOperationParams { + repeated TOperationTableRef Input = 1; + repeated TFmrTableRef Output = 2; + bytes SerializedReduceJobState = 3; + TReduceOperationSpec ReduceOperationSpec = 4; +} + +message TReduceTaskParams { + TTaskTableInputRef Input = 1; + repeated TFmrTableOutputRef Output = 2; + bytes SerializedReduceJobState = 3; + TReduceOperationSpec ReduceOperationSpec = 4; } message TSortOperationParams { @@ -266,6 +298,7 @@ message TOperationParams { TSortedUploadOperationParams SortedUploadOperationParams = 5; TSortedMergeOperationParams SortedMergeOperationParams = 6; TSortOperationParams SortOperationParams = 7; + TReduceOperationParams ReduceOperationParams = 8; } } @@ -278,6 +311,7 @@ message TTaskParams { TSortedUploadTaskParams SortedUploadTaskParams = 5; TSortedMergeTaskParams SortedMergeTaskParams = 6; TLocalSortTaskParams LocalSortTaskParams = 7; + TReduceTaskParams ReduceTaskParams = 8; } } diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index 43cf0963546..a2311538ac6 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -183,6 +183,9 @@ NProto::TFmrTableInputRef FmrTableInputRefToProto(const TFmrTableInputRef& fmrTa if (fmrTableInputRef.IsFirstRowInclusive) { protoFmrTableInputRef.SetIsFirstRowInclusive(*fmrTableInputRef.IsFirstRowInclusive); } + if (fmrTableInputRef.IsLastRowInclusive) { + protoFmrTableInputRef.SetIsLastRowInclusive(*fmrTableInputRef.IsLastRowInclusive); + } if (fmrTableInputRef.FirstRowKeys) { protoFmrTableInputRef.SetFirstRowKeys(*fmrTableInputRef.FirstRowKeys); } @@ -208,6 +211,9 @@ TFmrTableInputRef FmrTableInputRefFromProto(const NProto::TFmrTableInputRef& pro fmrTableInputRef.IsFirstRowInclusive = protoFmrTableInputRef.HasIsFirstRowInclusive() ? TMaybe(protoFmrTableInputRef.GetIsFirstRowInclusive()) : Nothing(); + fmrTableInputRef.IsLastRowInclusive = protoFmrTableInputRef.HasIsLastRowInclusive() + ? TMaybe(protoFmrTableInputRef.GetIsLastRowInclusive()) + : Nothing(); fmrTableInputRef.FirstRowKeys = protoFmrTableInputRef.HasFirstRowKeys() ? TMaybe(protoFmrTableInputRef.GetFirstRowKeys()) : Nothing(); fmrTableInputRef.LastRowKeys = protoFmrTableInputRef.HasLastRowKeys() ? TMaybe(protoFmrTableInputRef.GetLastRowKeys()) : Nothing(); return fmrTableInputRef; @@ -594,7 +600,7 @@ NProto::TMapOperationParams MapOperationParamsToProto(const TMapOperationParams& protoMapOperationParams.AddOutput()->Swap(&protoFmrTableRef); } protoMapOperationParams.SetSerializedMapJobState(mapOperationParams.SerializedMapJobState); - protoMapOperationParams.SetIsOrdered(mapOperationParams.IsOrdered); + protoMapOperationParams.SetMapJobType(static_cast(mapOperationParams.MapJobType)); return protoMapOperationParams; } @@ -607,7 +613,7 @@ TMapOperationParams MapOperationParamsFromProto(const NProto::TMapOperationParam for (auto& protoFmrTableRef: protoMapOperationParams.GetOutput()) { outputTables.emplace_back(FmrTableRefFromProto(protoFmrTableRef)); } - return TMapOperationParams{.Input = inputTables, .Output = outputTables, .SerializedMapJobState = protoMapOperationParams.GetSerializedMapJobState(), .IsOrdered = protoMapOperationParams.GetIsOrdered()}; + return TMapOperationParams{.Input = inputTables, .Output = outputTables, .SerializedMapJobState = protoMapOperationParams.GetSerializedMapJobState(), .MapJobType = static_cast(protoMapOperationParams.GetMapJobType())}; } NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) { @@ -619,7 +625,7 @@ NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) protoMapTaskParams.AddOutput()->Swap(&protoFmrTableOutputRef); } protoMapTaskParams.SetSerializedMapJobState(mapTaskParams.SerializedMapJobState); - protoMapTaskParams.SetIsOrdered(mapTaskParams.IsOrdered); + protoMapTaskParams.SetMapJobType(static_cast(mapTaskParams.MapJobType)); return protoMapTaskParams; } @@ -632,10 +638,88 @@ TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTask } mapTaskParams.Output = outputTables; mapTaskParams.SerializedMapJobState = protoMapTaskParams.GetSerializedMapJobState(); - mapTaskParams.IsOrdered = protoMapTaskParams.GetIsOrdered(); + mapTaskParams.MapJobType = static_cast(protoMapTaskParams.GetMapJobType()); return mapTaskParams; } +NProto::TReduceOperationSpec ReduceOperationSpecToProto(const TReduceOperationSpec& reduceOperationSpec) { + NProto::TReduceOperationSpec protoReduceOperationSpec; + auto protoReduceBy = SortingColumnsToProto(reduceOperationSpec.ReduceBy); + protoReduceOperationSpec.MutableReduceBy()->Swap(&protoReduceBy); + auto protoSortBy = SortingColumnsToProto(reduceOperationSpec.SortBy); + protoReduceOperationSpec.MutableSortBy()->Swap(&protoSortBy); + protoReduceOperationSpec.SetReduceType(static_cast(reduceOperationSpec.ReduceType)); + return protoReduceOperationSpec; +} + +TReduceOperationSpec ReduceOperationSpecFromProto (const NProto::TReduceOperationSpec& protoReduceOperationSpec) { + TReduceOperationSpec reduceOperationSpec; + reduceOperationSpec.ReduceBy = SortingColumnsFromProto(protoReduceOperationSpec.GetReduceBy()); + reduceOperationSpec.SortBy = SortingColumnsFromProto(protoReduceOperationSpec.GetSortBy()); + reduceOperationSpec.ReduceType = static_cast(protoReduceOperationSpec.GetReduceType()); + return reduceOperationSpec; +} + +NProto::TReduceOperationParams ReduceOperationParamsToProto(const TReduceOperationParams& reduceOperationParams) { + NProto::TReduceOperationParams protoReduceOperationParams; + for (auto& operationTableRef: reduceOperationParams.Input) { + auto protoOperationTableRef = OperationTableRefToProto(operationTableRef); + protoReduceOperationParams.AddInput()->Swap(&protoOperationTableRef); + } + for (auto& fmrTableRef: reduceOperationParams.Output) { + auto protoFmrTableRef = FmrTableRefToProto(fmrTableRef); + protoReduceOperationParams.AddOutput()->Swap(&protoFmrTableRef); + } + protoReduceOperationParams.SetSerializedReduceJobState(reduceOperationParams.SerializedReduceJobState); + auto protoReduceOperationSpec = ReduceOperationSpecToProto(reduceOperationParams.ReduceOperationSpec); + protoReduceOperationParams.MutableReduceOperationSpec()->Swap(&protoReduceOperationSpec); + return protoReduceOperationParams; +} + +TReduceOperationParams ReduceOperationParamsFromProto(const NProto::TReduceOperationParams& protoReduceOperationParams) { + std::vector inputTables; + std::vector outputTables; + for (auto& protoOperationTableRef: protoReduceOperationParams.GetInput()) { + inputTables.emplace_back(OperationTableRefFromProto(protoOperationTableRef)); + } + for (auto& protoFmrTableRef: protoReduceOperationParams.GetOutput()) { + outputTables.emplace_back(FmrTableRefFromProto(protoFmrTableRef)); + } + return TReduceOperationParams{ + .Input = inputTables, + .Output = outputTables, + .SerializedReduceJobState = protoReduceOperationParams.GetSerializedReduceJobState(), + .ReduceOperationSpec = ReduceOperationSpecFromProto(protoReduceOperationParams.GetReduceOperationSpec()) + }; +} + +NProto::TReduceTaskParams ReduceTaskParamsToProto(const TReduceTaskParams& reduceTaskParams) { + NProto::TReduceTaskParams protoReduceTaskParams; + auto protoTaskTableInputRef = TaskTableInputRefToProto(reduceTaskParams.Input); + protoReduceTaskParams.MutableInput()->Swap(&protoTaskTableInputRef); + for (auto& fmrTableOutputRef: reduceTaskParams.Output) { + auto protoFmrTableOutputRef = FmrTableOutputRefToProto(fmrTableOutputRef); + protoReduceTaskParams.AddOutput()->Swap(&protoFmrTableOutputRef); + } + protoReduceTaskParams.SetSerializedReduceJobState(reduceTaskParams.SerializedReduceJobState); + auto protoReduceOperationSpec = ReduceOperationSpecToProto(reduceTaskParams.ReduceOperationSpec); + protoReduceTaskParams.MutableReduceOperationSpec()->Swap(&protoReduceOperationSpec); + return protoReduceTaskParams; +} + +TReduceTaskParams ReduceTaskParamsFromProto(const NProto::TReduceTaskParams& protoReduceTaskParams) { + TReduceTaskParams reduceTaskParams; + reduceTaskParams.Input = TaskTableInputRefFromProto(protoReduceTaskParams.GetInput()); + std::vector outputTables; + for (auto& protoFmrTableOutputRef: protoReduceTaskParams.GetOutput()) { + outputTables.emplace_back(FmrTableOutputRefFromProto(protoFmrTableOutputRef)); + } + reduceTaskParams.Output = outputTables; + reduceTaskParams.SerializedReduceJobState = protoReduceTaskParams.GetSerializedReduceJobState(); + reduceTaskParams.ReduceOperationSpec = ReduceOperationSpecFromProto(protoReduceTaskParams.GetReduceOperationSpec()); + return reduceTaskParams; +} + NProto::TSortOperationParams SortOperationParamsToProto(const TSortOperationParams& sortOperationParams) { NProto::TSortOperationParams protoSortOperationParams; for (size_t i = 0; i < sortOperationParams.Input.size(); ++i) { @@ -699,6 +783,9 @@ NProto::TOperationParams OperationParamsToProto(const TOperationParams& operatio } else if (auto* SortOperationParamsPtr = std::get_if(&operationParams)) { NProto::TSortOperationParams protoSortOperationParams = SortOperationParamsToProto(*SortOperationParamsPtr); protoOperationParams.MutableSortOperationParams()->Swap(&protoSortOperationParams); + } else if (auto* ReduceOperationParamsPtr = std::get_if(&operationParams)) { + NProto::TReduceOperationParams protoReduceOperationParams = ReduceOperationParamsToProto(*ReduceOperationParamsPtr); + protoOperationParams.MutableReduceOperationParams()->Swap(&protoReduceOperationParams); } return protoOperationParams; } @@ -718,6 +805,8 @@ TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoO return SortedMergeOperationParamsFromProto(protoOperationParams.GetSortedMergeOperationParams()); } else if (protoOperationParams.HasSortOperationParams()) { return SortOperationParamsFromProto(protoOperationParams.GetSortOperationParams()); + } else if (protoOperationParams.HasReduceOperationParams()) { + return ReduceOperationParamsFromProto(protoOperationParams.GetReduceOperationParams()); } return TOperationParams(); } @@ -771,6 +860,9 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) { } else if (auto* LocalSortTaskParamsPtr = std::get_if(&taskParams)) { NProto::TLocalSortTaskParams protoLocalSortTaskParams = LocalSortTaskParamsToProto(*LocalSortTaskParamsPtr); protoTaskParams.MutableLocalSortTaskParams()->Swap(&protoLocalSortTaskParams); + } else if (auto* reduceTaskParamsPtr = std::get_if(&taskParams)) { + NProto::TReduceTaskParams protoReduceTaskParams = ReduceTaskParamsToProto(*reduceTaskParamsPtr); + protoTaskParams.MutableReduceTaskParams()->Swap(&protoReduceTaskParams); } return protoTaskParams; } @@ -791,6 +883,8 @@ TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { taskParams = SortedMergeTaskParamsFromProto(protoTaskParams.GetSortedMergeTaskParams()); } else if (protoTaskParams.HasLocalSortTaskParams()) { taskParams = LocalSortTaskParamsFromProto(protoTaskParams.GetLocalSortTaskParams()); + } else if (protoTaskParams.HasReduceTaskParams()) { + taskParams = ReduceTaskParamsFromProto(protoTaskParams.GetReduceTaskParams()); } return taskParams; } diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h index 85794aa467b..9d0257fa923 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h @@ -109,6 +109,18 @@ NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTaskParams); +NProto::TReduceOperationSpec ReduceOperationSpecToProto(const TReduceOperationSpec& reduceOperationSpec); + +TReduceOperationSpec ReduceOperationSpecFromProto (const NProto::TReduceOperationSpec& protoReduceOperationSpec); + +NProto::TReduceOperationParams ReduceOperationParamsToProto(const TReduceOperationParams& reduceOperationParams); + +TReduceOperationParams ReduceOperationParamsFromProto(const NProto::TReduceOperationParams& protoReduceOperationParams); + +NProto::TReduceTaskParams ReduceTaskParamsToProto(const TReduceTaskParams& reduceTaskParams); + +TReduceTaskParams ReduceTaskParamsFromProto(const NProto::TReduceTaskParams& protoReduceTaskParams); + NProto::TSortOperationParams SortOperationParamsToProto(const TSortOperationParams& sortOperationParams); TSortOperationParams SortOperationParamsFromProto(const NProto::TSortOperationParams& protoSortOperationParams); diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index 10c638d1693..dd462570908 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -166,6 +166,7 @@ void TFmrTableInputRef::Save(IOutputStream* buffer) const { Columns, SerializedColumnGroups, IsFirstRowInclusive, + IsLastRowInclusive, FirstRowKeys, LastRowKeys ); @@ -179,6 +180,7 @@ void TFmrTableInputRef::Load(IInputStream* buffer) { Columns, SerializedColumnGroups, IsFirstRowInclusive, + IsLastRowInclusive, FirstRowKeys, LastRowKeys ); @@ -287,6 +289,24 @@ NYT::TRichYPath DeserializeRichPath(const TString& serializedRichPath) { return richPath; } +void TReduceOperationSpec::Save(IOutputStream* buffer) const { + ::SaveMany( + buffer, + ReduceBy, + SortBy, + ReduceType + ); +} + +void TReduceOperationSpec::Load(IInputStream* buffer) { + ::LoadMany( + buffer, + ReduceBy, + SortBy, + ReduceType + ); +} + } // namespace NYql::NFmr ////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index a3976cba323..b1f37585424 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -39,7 +39,8 @@ enum EOperationType { Map = 4, SortedUpload = 5, SortedMerge = 6, - Sort = 7 + Sort = 7, + Reduce = 8 }; enum class ETaskType { @@ -50,7 +51,8 @@ enum class ETaskType { Map = 4, SortedUpload = 5, SortedMerge = 6, - LocalSort = 7 + LocalSort = 7, + Reduce = 8 }; enum class EFmrComponent { @@ -202,6 +204,7 @@ struct TFmrTableInputRef { TString SerializedColumnGroups = TString(); TMaybe IsFirstRowInclusive; + TMaybe IsLastRowInclusive; TMaybe FirstRowKeys; // Binary YSON MAP TMaybe LastRowKeys; // Binary YSON MAP @@ -416,18 +419,24 @@ struct TSortedMergeTaskParams { TFmrTableOutputRef Output; }; +enum class EFmrJobType { + Map, + OrderedMap, + Reduce +}; + struct TMapOperationParams { std::vector Input; std::vector Output; TString SerializedMapJobState; - bool IsOrdered = false; + EFmrJobType MapJobType; }; struct TMapTaskParams { TTaskTableInputRef Input; std::vector Output; TString SerializedMapJobState; - bool IsOrdered; + EFmrJobType MapJobType; }; struct TSortOperationParams { @@ -440,10 +449,38 @@ struct TLocalSortTaskParams { TFmrTableOutputRef Output; }; +enum EReduceType { + SortedReduce, + JoinReduce +}; + +struct TReduceOperationSpec { + TSortingColumns ReduceBy; + TSortingColumns SortBy; + EReduceType ReduceType; + + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); +}; + +struct TReduceOperationParams { + std::vector Input; + std::vector Output; + TString SerializedReduceJobState; + TReduceOperationSpec ReduceOperationSpec; +}; + +struct TReduceTaskParams { + //std::vector Input; // all reduce inputs should be in fmr. + TTaskTableInputRef Input; + std::vector Output; + TString SerializedReduceJobState; + TReduceOperationSpec ReduceOperationSpec; +}; -using TOperationParams = std::variant; +using TOperationParams = std::variant; -using TTaskParams = std::variant; +using TTaskParams = std::variant; struct TFileInfo { TString LocalPath; // Path to local file, filled in worker. diff --git a/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp b/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp index a125458adfd..1c3f899a088 100644 --- a/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp +++ b/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp @@ -197,6 +197,7 @@ TFingerprintCounts CountTaskFingerprintsFromFiles( TYtBlockIteratorSettings{}, TVector(keyColumns.size(), ESortOrder::Ascending), fmr.IsFirstRowInclusive, + fmr.IsLastRowInclusive, fmr.FirstRowKeys, fmr.LastRowKeys )); diff --git a/yt/yql/providers/yt/fmr/utils/comparator/ya.make b/yt/yql/providers/yt/fmr/utils/comparator/ya.make index 335fcd32530..f253af0d001 100644 --- a/yt/yql/providers/yt/fmr/utils/comparator/ya.make +++ b/yt/yql/providers/yt/fmr/utils/comparator/ya.make @@ -12,6 +12,9 @@ PEERDIR( yql/essentials/utils ) - YQL_LAST_ABI_VERSION() +YQL_LAST_ABI_VERSION() + +GENERATE_ENUM_SERIALIZATION(yql_yt_binary_yson_compare_impl.h) + END() diff --git a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp index 386b623ac4c..be69a701d0a 100644 --- a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp +++ b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp @@ -7,6 +7,22 @@ int CompareKeyRows(const TFmrTableKeysBoundary& lhs, const TFmrTableKeysBoundary return CompareKeyRowsAcrossYsonBlocks(lhs.Row, lhs.Markup, rhs.Row, rhs.Markup, lhs.SortOrders); } +void TSortingColumns::Save(IOutputStream* buffer) const { + ::SaveMany( + buffer, + Columns, + SortOrders + ); +} + +void TSortingColumns::Load(IInputStream* buffer) { + ::LoadMany( + buffer, + Columns, + SortOrders + ); +} + int TBinaryYsonComparator::CompareYsonValues(TColumnOffsetRange lhs, TColumnOffsetRange rhs) const { Y_ENSURE(lhs.IsValid(), "Invalid column offset range"); Y_ENSURE(rhs.IsValid(), "Invalid column offset range"); diff --git a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h index 09d40be169d..e665370c5ea 100644 --- a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h +++ b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h @@ -14,6 +14,9 @@ struct TSortingColumns { std::vector Columns; std::vector SortOrders; bool operator==(const TSortingColumns&) const = default; + + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); }; struct TFmrTableKeysBoundary { diff --git a/yt/yql/providers/yt/fmr/utils/ya.make b/yt/yql/providers/yt/fmr/utils/ya.make index 2d110be7237..b5c5ed74de6 100644 --- a/yt/yql/providers/yt/fmr/utils/ya.make +++ b/yt/yql/providers/yt/fmr/utils/ya.make @@ -29,6 +29,7 @@ PEERDIR( yt/yql/providers/yt/codec yt/yql/providers/yt/lib/yson_helpers yql/essentials/utils + yt/yql/providers/yt/fmr/test_tools/yson ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp index 761d61fbaa7..3ef22f42b2e 100644 --- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp @@ -34,7 +34,7 @@ void ParseRecordsImpl( break; } curYsonRow.clear(); - CopyYson(cmd, inputBuf, curYsonRow); + CopyYsonWithAttrs(cmd, inputBuf, curYsonRow); bool needBreak = false; if (!inputBuf.TryRead(cmd)) { needBreak = true; diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h index b5aedb6e877..4c1fd4e03c6 100644 --- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h @@ -13,7 +13,8 @@ void ParseRecords( ui64 blockCount, ui64 blockSize, std::shared_ptr> cancelFlag, - const TMaybe& writeMutex = Nothing()); + const TMaybe& writeMutex = Nothing() +); void StreamBulkToYtDistributed( NYT::TRawTableReaderPtr reader, diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp index b96c7939681..164e83fed5d 100644 --- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp +++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp @@ -18,6 +18,7 @@ TTableDataServiceBlockIterator::TTableDataServiceBlockIterator( std::vector neededColumns, TString serializedColumnGroupsSpec, TMaybe isFirstRowKeysInclusive, + TMaybe isLastRowKeysInclusive, TMaybe firstRowKeys, TMaybe lastRowKeys, ui64 readAheadChunks @@ -45,6 +46,8 @@ TTableDataServiceBlockIterator::TTableDataServiceBlockIterator( } if (lastRowKeys) { LastBoundary_ = TFmrTableKeysBoundary(*lastRowKeys, KeyColumns_, SortOrders_); + Y_ENSURE(isLastRowKeysInclusive.Defined(), "isLastRowKeysInclusive must be defined for Last Boundary"); + IsLastBoundInclusive_ = *isLastRowKeysInclusive; } if (SerializedColumnGroupsSpec_.empty()) { @@ -152,6 +155,8 @@ bool TTableDataServiceBlockIterator::RowInKeyBounds(const TString& blob, const T ); if (c > 0) { // if row > last boundary return false; + } else if (!IsLastBoundInclusive_ && c == 0) { + return false; } } return true; diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h index 0bf0a4e0268..4ab202869fc 100644 --- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h +++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h @@ -24,6 +24,7 @@ public: std::vector neededColumns, TString serializedColumnGroupsSpec = {}, TMaybe isFirstRowKeysInclusive = Nothing(), + TMaybe isLastRowKeysInclusive = Nothing(), TMaybe firstRowKeys = Nothing(), TMaybe lastRowKeys = Nothing(), ui64 readAheadChunks = 4 @@ -67,6 +68,7 @@ private: TMaybe FirstBoundary_; TMaybe LastBoundary_; bool IsFirstBoundInclusive_ = true; + bool IsLastBoundInclusive_ = true; std::vector GroupNamesToRead_; diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp index f3ae9cb8a54..70f256aa45f 100644 --- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp +++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp @@ -12,6 +12,7 @@ TYtBlockIterator::TYtBlockIterator( TYtBlockIteratorSettings settings, std::vector sortOrders, TMaybe isFirstRowKeysInclusive, + TMaybe isLastRowKeysInclusive, TMaybe firstRowKeys, TMaybe lastRowKeys ) @@ -34,6 +35,8 @@ TYtBlockIterator::TYtBlockIterator( } if (lastRowKeys) { LastBoundary_ = TFmrTableKeysBoundary(*lastRowKeys, KeyColumns_, SortOrders_); + Y_ENSURE(isLastRowKeysInclusive.Defined(), "isLastRowKeysInclusive must be defined for Last Boundary"); + IsLastBoundInclusive_ = *isLastRowKeysInclusive; } } @@ -64,6 +67,8 @@ bool TYtBlockIterator::RowInKeyBounds(const TString& blob, const TRowIndexMarkup ); if (c > 0) { // if row > last bound return false; + } else if (!IsLastBoundInclusive_ && c == 0) { // if row == first bound + return false; } } return true; diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h index d9cc9c21e52..dd788cc93fe 100644 --- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h +++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h @@ -27,6 +27,7 @@ public: TYtBlockIteratorSettings settings, std::vector sortOrders = {}, TMaybe isFirstRowKeysInclusive = Nothing(), + TMaybe isLastRowKeysInclusive = Nothing(), TMaybe firstRowKeys = Nothing(), TMaybe lastRowKeys = Nothing() ); @@ -55,6 +56,7 @@ private: TMaybe FirstBoundary_; TMaybe LastBoundary_; bool IsFirstBoundInclusive_ = true; + bool IsLastBoundInclusive_ = true; }; } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make index 884cda6f788..a7e0c544d1e 100644 --- a/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make +++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make @@ -7,6 +7,8 @@ SRCS( PEERDIR( yt/yql/providers/yt/fmr/yt_job_service/file yt/yql/providers/yt/gateway/file + yt/yql/providers/yt/fmr/utils/comparator + yt/yql/providers/yt/fmr/utils ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index 333d8aa457a..b016e92ff5f 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -504,6 +505,8 @@ public: future = DoMap(op.Cast(), execCtx, ctx); } else if (auto op = opBase.Maybe()) { future = DoSort(execCtx); + } else if (auto op = opBase.Maybe()) { + future = DoReduce(op.Cast(), execCtx, ctx); } else { // We don't support this operation return UploadFmrInputsAndForwardToUnderlyingGateway(execCtx, node, ctx, std::move(options), nodePos); @@ -2113,8 +2116,6 @@ private: auto outputTable = outputTables[0]; TString outputCluster = execCtx->Cluster_; - - auto outputTableColumnGroups = GetOutputTablesColumnGroups(execCtx); TFmrTableId outputTableFmrId(outputCluster, outputTable.Path); auto columnGroupSpec = outputTableColumnGroups[0]; @@ -2223,6 +2224,93 @@ private: return GetRunningOperationFuture(sortedMergeOperationRequest, sessionId); } + TFuture GetUploadFilesToDistributedCacheFuture( + const TExecContextSimple::TPtr& execCtx, + std::shared_ptr fmrJob, + TString& lambdaCode, + std::vector& filesToUpload, + std::vector ytResources, + std::vector& fmrResources + ) { + if (!FmrServices_->FileUploadService) { + // For now, logic for file gateway is not implemented yet, and fileUpload service is not set. + // TODO (@cdzyura171) - all udfs should be executed locally, use TFileLabmdaBuilder and file transformer. + return MakeFuture(); + } + YQL_ENSURE(UrlMapper_ && Clusters_); + + TString sessionId = execCtx->GetSessionId(); + + execCtx->MakeUserFiles(); + auto tmpFiles = MakeIntrusive(execCtx->FileStorage_->GetTemp()); + + auto client = execCtx->CreateYtClient(execCtx->Options_.Config()); + + auto downloader = MakeYtNativeFileDownloader(execCtx->Gateway, sessionId, execCtx->Cluster_, execCtx->Options_.Config(), client, tmpFiles); + TTransformerFiles transformerFiles; + { + TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),execCtx->FunctionRegistry_->SupportsSizedAllocators()); + alloc.SetLimit(execCtx->Options_.Config()->DefaultCalcMemoryLimit.Get().GetOrElse(0)); + TMapJobBuilder jobBuilder; + // TODO - this function is the same for map and reduce, make function with template builder argument instead of method. + transformerFiles = jobBuilder.UpdateAndSetMapLambda(alloc, execCtx, downloader, lambdaCode, fmrJob.get()); + } + + for (auto& fileInfo: transformerFiles.LocalFiles) { + filesToUpload.emplace_back(TFileInfo{ + .LocalPath = fileInfo.first, + .Md5Key = fileInfo.second.Hash, + .Alias = TFsPath(fileInfo.first).GetName() // uniqueId + }); + } + for (auto& fileInfo: transformerFiles.DeferredUdfFiles) { + filesToUpload.emplace_back(TFileInfo{.LocalPath = fileInfo.first, .Md5Key = fileInfo.second.Hash}); + } + + auto remoteFilesClusterConnection = GetTableClusterConnection(execCtx->Cluster_, sessionId, execCtx->Options_.Config()); + + for (auto& richPath: transformerFiles.RemoteFiles) { + // Remote files all should have the same cluster, and GatewayTransformer clears it from richPaths, so we need to fill it. + richPath.Cluster(execCtx->Cluster_); + + // Checking in case remotePath is a table which is already inserted in fmr. + TFmrTableId fmrTableId(richPath); + auto fmrTablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId); + if (fmrTablePresenceStatus == ETablePresenceStatus::OnlyInFmr || fmrTablePresenceStatus == ETablePresenceStatus::Both) { + TFmrTableRef fmrTableRef{.FmrTableId = fmrTableId}; + fmrTableRef.SerializedColumnGroups = GetColumnGroupSpec(fmrTableRef.FmrTableId, sessionId); + if (!richPath.Columns_.Empty()) { + std::vector neededColumns(richPath.Columns_->Parts_.begin(), richPath.Columns_->Parts_.end()); + fmrTableRef.Columns = neededColumns; + } + + YQL_ENSURE(richPath.FileName_.Defined()); // uniqueId, filled in transformer. + fmrResources.emplace_back(TFmrResourceOperationInfo{.FmrTable = fmrTableRef, .Alias = *richPath.FileName_}); + continue; + } + + // adding remotePath info to list of ytResources to download in jobs. + + TYtResourceInfo ytResourceInfo{.RichPath = richPath}; + ytResourceInfo.YtServerName = remoteFilesClusterConnection.YtServerName; + if (remoteFilesClusterConnection.Token.Defined()) { + ytResourceInfo.Token = *remoteFilesClusterConnection.Token; + } + ytResources.emplace_back(ytResourceInfo); + } + + for (auto& fileInfo: filesToUpload) { + for (auto& [udfModule, udfPrefix]: transformerFiles.JobUdfs) { + if (fileInfo.Alias.empty() && fileInfo.LocalPath.EndsWith(udfModule.substr(2))) { + YQL_CLOG(DEBUG, FastMapReduce) << "Setting file alias " << udfModule << " for udf with path " << fileInfo.LocalPath; + fileInfo.Alias = udfModule; + } + } + } + + return UploadFilesToDistributedCache(filesToUpload); + } + TFuture DoMap( TYtMap map, const TExecContextSimple::TPtr& execCtx, @@ -2237,7 +2325,6 @@ private: auto [mapInputTables, clusterConnections] = GetInputTablesAndConnections(execCtx->InputTables_, sessionId, execCtx->Options_.Config()); - auto mapJob = std::make_shared(); TMapJobBuilder mapJobBuilder; @@ -2251,96 +2338,15 @@ private: bool useSkiff = false; bool forceYsonInputFormat = true; mapJobBuilder.SetMapJobParams(mapJob.get(), execCtx,remapperMap, remapperAllFiles, useSkiff, forceYsonInputFormat, false); - mapJob->SetIsOrdered(ordered); + auto mapJobType = ordered ? EFmrJobType::OrderedMap : EFmrJobType::Map; + mapJob->SetFmrJobType(mapJobType); mapJob->SetSettings(TFmrUserJobSettings()); - TFuture uploadFilesToDistributedCacheIfNeededFuture; std::vector filesToUpload; // Udfs and local files to upload to dist cache. std::vector ytResources; // Yt files and small tables which we need to download as files in jobs. std::vector fmrResources; // Yt small tables, which are already in fmr and we need to download as files in jobs. - if (!FmrServices_->FileUploadService) { - uploadFilesToDistributedCacheIfNeededFuture = MakeFuture(); - } else { - YQL_ENSURE(UrlMapper_ && Clusters_); - - execCtx->MakeUserFiles(); - auto tmpFiles = MakeIntrusive(execCtx->FileStorage_->GetTemp()); - - auto client = execCtx->CreateYtClient(execCtx->Options_.Config()); - - auto downloader = MakeYtNativeFileDownloader(execCtx->Gateway, sessionId, execCtx->Cluster_, execCtx->Options_.Config(), client, tmpFiles); - TTransformerFiles transformerFiles; - { - TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),execCtx->FunctionRegistry_->SupportsSizedAllocators()); - alloc.SetLimit(execCtx->Options_.Config()->DefaultCalcMemoryLimit.Get().GetOrElse(0)); - transformerFiles = mapJobBuilder.UpdateAndSetMapLambda(alloc, execCtx, downloader, mapLambda, mapJob.get(), useSkiff); - } - - for (auto& fileInfo: transformerFiles.LocalFiles) { - filesToUpload.emplace_back(TFileInfo{ - .LocalPath = fileInfo.first, - .Md5Key = fileInfo.second.Hash, - .Alias = TFsPath(fileInfo.first).GetName() // uniqueId - }); - } - for (auto& fileInfo: transformerFiles.DeferredUdfFiles) { - filesToUpload.emplace_back(TFileInfo{.LocalPath = fileInfo.first, .Md5Key = fileInfo.second.Hash}); - } - - auto remoteFilesClusterConnection = GetTableClusterConnection(execCtx->Cluster_, sessionId, execCtx->Options_.Config()); - - for (auto& richPath: transformerFiles.RemoteFiles) { - // Remote files all should have the same cluster, and GatewayTransformer clears it from richPaths, so we need to fill it. - richPath.Cluster(execCtx->Cluster_); - - // Checking in case remotePath is a table which is already inserted in fmr. - TFmrTableId fmrTableId(richPath); - auto fmrTablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId); - if (fmrTablePresenceStatus == ETablePresenceStatus::OnlyInFmr || fmrTablePresenceStatus == ETablePresenceStatus::Both) { - TFmrTableRef fmrTableRef{.FmrTableId = fmrTableId}; - fmrTableRef.SerializedColumnGroups = GetColumnGroupSpec(fmrTableRef.FmrTableId, sessionId); - if (!richPath.Columns_.Empty()) { - std::vector neededColumns(richPath.Columns_->Parts_.begin(), richPath.Columns_->Parts_.end()); - fmrTableRef.Columns = neededColumns; - } - - YQL_ENSURE(richPath.FileName_.Defined()); // uniqueId, filled in transformer. - fmrResources.emplace_back(TFmrResourceOperationInfo{.FmrTable = fmrTableRef, .Alias = *richPath.FileName_}); - continue; - } - - // adding remotePath info to list of ytResources to download in jobs. - - if (!richPath.TransactionId_.Defined() && remoteFilesClusterConnection.TransactionId) { - richPath.TransactionId(GetGuid(remoteFilesClusterConnection.TransactionId)); - } - - TYtResourceInfo ytResourceInfo{.RichPath = richPath}; - ytResourceInfo.YtServerName = remoteFilesClusterConnection.YtServerName; - if (remoteFilesClusterConnection.Token.Defined()) { - ytResourceInfo.Token = *remoteFilesClusterConnection.Token; - } - ytResources.emplace_back(ytResourceInfo); - } - - for (auto& fileInfo: filesToUpload) { - for (auto& [udfModule, udfPrefix]: transformerFiles.JobUdfs) { - if (fileInfo.Alias.empty() && fileInfo.LocalPath.EndsWith(udfModule.substr(2))) { - YQL_CLOG(DEBUG, FastMapReduce) << "Setting file alias " << udfModule << " for udf with path " << fileInfo.LocalPath; - fileInfo.Alias = udfModule; - } - } - } - - if (!filesToUpload.empty()) { - YQL_ENSURE(FmrServices_->FileUploadService, "FileUploadService is not configured, but map operation requires uploading " - << filesToUpload.size() << " files (UDFs/local files) to distributed cache. " - << "Please configure FileRemoteCacheName in FmrConfigurations and FileCacheConfigurations in gateways.conf"); - } - - uploadFilesToDistributedCacheIfNeededFuture = UploadFilesToDistributedCache(filesToUpload); - } + TFuture uploadFilesToDistributedCacheIfNeededFuture = GetUploadFilesToDistributedCacheFuture(execCtx, mapJob, mapLambda, filesToUpload, ytResources, fmrResources); return uploadFilesToDistributedCacheIfNeededFuture.Apply([=, this] (const auto& f) mutable { f.GetValue(); @@ -2348,12 +2354,7 @@ private: TStringStream jobStateStream; mapJob->Save(jobStateStream); - TMapOperationParams mapOperationParams{ - .Input = mapInputTables, - .Output = fmrOutputTables, - .SerializedMapJobState = jobStateStream.Str(), - .IsOrdered = ordered - }; + TMapOperationParams mapOperationParams{.Input = mapInputTables,.Output = fmrOutputTables, .SerializedMapJobState = jobStateStream.Str(), .MapJobType = mapJobType}; TStartOperationRequest mapOperationRequest{ .OperationType = EOperationType::Map, .OperationParams = mapOperationParams, @@ -2430,6 +2431,158 @@ private: return GetRunningOperationFuture(sortOperationRequest, sessionId); } + TSortingColumns GetSortingColumnsFromColumnPairList(const TVector>& sortColumns) { + TSortingColumns sortingColumns; + for (auto& [colName, isAscending]: sortColumns) { + sortingColumns.Columns.emplace_back(colName); + ESortOrder sortOrder = isAscending ? ESortOrder::Ascending : ESortOrder::Descending; + sortingColumns.SortOrders.emplace_back(sortOrder); + } + return sortingColumns; + } + + TFuture DoReduce( + TYtReduce reduce, + const TExecContextSimple::TPtr& execCtx, + TExprContext& ctx + ) { + TString sessionId = execCtx->GetSessionId(); + YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + + auto reduceBy = NYql::GetSettingAsColumnPairList(reduce.Settings().Ref(), EYtSettingType::ReduceBy); + auto sortBy = NYql::GetSettingAsColumnPairList(reduce.Settings().Ref(), EYtSettingType::SortBy); + bool joinReduce = NYql::HasSetting(reduce.Settings().Ref(), EYtSettingType::JoinReduce); + bool useFirstAsPrimary = NYql::HasSetting(reduce.Settings().Ref(), EYtSettingType::FirstAsPrimary); + + TVector sortLimitBy = NYql::GetSettingAsColumnList(reduce.Settings().Ref(), EYtSettingType::SortLimitBy); + TMaybe limit = GetLimit(reduce.Settings().Ref()); + if (limit && !sortLimitBy.empty() && *limit > execCtx->Options_.Config()->TopSortMaxLimit.Get().GetOrElse(DEFAULT_TOP_SORT_LIMIT)) { + limit.Clear(); + } + + auto [reduceInputTables, clusterConnections] = GetInputTablesAndConnections(execCtx->InputTables_, sessionId, execCtx->Options_.Config()); + auto reduceOutputTables = GetOutputTables(execCtx); + + auto reduceJob = std::make_shared(); + TReduceJobBuilder reduceJobBuilder; + + TVector groups; + TVector tables; + TVector rowOffsets; + ui64 currentRowOffset = 0; + std::vector primaryInputTablesPaths; + + YQL_ENSURE(!execCtx->InputTables_.empty()); + const ui32 primaryGroup = useFirstAsPrimary ? execCtx->InputTables_.front().Group : execCtx->InputTables_.back().Group; + for (const auto& table : execCtx->InputTables_) { + if (joinReduce) { + auto yPath = table.Path; + if (table.Group == primaryGroup) { + primaryInputTablesPaths.emplace_back(yPath); + } + } + + if (!groups.empty() && groups.back() != table.Group) { + currentRowOffset = 0; + } + + groups.push_back(table.Group); + tables.push_back(table.Temp ? TString() : table.Name); + rowOffsets.push_back(currentRowOffset); + currentRowOffset += table.Records; + } + + THashSet auxColumns; + std::for_each(reduceBy.begin(), reduceBy.end(), [&auxColumns](const auto& it) { auxColumns.insert(it.first); }); + if (!sortBy.empty()) { + std::for_each(sortBy.begin(), sortBy.end(), [&auxColumns](const auto& it) { auxColumns.insert(it.first); }); + } + + if (sortBy.empty() && !joinReduce) { + sortBy = reduceBy; + } + + // handle unsupported reduce types. + auto fallbackReduceOperationResult = TFmrOperationResult{ + .Errors = { + TFmrError{ + .Component = EFmrComponent::Gateway, + .Reason = EFmrErrorReason::FallbackOperation, + } + } + }; + + if (joinReduce) { + fallbackReduceOperationResult.Errors[0].ErrorMessage = "Join Reduce is not supported yet, falling back to underlying gateway"; + return MakeFuture(fallbackReduceOperationResult); + } + + for (auto& table: reduceInputTables) { + if (std::holds_alternative(table)) { + auto ytTable = std::get(table); + fallbackReduceOperationResult.Errors[0].ErrorMessage = TStringBuilder() << "Table " << ytTable.GetPath() << " is not in fmr - falling back to underlying gateway"; + return MakeFuture(fallbackReduceOperationResult); + } + } + + + TReduceOperationSpec reduceOperationSpec{ + .ReduceBy = GetSortingColumnsFromColumnPairList(reduceBy), + .SortBy = GetSortingColumnsFromColumnPairList(sortBy), + .ReduceType = joinReduce ? EReduceType::JoinReduce : EReduceType::SortedReduce + }; // TODO - add JoinReduceSpec, for now not supported. + + reduceJobBuilder.SetInputType(reduceJob.get(), reduce); + reduceJobBuilder.SetReduceJobParams(reduceJob.get(), execCtx, groups, tables, rowOffsets, auxColumns); + TString reduceLambda = reduceJobBuilder.SetReduceLambdaCode(reduceJob.get(), reduce, execCtx, ctx); + + reduceJob->SetSettings(TFmrUserJobSettings()); + + std::vector filesToUpload; // Udfs and local files to upload to dist cache. + std::vector ytResources; // Yt files and small tables which we need to download as files in jobs. + std::vector fmrResources; // Yt small tables, which are already in fmr and we need to download as files in jobs. + + TFuture uploadFilesToDistributedCacheIfNeededFuture = GetUploadFilesToDistributedCacheFuture(execCtx, reduceJob, reduceLambda, filesToUpload, ytResources, fmrResources); + + return uploadFilesToDistributedCacheIfNeededFuture.Apply([=, this] (const auto& f) mutable { + f.GetValue(); + // serializing job State + TStringStream jobStateStream; + reduceJob->Save(jobStateStream); + + TReduceOperationParams reduceOperationParams{ + .Input = reduceInputTables, + .Output = reduceOutputTables, + .SerializedReduceJobState = jobStateStream.Str(), + .ReduceOperationSpec = reduceOperationSpec + }; + TStartOperationRequest reduceOperationRequest{ + .OperationType = EOperationType::Reduce, + .OperationParams = reduceOperationParams, + .SessionId = sessionId, + .IdempotencyKey = GenerateId(), + .NumRetries = 1, + .ClusterConnections = clusterConnections, + .FmrOperationSpec = execCtx->Options_.Config()->FmrOperationSpec.Get(execCtx->Cluster_), + .Files = filesToUpload, + .YtResources = ytResources, + .FmrResources = fmrResources + }; + + std::vector inputPaths, outputPaths; + std::transform(execCtx->InputTables_.begin(), execCtx->InputTables_.end(), std::back_inserter(inputPaths), [](const auto& table) { + return table.Cluster + "." + table.Name;} + ); + std::transform(execCtx->OutTables_.begin(), execCtx->OutTables_.end(), std::back_inserter(outputPaths), [execCtx](const auto& table) { + return execCtx->Cluster_ + "." + table.Path;} + ); + + YQL_CLOG(INFO, FastMapReduce) << "Starting reduce from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end()) << " to yt tables: " << JoinRange(' ', outputPaths.begin(), outputPaths.end()); + return GetRunningOperationFuture(reduceOperationRequest, sessionId); + }); + } + TFuture GetSuccessfulFmrOperationResult() { TFmrOperationResult fmrOperationResult = TFmrOperationResult(); fmrOperationResult.SetSuccess(); -- cgit v1.3