summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2026-05-06 15:06:33 +0300
committerrobot-piglet <[email protected]>2026-05-06 15:37:21 +0300
commit9ea558ee3ea55df6daa3059cd641b80c6ee29c2c (patch)
treeca58e42e2e7406ddc9afe8ab99433c4f76d39b1b
parent9d6fa2aad999a2a43c1c85369fb43bfbc0bd61a7 (diff)
Intermediate changes
commit_hash:3e37e70e0c7a87457a3fa32235a0f54a3b1dee70
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson5
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp178
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp177
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h51
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp185
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp6
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make11
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp145
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h9
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp3
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h9
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp168
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h62
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp554
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h119
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp530
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h143
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp51
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h10
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp133
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h33
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h10
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp8
-rw-r--r--yt/yql/providers/yt/fmr/process/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp57
-rw-r--r--yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h13
-rw-r--r--yt/yql/providers/yt/fmr/proto/request_options.proto42
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp102
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h12
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp20
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h49
-rw-r--r--yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/utils/comparator/ya.make5
-rw-r--r--yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp16
-rw-r--r--yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h3
-rw-r--r--yt/yql/providers/yt/fmr/utils/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h3
-rw-r--r--yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp5
-rw-r--r--yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h2
-rw-r--r--yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp5
-rw-r--r--yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h2
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make2
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp339
52 files changed, 2360 insertions, 936 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson b/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson
index 750b26ac23d..ef51e954180 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/default_operation_settings.yson
@@ -71,4 +71,9 @@
"retries_before_throw" = 5;
"timeout_sec" = 300;
};
+ "reduce" = {
+ "max_data_weight_per_part" = 204288000;
+ "max_parts" = 64;
+ "max_key_size_per_part" = 10000000;
+ }
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
index feb0219d42d..8ca30f57288 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
@@ -6,6 +6,8 @@ SRCS(
yql_yt_ordered_partitioner_ut.cpp
yql_yt_fmr_partitioner_ut.cpp
yql_yt_sorted_partitioner_ut.cpp
+ yql_yt_sorted_partitioner_base_ut.cpp
+ yql_yt_reduce_partitioner_ut.cpp
yql_yt_fmr_boundary_keys_ut.cpp
)
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp
new file mode 100644
index 00000000000..63ea2c6298c
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_reduce_partitioner_ut.cpp
@@ -0,0 +1,178 @@
+#include "yql_yt_sorted_partitioner_base_ut.h"
+
+namespace NYql::NFmr::NPartitionerTest {
+
+Y_UNIT_TEST_SUITE(ReducePartitionerTests) { // Table-driven checks of the task ranges produced for EPartitionerType::Reduce cases.
+ Y_UNIT_TEST(ReducePartitioningCases) { // Input: per table, a list of chunk (first key, last key) pairs; Expected: per task, "TAB<i>-<range>" strings.
+ const TVector<TCase> cases = {
+ { // One table, two disjoint chunks, MaxWeight = 1.
+ .Input = {
+ {{"A","B"}, {"C","D"}},
+ },
+ .Expected = {
+ {"TAB0-[A:B)"},
+ {"TAB0-[B:D]"}
+ },
+ .MaxWeight = 1,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1
+ },
+ { // One table, chunks sharing boundary keys, MaxWeight = 1000: a single task is expected.
+ .Input = {
+ {{"A","B"}, {"B","B"}, {"B","C"}, {"C","D"}},
+ },
+ .Expected = {
+ {"TAB0-[A:D]"},
+ },
+ .MaxWeight = 1000,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1
+ },
+ { // Same input as the previous case, MaxWeight = 2: two tasks are expected.
+ .Input = {
+ {{"A","B"}, {"B","B"}, {"B","C"}, {"C","D"}},
+ },
+ .Expected = {
+ {"TAB0-[A:B)"},
+ {"TAB0-[B:D]"}
+ },
+ .MaxWeight = 2,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1
+ },
+ { // Two chunks consisting only of key B, MaxWeight = 2: an error is expected instead of tasks.
+ .Input = {
+ {{"A","B"}, {"B","B"}, {"B", "B"}, {"B","C"}, {"C","D"}},
+ },
+ .MaxWeight = 2,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1,
+ .ExpectedError = "cannot partition"
+ },
+ { // Two tables; the second one is a single chunk spanning A..D.
+ .Input = {
+ {{"A","B"}, {"B","C"}, {"C","D"}},
+ {{"A","D"}},
+ },
+ .Expected = {
+ {"TAB0-[A:B)", "TAB1-[A:B)"},
+ {"TAB0-[B:C)", "TAB1-[B:C)"},
+ {"TAB0-[C:D]", "TAB1-[C:D]"},
+ },
+ .MaxWeight = 1,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1
+ },
+ { // Two tables, MaxWeight = 1 and MaxKeySizePerPart = 1: an error is expected instead of tasks.
+ .Input = {
+ {{"A","B"}, {"B","B"}, {"B","B"}, {"B","C"}},
+ {{"A","D"}},
+ },
+ .MaxWeight = 1,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1,
+ .ExpectedError = "cannot partition"
+ },
+ { // Same input as the previous case with MaxWeight = 4 and MaxKeySizePerPart = 2: tasks are expected.
+ .Input = {
+ {{"A","B"}, {"B","B"}, {"B","B"}, {"B","C"}},
+ {{"A","D"}},
+ },
+ .Expected = {
+ {"TAB0-[A:C)", "TAB1-[A:C)"},
+ {"TAB0-[C:D]", "TAB1-[C:D]"},
+ },
+ .MaxWeight = 4,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 2,
+ },
+ { // Two tables whose chunk key ranges do not overlap, MaxWeight = 1000.
+ .Input = {
+ {{"A","B"}, {"B","C"}},
+ {{"X","Y"}},
+ },
+ .Expected = {
+ {"TAB0-[A:Y]", "TAB1-[A:Y]"},
+ },
+ .MaxWeight = 1000,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1
+ },
+ { // Two single-chunk tables whose key ranges do not overlap, MaxWeight = 1.
+ .Input = {
+ {{"A","B"}},
+ {{"X","Y"}},
+ },
+ .Expected = {
+ {"TAB0-[A:B)"},
+ {"TAB0-[B:Y]", "TAB1-[B:Y]"},
+ },
+ .MaxWeight = 1,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1,
+ },
+ { // Two tables with identical single chunks.
+ .Input = {
+ {{"A","B"}},
+ {{"A","B"}},
+ },
+ .Expected = {
+ {"TAB0-[A:B]", "TAB1-[A:B]"},
+ },
+ .MaxWeight = 1,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1
+ },
+ { // First table holds only key B (two chunks); MaxWeight = 1000, MaxKeySizePerPart = 3.
+ .Input = {
+ {{"B","B"}, {"B","B"}},
+ {{"A","D"}},
+ },
+ .Expected = {
+ {"TAB0-[A:D]", "TAB1-[A:D]"},
+ },
+ .MaxWeight = 1000,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 3
+ },
+ { // Same input as the previous case with MaxWeight = 2 and MaxKeySizePerPart = 2.
+ .Input = {
+ {{"B","B"}, {"B","B"}},
+ {{"A","D"}},
+ },
+ .Expected = {
+ {"TAB0-[A:B)", "TAB1-[A:B)"},
+ {"TAB0-[B:D]", "TAB1-[B:D]"},
+ },
+ .MaxWeight = 2,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 2
+ },
+ { // Five tables, each ending at a later key than the previous one, MaxWeight = 1.
+ .Input = {
+ {{"A","B"}, {"B","C"}},
+ {{"A","C"}, {"C","D"}},
+ {{"A","D"}, {"D","E"}},
+ {{"A","E"}, {"E","F"}, {"F","G"}},
+ {{"A","F"}, {"F","G"}, {"G","H"}},
+ },
+ .Expected = {
+ {"TAB0-[A:B)", "TAB1-[A:B)", "TAB2-[A:B)", "TAB3-[A:B)", "TAB4-[A:B)"},
+ {"TAB0-[B:C)", "TAB1-[B:C)", "TAB2-[B:C)", "TAB3-[B:C)", "TAB4-[B:C)"},
+ {"TAB0-[C:D)", "TAB1-[C:D)", "TAB2-[C:D)", "TAB3-[C:D)", "TAB4-[C:D)"},
+ {"TAB1-[D:E)", "TAB2-[D:E)", "TAB3-[D:E)", "TAB4-[D:E)"},
+ {"TAB2-[E:F)", "TAB3-[E:F)", "TAB4-[E:F)"},
+ {"TAB3-[F:G)", "TAB4-[F:G)"},
+ {"TAB3-[G:H]", "TAB4-[G:H]"}
+ },
+ .MaxWeight = 1,
+ .PartitionerType = EPartitionerType::Reduce,
+ .MaxKeySizePerPart = 1
+ },
+ };
+
+ CheckPartitionCorrectness(cases); // Shared driver: builds the partitioner for each case and compares rendered tasks.
+ }
+}
+
+} // namespace NYql::NFmr::NPartitionerTest
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp
new file mode 100644
index 00000000000..583766a7de7
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.cpp
@@ -0,0 +1,177 @@
+#include "yql_yt_sorted_partitioner_base_ut.h"
+
+namespace NYql::NFmr::NPartitionerTest {
+
+// Builds a single-column key row {"k": v} with an integer value.
+NYT::TNode Key(ui64 v) {
+    NYT::TNode m = NYT::TNode::CreateMap();
+    m["k"] = static_cast<i64>(v);
+    return m;
+}
+
+// Builds a single-column key row {"k": v} with a string value.
+NYT::TNode Key(const TString& v) {
+    NYT::TNode m = NYT::TNode::CreateMap();
+    m["k"] = v;
+    return m;
+}
+
+namespace {
+
+// Shared body of both MakeSortedChunk overloads: a sorted one-row chunk of the
+// given data weight whose boundary keys are Key(firstK) and Key(lastK).
+template <typename TKey>
+TChunkStats MakeSortedChunkImpl(ui64 weight, const TKey& firstK, const TKey& lastK) {
+    TSortedChunkStats s;
+    s.IsSorted = true;
+    s.FirstRowKeys = Key(firstK);
+    s.LastRowKeys = Key(lastK);
+    return TChunkStats{.Rows = 1, .DataWeight = weight, .SortedChunkStats = s};
+}
+
+} // namespace
+
+// Sorted chunk statistics with integer boundary keys.
+TChunkStats MakeSortedChunk(ui64 weight, ui64 firstK, ui64 lastK) {
+    return MakeSortedChunkImpl(weight, firstK, lastK);
+}
+
+// Sorted chunk statistics with string boundary keys.
+TChunkStats MakeSortedChunk(ui64 weight, const TString& firstK, const TString& lastK) {
+    return MakeSortedChunkImpl(weight, firstK, lastK);
+}
+
+// Locates the FMR input of `task` that refers to `tableId` and asserts its
+// number of table ranges and the presence of its lower/upper boundary keys.
+// Fails the test when the task has no input for that table.
+void AssertTaskHasRangesForTable(
+    const TTaskTableInputRef& task,
+    const TString& tableId,
+    size_t expectedRangesCount,
+    bool expectLowerKey,
+    bool expectUpperKey
+) {
+    for (const auto& taskInput : task.Inputs) {
+        auto* candidate = std::get_if<TFmrTableInputRef>(&taskInput);
+        if (!candidate) {
+            continue;
+        }
+        if (!(candidate->TableId == tableId)) {
+            continue;
+        }
+
+        UNIT_ASSERT_VALUES_EQUAL(candidate->TableRanges.size(), expectedRangesCount);
+        UNIT_ASSERT_VALUES_EQUAL(candidate->FirstRowKeys.Defined(), expectLowerKey);
+        UNIT_ASSERT_VALUES_EQUAL(candidate->LastRowKeys.Defined(), expectUpperKey);
+        if (expectLowerKey) {
+            // A lower key without an inclusivity flag would be ambiguous.
+            UNIT_ASSERT_C(candidate->IsFirstRowInclusive.Defined(), "IsFirstRowInclusive must be set when FirstRowKeys is set");
+        }
+        return;
+    }
+    UNIT_FAIL("Table not found in task inputs: " + tableId);
+}
+
+// Parses a YSON-encoded row and returns the string stored under its "k" key.
+TString ExtractKeyValueK(const TString& ysonRow) {
+    const NYT::TNode parsedRow = NYT::NodeFromYsonString(ysonRow);
+    const NYT::TNode& keyNode = parsedRow["k"];
+    return keyNode.AsString();
+}
+
+// Renders every task as a list of "TAB<idx>-<open><first>:<last><close>" strings,
+// one per table (ids "c.t0" .. "c.t<tablesCount-1>") that the task reads.
+// '[' / ']' mark inclusive bounds and '(' / ')' exclusive ones; a bound whose
+// inclusivity flag is unset is rendered as inclusive, and a missing key as "".
+TVector<TVector<TString>> NormalizeTasksAsProtoStrings(const std::vector<TTaskTableInputRef>& tasks, size_t tablesCount) {
+    // Returns the first FMR input of `task` that refers to `wantedId`, or nullptr.
+    const auto findFmrInput = [](const TTaskTableInputRef& task, const TString& wantedId) -> const TFmrTableInputRef* {
+        for (const auto& input : task.Inputs) {
+            const auto* fmr = std::get_if<TFmrTableInputRef>(&input);
+            if (fmr && fmr->TableId == wantedId) {
+                return fmr;
+            }
+        }
+        return nullptr;
+    };
+
+    TVector<TVector<TString>> normalized;
+    normalized.reserve(tasks.size());
+
+    for (const auto& task : tasks) {
+        TVector<TString> rendered;
+        rendered.reserve(tablesCount);
+
+        for (size_t tableIdx = 0; tableIdx < tablesCount; ++tableIdx) {
+            const TString tableId = TStringBuilder() << "c.t" << tableIdx;
+            const TFmrTableInputRef* ref = findFmrInput(task, tableId);
+            if (ref == nullptr) {
+                // The task does not read this table at all.
+                continue;
+            }
+
+            const bool leftInclusive = !ref->IsFirstRowInclusive.Defined() || *ref->IsFirstRowInclusive;
+            const bool rightInclusive = !ref->IsLastRowInclusive.Defined() || *ref->IsLastRowInclusive;
+
+            TString left;
+            if (ref->FirstRowKeys) {
+                left = ExtractKeyValueK(*ref->FirstRowKeys);
+            }
+            TString right;
+            if (ref->LastRowKeys) {
+                right = ExtractKeyValueK(*ref->LastRowKeys);
+            }
+
+            const char* const openBracket = leftInclusive ? "[" : "(";
+            const char* const closeBracket = rightInclusive ? "]" : ")";
+
+            TStringBuilder label;
+            label << "TAB" << tableIdx << "-" << openBracket << left << ":" << right << closeBracket;
+            rendered.push_back(label);
+        }
+
+        normalized.push_back(std::move(rendered));
+    }
+
+    return normalized;
+}
+
+// Formats normalized tasks as "[[a, b], [c]]" so that two task lists can be
+// compared, and reported on mismatch, as plain strings.
+TString DumpTasks(const TVector<TVector<TString>>& tasks) {
+    TStringBuilder dump;
+    dump << "[";
+    bool firstTask = true;
+    for (const auto& task : tasks) {
+        if (!firstTask) {
+            dump << ", ";
+        }
+        firstTask = false;
+
+        dump << "[";
+        bool firstPart = true;
+        for (const auto& part : task) {
+            if (!firstPart) {
+                dump << ", ";
+            }
+            firstPart = false;
+            dump << part;
+        }
+        dump << "]";
+    }
+    dump << "]";
+    return dump;
+}
+
+// Runs every table-driven case through the partitioner selected by
+// TCase::PartitionerType and verifies the outcome.
+//
+// For each case, table i gets the id ("c", "t<i>") and a single part "p<i>"
+// whose chunks are sorted, have data weight 1 and carry the boundary keys
+// listed in TCase::Input.
+//
+// A case with a non-empty ExpectedError must fail with an error message
+// containing that substring. Any other case must succeed, and its tasks,
+// rendered with NormalizeTasksAsProtoStrings, must equal TCase::Expected.
+// Every assertion message carries the index of the offending case.
+void CheckPartitionCorrectness(const std::vector<TCase>& cases) {
+    for (size_t caseIdx = 0; caseIdx < cases.size(); ++caseIdx) {
+        const auto& c = cases[caseIdx];
+        const TString caseTag = TStringBuilder() << "caseIdx=" << caseIdx;
+
+        std::unordered_map<TFmrTableId, std::vector<TString>> partIdsForTables;
+        std::unordered_map<TString, std::vector<TChunkStats>> partIdStats;
+        TVector<TOperationTableRef> inputTables;
+        inputTables.reserve(c.Input.size());
+
+        for (size_t tableIdx = 0; tableIdx < c.Input.size(); ++tableIdx) {
+            TFmrTableId tableId("c", TStringBuilder() << "t" << tableIdx);
+            TFmrTableRef ref{.FmrTableId = tableId};
+            inputTables.push_back(ref);
+
+            const TString partId = TStringBuilder() << "p" << tableIdx;
+            partIdsForTables[tableId] = {partId};
+
+            auto& stats = partIdStats[partId];
+            stats.reserve(c.Input[tableIdx].size());
+            for (const auto& interval : c.Input[tableIdx]) {
+                stats.push_back(MakeSortedChunk(1, interval.first, interval.second));
+            }
+        }
+
+        TFmrPartitionerSettings fmrPartitionSettings {.MaxDataWeightPerPart = c.MaxWeight, .MaxParts = 1000};
+        TSortingColumns keyColumns{.Columns = {"k"}, .SortOrders = {ESortOrder::Ascending}};
+
+        TSortedPartitionerBase::TPtr partitioner;
+        if (c.PartitionerType == EPartitionerType::Sort) {
+            TSortedPartitionSettings settings;
+            settings.FmrPartitionSettings = fmrPartitionSettings;
+            partitioner = MakeIntrusive<TSortedPartitioner>(partIdsForTables, partIdStats, keyColumns, settings);
+        } else {
+            TReducePartitionSettings settings;
+            settings.FmrPartitionSettings = fmrPartitionSettings;
+            settings.MaxKeySizePerPart = c.MaxKeySizePerPart;
+            partitioner = MakeIntrusive<TReducePartitioner>(partIdsForTables, partIdStats, keyColumns, settings);
+        }
+
+        auto [tasks, error] = partitioner->PartitionTablesIntoTasks(inputTables);
+
+        if (!c.ExpectedError.empty()) {
+            // Negative case: a silent success must be reported as a failure too.
+            UNIT_ASSERT_C(error, caseTag << ": expected an error containing '" << c.ExpectedError << "', but partitioning succeeded");
+            UNIT_ASSERT_C(
+                error->ErrorMessage.Contains(c.ExpectedError),
+                caseTag << ": expected an error containing '" << c.ExpectedError << "', got: " << error->ErrorMessage
+            );
+            continue;
+        }
+
+        // The message expression is only evaluated when the assertion fails, i.e. when `error` is set.
+        UNIT_ASSERT_C(!error, caseTag << ": partitioner failed with unexpected error: " << error->ErrorMessage);
+
+        const auto actual = NormalizeTasksAsProtoStrings(tasks, c.Input.size());
+        UNIT_ASSERT_VALUES_EQUAL_C(
+            DumpTasks(actual),
+            DumpTasks(c.Expected),
+            caseTag
+        );
+    }
+}
+
+} // namespace NYql::NFmr::NPartitionerTest
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h
new file mode 100644
index 00000000000..1f96f3e253d
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_base_ut.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h>
+#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h>
+#include <yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.h>
+
+#include <util/string/builder.h>
+
+namespace NYql::NFmr::NPartitionerTest {
+
+NYT::TNode Key(ui64 v);
+
+TChunkStats MakeSortedChunk(ui64 weight, ui64 firstK, ui64 lastK);
+
+NYT::TNode Key(const TString& v);
+
+TChunkStats MakeSortedChunk(ui64 weight, const TString& firstK, const TString& lastK);
+
+void AssertTaskHasRangesForTable(
+ const TTaskTableInputRef& task,
+ const TString& tableId,
+ size_t expectedRangesCount,
+ bool expectLowerKey,
+ bool expectUpperKey
+);
+
+TString ExtractKeyValueK(const TString& ysonRow);
+
+TVector<TVector<TString>> NormalizeTasksAsProtoStrings(const std::vector<TTaskTableInputRef>& tasks, size_t tablesCount);
+
+TString DumpTasks(const TVector<TVector<TString>>& tasks);
+
+// Selects which partitioner implementation a table-driven test case exercises.
+// Scoped so that the short enumerator names do not leak into the enclosing
+// namespace of every test file that includes this header.
+enum class EPartitionerType {
+    Sort,   // case is run through TSortedPartitioner
+    Reduce  // case is run through TReducePartitioner
+};
+
+struct TCase { // One table-driven partitioning scenario consumed by CheckPartitionCorrectness.
+ TVector<TVector<std::pair<TString, TString>>> Input; // Per input table: (first key, last key) of each chunk.
+ TVector<TVector<TString>> Expected; // Per expected task: strings in the NormalizeTasksAsProtoStrings format.
+ ui64 MaxWeight = 1; // Passed to the partitioner as MaxDataWeightPerPart.
+ EPartitionerType PartitionerType = EPartitionerType::Sort; // Which partitioner to build for this case.
+ ui64 MaxKeySizePerPart = 1; // Forwarded to the partitioner settings only for EPartitionerType::Reduce.
+ TString ExpectedError; // When the partitioner returns an error, its message must contain this substring.
+};
+
+void CheckPartitionCorrectness(const std::vector<TCase>& cases);
+
+} // namespace NYql::NFmr::NPartitionerTest
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp
index 07b95d19cbf..a5a08201f3e 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_sorted_partitioner_ut.cpp
@@ -1,132 +1,6 @@
-#include <library/cpp/testing/unittest/registar.h>
+#include "yql_yt_sorted_partitioner_base_ut.h"
-#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h>
-#include <yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.h>
-
-#include <util/string/builder.h>
-
-namespace NYql::NFmr {
-
-namespace {
-
-NYT::TNode Key(ui64 v) {
- NYT::TNode m = NYT::TNode::CreateMap();
- m["k"] = static_cast<i64>(v);
- return m;
-}
-
-TChunkStats MakeSortedChunk(ui64 weight, ui64 firstK, ui64 lastK) {
- TSortedChunkStats s;
- s.IsSorted = true;
- s.FirstRowKeys = Key(firstK);
- s.LastRowKeys = Key(lastK);
- return TChunkStats{.Rows = 1, .DataWeight = weight, .SortedChunkStats = s};
-}
-
-NYT::TNode Key(const TString& v) {
- NYT::TNode m = NYT::TNode::CreateMap();
- m["k"] = v;
- return m;
-}
-
-TChunkStats MakeSortedChunk(ui64 weight, const TString& firstK, const TString& lastK) {
- TSortedChunkStats s;
- s.IsSorted = true;
- s.FirstRowKeys = Key(firstK);
- s.LastRowKeys = Key(lastK);
- return TChunkStats{.Rows = 1, .DataWeight = weight, .SortedChunkStats = s};
-}
-
-void AssertTaskHasRangesForTable(
- const TTaskTableInputRef& task,
- const TString& tableId,
- size_t expectedRangesCount,
- bool expectLowerKey,
- bool expectUpperKey
-) {
- for (const auto& input : task.Inputs) {
- if (auto* fmrInput = std::get_if<TFmrTableInputRef>(&input)) {
- if (fmrInput->TableId == tableId) {
- UNIT_ASSERT_VALUES_EQUAL(fmrInput->TableRanges.size(), expectedRangesCount);
- UNIT_ASSERT_VALUES_EQUAL(fmrInput->FirstRowKeys.Defined(), expectLowerKey);
- UNIT_ASSERT_VALUES_EQUAL(fmrInput->LastRowKeys.Defined(), expectUpperKey);
- if (expectLowerKey) {
- UNIT_ASSERT_C(fmrInput->IsFirstRowInclusive.Defined(), "IsFirstRowInclusive must be set when FirstRowKeys is set");
- }
- return;
- }
- }
- }
- UNIT_FAIL("Table not found in task inputs: " + tableId);
-}
-
-TString ExtractKeyValueK(const TString& ysonRow) {
- const NYT::TNode n = NYT::NodeFromYsonString(ysonRow);
- return n["k"].AsString();
-}
-
-TVector<TVector<TString>> NormalizeTasksAsProtoStrings(const std::vector<TTaskTableInputRef>& tasks, size_t tablesCount) {
- TVector<TVector<TString>> out;
- out.reserve(tasks.size());
-
- for (const auto& task : tasks) {
- TVector<TString> parts;
- parts.reserve(tablesCount);
-
- for (size_t tableIdx = 0; tableIdx < tablesCount; ++tableIdx) {
- const TString tableId = TStringBuilder() << "c.t" << tableIdx;
-
- const TFmrTableInputRef* ref = nullptr;
- for (const auto& input : task.Inputs) {
- const auto* fmr = std::get_if<TFmrTableInputRef>(&input);
- if (!fmr) {
- continue;
- }
- if (fmr->TableId == tableId) {
- ref = fmr;
- break;
- }
- }
- if (!ref) {
- continue;
- }
-
- const bool inclusive = ref->IsFirstRowInclusive.Defined() ? *ref->IsFirstRowInclusive : true;
- const TString left = ref->FirstRowKeys ? ExtractKeyValueK(*ref->FirstRowKeys) : TString();
- const TString right = ref->LastRowKeys ? ExtractKeyValueK(*ref->LastRowKeys) : TString();
-
- TStringBuilder s;
- s << "TAB" << tableIdx << "-" << (inclusive ? "[" : "(") << left << ":" << right << "]";
- parts.push_back(s);
- }
-
- out.push_back(std::move(parts));
- }
-
- return out;
-}
-
-TString DumpTasks(const TVector<TVector<TString>>& tasks) {
- TStringBuilder b;
- b << "[";
- for (size_t i = 0; i < tasks.size(); ++i) {
- if (i) {
- b << ", ";
- }
- b << "[";
- for (size_t j = 0; j < tasks[i].size(); ++j) {
- if (j) {
- b << ", ";
- }
- b << tasks[i][j];
- }
- b << "]";
- }
- b << "]";
- return b;
-}
-
-} // namespace
+namespace NYql::NFmr::NPartitionerTest {
Y_UNIT_TEST_SUITE(SortedPartitionerTests) {
Y_UNIT_TEST(SplitsIntoKeyAlignedTasksAndSetsBounds) {
@@ -151,7 +25,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) {
TSortingColumns keyColumns{.Columns = {"k"}, .SortOrders = {ESortOrder::Ascending}};
TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings);
- auto [tasks, error] = partitioner.PartitionTablesIntoTasksSorted({r1, r2});
+ auto [tasks, error] = partitioner.PartitionTablesIntoTasks({r1, r2});
UNIT_ASSERT(!error);
UNIT_ASSERT(!tasks.empty());
@@ -177,7 +51,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) {
TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings);
UNIT_ASSERT_EXCEPTION_CONTAINS(
- partitioner.PartitionTablesIntoTasksSorted({r1}),
+ partitioner.PartitionTablesIntoTasks({r1}),
yexception,
"at least one partition");
}
@@ -199,7 +73,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) {
TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings);
UNIT_ASSERT_EXCEPTION_CONTAINS(
- partitioner.PartitionTablesIntoTasksSorted({r1}),
+ partitioner.PartitionTablesIntoTasks({r1}),
yexception,
"at least one chunk");
}
@@ -235,7 +109,7 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) {
TSortingColumns sortingColumns{.Columns = keyColumns, .SortOrders = {ESortOrder::Ascending}};
TSortedPartitioner partitioner(partIdsForTables, partIdStats, sortingColumns, settings);
- auto [tasks, error] = partitioner.PartitionTablesIntoTasksSorted({refTable1, refTable2});
+ auto [tasks, error] = partitioner.PartitionTablesIntoTasks({refTable1, refTable2});
UNIT_ASSERT_C(!error, "Partitioner returned non-ok");
UNIT_ASSERT_C(!tasks.empty(), "Partitioner returned no tasks");
@@ -251,12 +125,6 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) {
}
Y_UNIT_TEST(PrototypeLikePartitioningCases) {
- struct TCase {
- TVector<TVector<std::pair<TString, TString>>> Input;
- TVector<TVector<TString>> Expected;
- ui64 MaxWeight = 1;
- };
-
const TVector<TCase> cases = {
{
.Input = {
@@ -380,45 +248,8 @@ Y_UNIT_TEST_SUITE(SortedPartitionerTests) {
},
};
- for (size_t caseIdx = 0; caseIdx < cases.size(); ++caseIdx) {
- const auto& c = cases[caseIdx];
-
- std::unordered_map<TFmrTableId, std::vector<TString>> partIdsForTables;
- std::unordered_map<TString, std::vector<TChunkStats>> partIdStats;
- TVector<TOperationTableRef> inputTables;
- inputTables.reserve(c.Input.size());
-
- for (size_t tableIdx = 0; tableIdx < c.Input.size(); ++tableIdx) {
- TFmrTableId tableId("c", TStringBuilder() << "t" << tableIdx);
- TFmrTableRef ref{.FmrTableId = tableId};
- inputTables.push_back(ref);
-
- const TString partId = TStringBuilder() << "p" << tableIdx;
- partIdsForTables[tableId] = {partId};
-
- auto& stats = partIdStats[partId];
- stats.reserve(c.Input[tableIdx].size());
- for (const auto& interval : c.Input[tableIdx]) {
- stats.push_back(MakeSortedChunk(1, interval.first, interval.second));
- }
- }
-
- TSortedPartitionSettings settings;
- settings.FmrPartitionSettings = {.MaxDataWeightPerPart = c.MaxWeight, .MaxParts = 1000};
- TSortingColumns keyColumns{.Columns = {"k"}, .SortOrders = {ESortOrder::Ascending}};
-
- TSortedPartitioner partitioner(partIdsForTables, partIdStats, keyColumns, settings);
- auto [tasks, error] = partitioner.PartitionTablesIntoTasksSorted(inputTables);
- UNIT_ASSERT_C(!error, "Partitioner returned non-ok");
-
- const auto actual = NormalizeTasksAsProtoStrings(tasks, c.Input.size());
- UNIT_ASSERT_VALUES_EQUAL_C(
- DumpTasks(actual),
- DumpTasks(c.Expected),
- TStringBuilder() << "caseIdx=" << caseIdx
- );
- }
+ CheckPartitionCorrectness(cases);
}
}
-} // namespace NYql::NFmr
+} // namespace NYql::NFmr::NPartitionerTest
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp
index f35c76b4dc3..80bfe0894d0 100644
--- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.cpp
@@ -24,7 +24,7 @@ public:
const auto& partIdStats = context.PartIdStats;
auto ytCoordinatorService = context.YtCoordinatorService;
- if (operationParams.IsOrdered) {
+ if (operationParams.MapJobType == EFmrJobType::OrderedMap) {
// Ordered map -> ordered partition
auto orderedPartitionerSettings = GetOrderedPartitionerSettings(fmrOperationSpec);
auto orderedPartitioner = TOrderedPartitioner(partIdsForTables, partIdStats, orderedPartitionerSettings);
@@ -56,7 +56,7 @@ public:
) override {
const auto& mapOperationParams = std::get<TMapOperationParams>(context.OperationParams);
- if (mapOperationParams.IsOrdered) {
+ if (mapOperationParams.MapJobType == EFmrJobType::OrderedMap) {
YQL_CLOG(INFO, FastMapReduce) << "Starting Ordered Map operation";
} else {
YQL_CLOG(INFO, FastMapReduce) << "Starting Map operation";
@@ -80,7 +80,7 @@ public:
mapTaskParams.Output = fmrTableOutputRefs;
mapTaskParams.SerializedMapJobState = mapOperationParams.SerializedMapJobState;
- mapTaskParams.IsOrdered = mapOperationParams.IsOrdered;
+ mapTaskParams.MapJobType = mapOperationParams.MapJobType;
generatedTasks.push_back(TGeneratedTaskInfo{
.TaskType = ETaskType::Map,
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make
new file mode 100644
index 00000000000..53d3ad22d36
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_reduce_stage_operation_manager.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/base
+)
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp
new file mode 100644
index 00000000000..70922eb6747
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.cpp
@@ -0,0 +1,145 @@
+
+#include "yql_yt_reduce_stage_operation_manager.h"
+
+#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h>
+#include <yql/essentials/utils/yql_panic.h>
+#include <yql/essentials/utils/log/log.h>
+
+namespace NYql::NFmr {
+
+namespace {
+
+// Stage operation manager for Reduce operations: partitions the input tables
+// with TReducePartitioner and turns every resulting partition into one Reduce task.
+class TReduceStageOperationManager: public TFmrStageOperationManagerBase {
+public:
+    explicit TReduceStageOperationManager(TIntrusivePtr<IRandomProvider> randomProvider)
+        : TFmrStageOperationManagerBase(randomProvider)
+    {
+    }
+
+    // Splits the operation input into per-task inputs using the ReduceBy columns
+    // as partitioning keys. The SortBy columns are not consulted at this stage.
+    TPartitionResult PartitionOperationImpl(const TPrepareOperationStageContext& context) final {
+        const auto& operationParams = std::get<TReduceOperationParams>(context.OperationParams);
+        const auto& fmrOperationSpec = context.FmrOperationSpec;
+        const auto& partIdsForTables = context.PartIdsForTables;
+        const auto& partIdStats = context.PartIdStats;
+
+        TSortingColumns reduceBy = operationParams.ReduceOperationSpec.ReduceBy;
+
+        auto reducePartitionSettings = GetReducePartitionSettings(fmrOperationSpec);
+        auto reducePartitioner = TReducePartitioner(partIdsForTables, partIdStats, reduceBy, reducePartitionSettings);
+
+        std::vector<TOperationTableRef> inputTables = operationParams.Input;
+
+        return reducePartitioner.PartitionTablesIntoTasks(inputTables);
+    }
+
+    // Creates one Reduce task per partition. All output tables of a task share a
+    // single freshly generated part id, which is also recorded in PartIdsToUpdate
+    // for every output table.
+    TGenerateTasksResult GenerateTasksImpl(const TGenerateTasksContext& context) final {
+        const auto& reduceOperationParams = std::get<TReduceOperationParams>(context.OperationParams);
+
+        YQL_CLOG(INFO, FastMapReduce) << "Starting Reduce operation";
+
+        TGenerateTasksResult result;
+        std::vector<TGeneratedTaskInfo> generatedTasks;
+        generatedTasks.reserve(context.PartitionResult.TaskInputs.size());
+
+        for (const auto& task: context.PartitionResult.TaskInputs) {
+            std::vector<TFmrTableOutputRef> fmrTableOutputRefs;
+            fmrTableOutputRefs.reserve(reduceOperationParams.Output.size());
+            for (const TFmrTableRef& fmrTableRef: reduceOperationParams.Output) {
+                fmrTableOutputRefs.push_back(TFmrTableOutputRef(fmrTableRef));
+            }
+
+            const TString newPartId = GenerateId();
+            for (auto& fmrTableOutputRef: fmrTableOutputRefs) {
+                fmrTableOutputRef.PartId = newPartId;
+                result.PartIdsToUpdate[fmrTableOutputRef.TableId].emplace_back(newPartId);
+            }
+
+            TReduceTaskParams reduceTaskParams;
+            reduceTaskParams.Input = task;
+            reduceTaskParams.Output = std::move(fmrTableOutputRefs);
+            reduceTaskParams.SerializedReduceJobState = reduceOperationParams.SerializedReduceJobState;
+            reduceTaskParams.ReduceOperationSpec = reduceOperationParams.ReduceOperationSpec;
+
+            generatedTasks.push_back(TGeneratedTaskInfo{
+                .TaskType = ETaskType::Reduce,
+                .TaskParams = std::move(reduceTaskParams)
+            });
+        }
+        result.Tasks = std::move(generatedTasks);
+
+        return result;
+    }
+
+    // Validates the output part ids of a Reduce task. The task is left untouched
+    // and a default-constructed result is returned on success; a RestartQuery
+    // error is returned when an output has no part id, or when its table or part
+    // id is not present in context.PartIdsForTables.
+    TGetNewPartIdsForTaskResult GetNewPartIdsForTask(const TGetNewPartIdsForTaskContext& context) final {
+        const auto& reduceTaskParams = std::get<TReduceTaskParams>(context.Task->TaskParams);
+
+        // All failures differ only in the message text.
+        const auto makeError = [&context](const TString& message) {
+            return TGetNewPartIdsForTaskResult{.Error = TFmrError{
+                .Component = EFmrComponent::Coordinator,
+                .Reason = EFmrErrorReason::RestartQuery,
+                .ErrorMessage = message,
+                .TaskId = context.TaskId,
+                .OperationId = context.OperationId
+            }};
+        };
+
+        for (const auto& fmrTableOutputRef: reduceTaskParams.Output) {
+            if (fmrTableOutputRef.PartId.empty()) {
+                return makeError("Reduce task has empty output PartId");
+            }
+            const TString& tableId = fmrTableOutputRef.TableId;
+            const TString& partId = fmrTableOutputRef.PartId;
+
+            const auto partIdsIter = context.PartIdsForTables.find(tableId);
+            if (partIdsIter == context.PartIdsForTables.end()) {
+                return makeError("Reduce task output PartId is missing in coordinator part list");
+            }
+
+            const auto& partIds = partIdsIter->second;
+            if (std::find(partIds.begin(), partIds.end(), partId) == partIds.end()) {
+                return makeError("Reduce task output PartId is missing in coordinator part list");
+            }
+        }
+
+        return TGetNewPartIdsForTaskResult{};
+    }
+
+    // Lists the (table id, part id) pairs of the task outputs whose part id is
+    // non-empty and present in context.PartIdStats.
+    std::vector<TPartIdInfo> GetPartIdsForTask(const GetPartIdsForTaskContext& context) final {
+        std::vector<TPartIdInfo> groupsToClear;
+        const auto& reduceTaskParams = std::get<TReduceTaskParams>(context.Task->TaskParams);
+        for (const auto& fmrTableOutputRef: reduceTaskParams.Output) {
+            const TString& tableId = fmrTableOutputRef.TableId;
+            const TString& partId = fmrTableOutputRef.PartId;
+            if (!partId.empty() && context.PartIdStats.contains(partId)) {
+                groupsToClear.emplace_back(tableId, partId);
+            }
+        }
+        return groupsToClear;
+    }
+
+    // Returns the FmrTableId.Id of every output table of the Reduce operation.
+    std::vector<TString> GetExpectedOutputTableIds(const TOperationParams& params) const override {
+        const auto& reduceParams = std::get<TReduceOperationParams>(params);
+        std::vector<TString> ids;
+        ids.reserve(reduceParams.Output.size());
+        for (const auto& output : reduceParams.Output) {
+            ids.emplace_back(output.FmrTableId.Id);
+        }
+        return ids;
+    }
+};
+
+} // namespace
+
+// Factory for the Reduce stage operation manager; the concrete class stays private to this file.
+IFmrStageOperationManager::TPtr MakeReduceStageOperationManager(TIntrusivePtr<IRandomProvider> randomProvider) {
+    IFmrStageOperationManager::TPtr manager = MakeIntrusive<TReduceStageOperationManager>(randomProvider);
+    return manager;
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h
new file mode 100644
index 00000000000..b4055192897
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/base/yql_yt_base_stage_operation_manager.h>
+
+namespace NYql::NFmr {
+
+IFmrStageOperationManager::TPtr MakeReduceStageOperationManager(TIntrusivePtr<IRandomProvider> randomProvider); // Creates the stage operation manager that handles reduce operations.
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp
index 280225a7527..1d8c3c3a85d 100644
--- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.cpp
@@ -271,7 +271,7 @@ private:
auto sortedPartitioner = TSortedPartitioner(partIdsForTables, partIdStats, sortingColumns, sortedPartitionerSettings);
- return PartitionInputTablesIntoTasksSorted(inputTables, sortedPartitioner);
+ return sortedPartitioner.PartitionTablesIntoTasks(inputTables);
}
TPartitionResult DoUnorderedPartition(const TPrepareOperationStageContext& context) {
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp
index dd1b0c8ab1f..bf53e953e0f 100644
--- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.cpp
@@ -31,7 +31,7 @@ public:
auto sortedPartitioner = TSortedPartitioner(partIdsForTables, partIdStats, sortingColumns, sortedPartitionerSettings);
std::vector<TOperationTableRef> inputTables = operationParams.Input;
- return PartitionInputTablesIntoTasksSorted(inputTables, sortedPartitioner);
+ return sortedPartitioner.PartitionTablesIntoTasks(inputTables);
}
TGenerateTasksResult GenerateTasksImpl(const TGenerateTasksContext& context) final {
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make
index cdfe66eb89c..0538e9e90dd 100644
--- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/ya.make
@@ -12,6 +12,7 @@ PEERDIR(
yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/merge
yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge
yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map
+ yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce
yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_upload
yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort
yql/essentials/utils
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp
index 8a95c23e4c5..b6a0af0537c 100644
--- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_default_stage_operation_manager.cpp
@@ -6,6 +6,7 @@
#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/merge/yql_yt_merge_stage_operation_manager.h>
#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_merge/yql_yt_sorted_merge_stage_operation_manager.h>
#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/map/yql_yt_map_stage_operation_manager.h>
+#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/reduce/yql_yt_reduce_stage_operation_manager.h>
#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sorted_upload/yql_yt_sorted_upload_stage_operation_manager.h>
#include <yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/sort/yql_yt_sort_stage_operation_manager.h>
@@ -29,6 +30,8 @@ IFmrStageOperationManager::TPtr MakeStageOperationManager(EOperationType operati
return MakeSortedUploadStageOperationManager(randomProvider);
case EOperationType::Sort:
return MakeSortStageOperationManager(randomProvider);
+ case EOperationType::Reduce:
+ return MakeReduceStageOperationManager(randomProvider);
default:
ythrow yexception() << "Unknown operation type for stage operation manager";
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h
index 5fcb6358529..598a24649ca 100644
--- a/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h
+++ b/yt/yql/providers/yt/fmr/coordinator/operation_manager/impl/yql_yt_partition_settings_helpers.h
@@ -4,6 +4,7 @@
#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_fmr_partitioner.h>
#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_ordered_partitioner.h>
#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h>
+#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h>
#include <yql/essentials/utils/yql_panic.h>
namespace NYql::NFmr {
@@ -55,6 +56,14 @@ inline TSortedPartitionSettings GetSortedPartitionerSettings(const NYT::TNode& f
return settings;
}
+// Reads the reduce partitioner limits from the "reduce" section of the FMR operation spec.
+inline TReducePartitionSettings GetReducePartitionSettings(const NYT::TNode& fmrOperationSpec) {
+    // All three limits live under the same "reduce" map node; look it up once.
+    const NYT::TNode& reduceSpec = fmrOperationSpec["reduce"];
+    TReducePartitionSettings result;
+    auto& commonSettings = result.FmrPartitionSettings;
+    commonSettings.MaxDataWeightPerPart = reduceSpec["max_data_weight_per_part"].AsInt64();
+    commonSettings.MaxParts = reduceSpec["max_parts"].AsInt64();
+    result.MaxKeySizePerPart = reduceSpec["max_key_size_per_part"].AsInt64();
+    return result;
+}
+
inline TFmrResourceTasksResult PartitionFmrResourcesIntoTasks(
const std::vector<TFmrResourceOperationInfo>& fmrResources,
const NYT::TNode& fmrOperationSpec,
diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make b/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make
index db85f67241e..399f68d520f 100644
--- a/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/ya.make
@@ -3,7 +3,9 @@ LIBRARY()
SRCS(
yql_yt_fmr_partitioner.cpp
yql_yt_ordered_partitioner.cpp
+ yql_yt_reduce_partitioner.cpp
yql_yt_sorted_partitioner.cpp
+ yql_yt_sorted_partitioner_base.cpp
)
PEERDIR(
diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp
new file mode 100644
index 00000000000..49dbafecd7b
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.cpp
@@ -0,0 +1,168 @@
+#include "yql_yt_reduce_partitioner.h"
+
+#include "yql_yt_sorted_partitioner_base.h"
+
+#include <yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_compare_impl.h>
+
+namespace NYql::NFmr {
+
+// Builds a reduce partitioner on top of the shared sorted-partitioner base.
+//
+// partIdsForTables, partIdStats, reduceBy and the common part of the settings are forwarded
+// to TSortedPartitionerBase; reduceBy and the full settings are also kept locally.
+// Fails if the per-key weight limit exceeds the per-part data weight limit.
+TReducePartitioner::TReducePartitioner(
+    const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+    const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+    const TSortingColumns& reduceBy,
+    const TReducePartitionSettings& settings
+)
+    : TSortedPartitionerBase(partIdsForTables, partIdStats, reduceBy, settings.FmrPartitionSettings)
+    , ReduceBy_(reduceBy)
+    , Settings_(settings)
+{
+    // Report both values so a misconfigured spec can be diagnosed from the error alone.
+    YQL_ENSURE(Settings_.MaxKeySizePerPart <= Settings_.FmrPartitionSettings.MaxDataWeightPerPart,
+        "MaxKeySizePerPart (" << Settings_.MaxKeySizePerPart
+        << ") must not exceed MaxDataWeightPerPart (" << Settings_.FmrPartitionSettings.MaxDataWeightPerPart << ")");
+}
+
+// Computes the key range a reduce task reads from the given (non-empty) slices.
+//
+// Left border: the first bound of the first slice, unless a boundary key was remembered by the
+// previous call (LeftBoundary_); in that case the range starts inclusively at that key.
+// Right border: the last bound of the last slice. If that bound is inclusive, the key is remembered
+// in LeftBoundary_ and, unless isLastRange is set, the border is made exclusive for this task.
+// Side effect: pushes onto LeftBoundaryChunks_ the chunks (deduplicated per table) whose inclusive
+// last key equals the right border.
+// Throws if the left border is not inclusive, or if the range collapses to a single key with an
+// exclusive right border.
+TFmrTableKeysRange TReducePartitioner::GetReadRangeFromSlices(const std::vector<TSlice>& slices, bool isLastRange) {
+    YQL_ENSURE(!slices.empty());
+    // The slice ranges are only read here, so bind them by reference instead of copying.
+    const auto& minSliceReadRange = slices.front().RangeForRead;
+    const auto& maxSliceReadRange = slices.back().RangeForRead;
+    YQL_ENSURE(minSliceReadRange.FirstKeysBound.Defined());
+    YQL_ENSURE(maxSliceReadRange.LastKeysBound.Defined());
+
+    auto taskRangeLeftBorder = *minSliceReadRange.FirstKeysBound;
+    bool taskRangeLeftBorderInclusive = minSliceReadRange.IsFirstBoundInclusive;
+    if (LeftBoundary_.Defined()) {
+        taskRangeLeftBorder = *LeftBoundary_;
+        taskRangeLeftBorderInclusive = true;
+    }
+
+    auto taskRangeRightBorder = *maxSliceReadRange.LastKeysBound;
+    bool taskRangeRightBorderInclusive = maxSliceReadRange.IsLastBoundInclusive;
+    LeftBoundary_ = Nothing();
+    if (maxSliceReadRange.IsLastBoundInclusive) {
+        // don't take last key if inclusive, because we need to guarantee that all reduce keys are in the same job.
+        LeftBoundary_ = *maxSliceReadRange.LastKeysBound;
+        if (!isLastRange) {
+            taskRangeRightBorderInclusive = false;
+        }
+    }
+
+    YQL_ENSURE(taskRangeLeftBorderInclusive, " task range left border should always be inclusive for reduce partitioner");
+
+    if (taskRangeLeftBorder == taskRangeRightBorder && !taskRangeRightBorderInclusive) {
+        ythrow yexception() << "Key " << taskRangeLeftBorder.Row << " takes all slice and is too large, cannot partition";
+    }
+
+    TFmrTableKeysRange taskRange{.IsEmpty = false};
+    taskRange.SetFirstKeysBound(taskRangeLeftBorder, taskRangeLeftBorderInclusive);
+    taskRange.SetLastKeysBound(taskRangeRightBorder, taskRangeRightBorderInclusive);
+
+    std::unordered_map<TString, std::vector<TChunkUnit>> chunksByTable;
+    std::unordered_map<TString, std::unordered_set<TString>> seenChunksByTable;
+
+    for (const auto& slice : slices) {
+        for (const auto& [tableId, sliceChunks] : slice.ChunksByTable) {
+            auto& out = chunksByTable[tableId];
+            auto& seen = seenChunksByTable[tableId];
+            for (const auto& chunk : sliceChunks) {
+                // Keep only chunks that end, inclusively, exactly on the right border key.
+                if (chunk.KeyRange.LastKeysBound != taskRangeRightBorder || !chunk.KeyRange.IsLastBoundInclusive) {
+                    continue;
+                }
+
+                // A chunk may appear in several slices; record it once per table.
+                TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex;
+                if (!seen.insert(chunkKey).second) {
+                    continue;
+                }
+                out.push_back(chunk);
+            }
+        }
+    }
+    // An entry is pushed on every call, even when empty; ExtendChunksPerTable pops them in order.
+    LeftBoundaryChunks_.push(chunksByTable);
+
+    return taskRange;
+}
+
+// Checks that no single key accumulates more than Settings_.MaxKeySizePerPart of slice weight.
+//
+// A left border is remembered from the first bound of a slice. While following slices end on that
+// same key, their weights are summed, starting from the weight carried over from the previous call
+// (CurrentLastKeyWeight_). When a slice ends on a different key, the sum is checked against the
+// limit, then the sum and the remembered border are reset. The sum still open at the end is checked
+// and stored back into CurrentLastKeyWeight_.
+// Throws yexception when the limit is exceeded.
+// NOTE(review): the carried-over weight is added without checking that the first slice here starts
+// on the key that ended the previous call - confirm this is intended.
+void TReducePartitioner::CheckMaxKeySizePerSlices(const std::vector<TSlice>& slices) {
+    const ui64 maxKeySizePerPart = Settings_.MaxKeySizePerPart;
+    YQL_ENSURE(!slices.empty());
+    TMaybe<TFmrTableKeysBoundary> curLeftBorder;
+    ui64 currentFilledSlicesWeight = CurrentLastKeyWeight_;
+    for (const auto& slice : slices) {
+        if (!curLeftBorder.Defined()) {
+            curLeftBorder = slice.RangeForRead.FirstKeysBound;
+        }
+        YQL_ENSURE(curLeftBorder.Defined());
+        YQL_ENSURE(slice.RangeForRead.LastKeysBound.Defined());
+        const auto& curRightBorder = *slice.RangeForRead.LastKeysBound;
+
+        if (*curLeftBorder != curRightBorder) {
+            // The slice moves past the remembered key: validate what was accumulated for it and start over.
+            if (currentFilledSlicesWeight > maxKeySizePerPart) {
+                ythrow yexception() << "Key " << curLeftBorder->Row << " weighs at least " << currentFilledSlicesWeight << " bytes which is larger than maxKeySizePerPart - " << maxKeySizePerPart << " - cannot partition";
+            }
+            curLeftBorder = Nothing();
+            currentFilledSlicesWeight = 0;
+        } else {
+            currentFilledSlicesWeight += slice.Weight;
+        }
+    }
+    // curLeftBorder is always defined here when the weight is non-zero: it is only cleared together with the weight.
+    if (currentFilledSlicesWeight > maxKeySizePerPart) {
+        ythrow yexception() << "Key " << curLeftBorder->Row << " weighs at least " << currentFilledSlicesWeight << " bytes which is larger than maxKeySizePerPart - " << maxKeySizePerPart << " - cannot partition";
+    }
+    CurrentLastKeyWeight_ = currentFilledSlicesWeight;
+}
+
+// Reduce override: forces the left bound to be inclusive and leaves the key untouched;
+// the filter boundary is not consulted.
+void TReducePartitioner::ChangeLeftKeyBoundaryIfNeeded(
+    TFmrTableKeysBoundary& /*leftKey*/,
+    bool& isLeftInclusive,
+    const TPartitionerFilterBoundary& /*filterBoundary*/
+) {
+    isLeftInclusive = true;
+}
+
+// Reduce override: unconditionally replaces the right key with the last key of the task range.
+void TReducePartitioner::ChangeRightKeyBoundaryIfNeeded(
+    TFmrTableKeysBoundary& rightKey,
+    const TFmrTableKeysBoundary& taskRangeLastKey
+) {
+    rightKey = taskRangeLastKey;
+}
+
+// Prepends to chunksByTable the boundary chunks recorded by an earlier GetReadRangeFromSlices call.
+//
+// Does nothing until LeftBoundaryChunks_ holds at least two entries. Otherwise the oldest entry is
+// popped and, per table, its chunks that are not yet present (identified by "PartId#ChunkIndex")
+// are placed in front of the chunks already listed for that table.
+void TReducePartitioner::ExtendChunksPerTable(std::unordered_map<TString, std::vector<TChunkUnit>>& chunksByTable) {
+    if (LeftBoundaryChunks_.size() < 2) {
+        return;
+    }
+
+    // Take the oldest entry out of the queue by swapping, so its contents are not copied.
+    std::unordered_map<TString, std::vector<TChunkUnit>> leftBoundaryChunks;
+    leftBoundaryChunks.swap(LeftBoundaryChunks_.front());
+    LeftBoundaryChunks_.pop();
+
+    for (const auto& [tableId, chunks] : leftBoundaryChunks) {
+        // Single lookup: inserts the boundary chunks as-is when the table has no chunks yet.
+        auto [tableIt, inserted] = chunksByTable.try_emplace(tableId, chunks);
+        if (inserted) {
+            continue;
+        }
+
+        // extending current chunks for table with all other non-intersecting elements from LeftBoundaryChunks_.
+        auto& currentChunks = tableIt->second;
+        std::unordered_set<TString> chunkKeys;
+        for (const auto& chunk : currentChunks) {
+            TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex;
+            chunkKeys.insert(chunkKey);
+        }
+
+        std::vector<TChunkUnit> mergedChunks;
+        mergedChunks.reserve(chunks.size() + currentChunks.size());
+        for (const auto& chunk : chunks) {
+            TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex;
+            if (chunkKeys.insert(chunkKey).second) {
+                mergedChunks.push_back(chunk);
+            }
+        }
+        // since all needed chunks are from left boundary, they go before the existing chunks.
+        mergedChunks.insert(mergedChunks.end(), currentChunks.begin(), currentChunks.end());
+        currentChunks.swap(mergedChunks);
+    }
+}
+
+// Validates the per-key weight limit for the slices first (may throw), then delegates
+// task input construction to CreateTaskInputFromSlicesImpl.
+TTaskTableInputRef TReducePartitioner::CreateTaskInputFromSlices(
+    const std::vector<TSlice>& slices,
+    const std::vector<TFmrTableRef>& inputTables,
+    bool isLastRange
+) {
+    CheckMaxKeySizePerSlices(slices);
+    auto taskInput = CreateTaskInputFromSlicesImpl(slices, inputTables, isLastRange);
+    return taskInput;
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h
new file mode 100644
index 00000000000..d717d4dc0de
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_reduce_partitioner.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include "yql_yt_sorted_partitioner_base.h"
+
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_sorted_writer.h>
+#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_fmr_partitioner.h>
+#include <yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h>
+#include <yql/essentials/utils/log/log.h>
+
+#include <queue>
+
+namespace NYql::NFmr {
+
+struct TReducePartitionSettings { // Limits consumed by TReducePartitioner.
+ TFmrPartitionerSettings FmrPartitionSettings; // Common limits; forwarded to TSortedPartitionerBase by the TReducePartitioner constructor.
+ ui64 MaxKeySizePerPart = 0; // Upper bound on the accumulated slice weight of a single key; must not exceed FmrPartitionSettings.MaxDataWeightPerPart.
+};
+
+// Partitioner for reduce operations, built on TSortedPartitionerBase.
+//
+// It customizes the base through the overridden hooks below: the task read range is cut so that a
+// key ending one task inclusively is carried over as the inclusive left border of the next one,
+// the chunks ending on that key are carried over as well, and the accumulated weight of a single
+// key is checked against TReducePartitionSettings::MaxKeySizePerPart.
+class TReducePartitioner: public TSortedPartitionerBase {
+public:
+    // Fails if settings.MaxKeySizePerPart exceeds settings.FmrPartitionSettings.MaxDataWeightPerPart.
+    TReducePartitioner(
+        const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+        const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+        const TSortingColumns& reduceBy,
+        const TReducePartitionSettings& settings
+    );
+
+private:
+    // Runs CheckMaxKeySizePerSlices and then delegates to CreateTaskInputFromSlicesImpl.
+    TTaskTableInputRef CreateTaskInputFromSlices(
+        const std::vector<TSlice>& slices,
+        const std::vector<TFmrTableRef>& inputTables,
+        bool isLastRange
+    ) override;
+
+    // Computes the task key range and records the boundary key and boundary chunks for the next task.
+    TFmrTableKeysRange GetReadRangeFromSlices(const std::vector<TSlice>& slices, bool isLastRange) override;
+
+    // Throws if a single key accumulates more slice weight than Settings_.MaxKeySizePerPart.
+    void CheckMaxKeySizePerSlices(const std::vector<TSlice>& slices);
+
+    // Always makes the left bound inclusive; the key and the filter boundary are ignored.
+    void ChangeLeftKeyBoundaryIfNeeded(
+        TFmrTableKeysBoundary& leftKey,
+        bool& isLeftInclusive,
+        const TPartitionerFilterBoundary& filterBoundary
+    ) override;
+
+    // Always sets the right key to the last key of the task range.
+    void ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) override;
+
+    // Prepends the boundary chunks recorded for an earlier range to the given per-table chunk lists.
+    void ExtendChunksPerTable(std::unordered_map<TString, std::vector<TChunkUnit>>& chunksByTable) override;
+
+private:
+    const TSortingColumns ReduceBy_;
+    const TReducePartitionSettings Settings_;
+    TMaybe<TFmrTableKeysBoundary> LeftBoundary_; // key right boundary with which we non-inclusively chopped previous reduce job.
+    // One entry per GetReadRangeFromSlices call: chunks ending inclusively on that range's right border.
+    std::queue<std::unordered_map<TString, std::vector<TChunkUnit>>> LeftBoundaryChunks_;
+    // Slice weight accumulated so far for the key that was still open when the last check finished.
+    ui64 CurrentLastKeyWeight_ = 0;
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp
index 2facecde3e4..78f78c0a856 100644
--- a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.cpp
@@ -6,548 +6,56 @@
namespace NYql::NFmr {
-void TSortedPartitioner::TChunkContainer::Push(TChunkUnit chunk) {
- Chunks_.push_back(std::move(chunk));
+// Task read range for sorted partitioning: from the first bound of the first slice to the last
+// bound of the last slice, keeping the inclusiveness flags of those two slices.
+TFmrTableKeysRange TSortedPartitioner::GetReadRangeFromSlices(const std::vector<TSlice>& slices, bool /*isLastRange*/) {
+    YQL_ENSURE(!slices.empty());
+    const auto& minSliceReadRange = slices.front().RangeForRead;
+    const auto& maxSliceReadRange = slices.back().RangeForRead;
+    YQL_ENSURE(minSliceReadRange.FirstKeysBound.Defined());
+    YQL_ENSURE(maxSliceReadRange.LastKeysBound.Defined());
+
+    TFmrTableKeysRange taskRange{.IsEmpty = false};
+    taskRange.SetFirstKeysBound(*minSliceReadRange.FirstKeysBound, minSliceReadRange.IsFirstBoundInclusive);
+    taskRange.SetLastKeysBound(*maxSliceReadRange.LastKeysBound, maxSliceReadRange.IsLastBoundInclusive);
+    return taskRange;
}
-bool TSortedPartitioner::TChunkContainer::IsEmpty() const {
- return Chunks_.empty();
-}
-
-const std::vector<TSortedPartitioner::TChunkUnit>& TSortedPartitioner::TChunkContainer::GetChunks() const {
- return Chunks_;
-}
-
-void TSortedPartitioner::TChunkContainer::Clear() {
- Chunks_.clear();
- KeysRange_ = TFmrTableKeysRange{.IsEmpty = true};
-}
-
-const TFmrTableKeysRange& TSortedPartitioner::TChunkContainer::GetKeysRange() const {
- return KeysRange_;
-}
-
-void TSortedPartitioner::TChunkContainer::UpdateKeyRange(const TFmrTableKeysRange& KeyRange) {
- if (KeyRange.IsEmpty) {
- return;
- }
-
- if (KeysRange_.IsEmpty) {
- KeysRange_.IsEmpty = false;
- KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive);
- KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive);
- }
-
- if (!KeysRange_.IsFirstKeySet()) {
- KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive);
- } else {
- bool isInclusive = true;
- if (*KeysRange_.FirstKeysBound < *KeyRange.FirstKeysBound) {
- isInclusive = KeysRange_.IsFirstBoundInclusive;
- } else if (*KeysRange_.FirstKeysBound > *KeyRange.FirstKeysBound) {
- isInclusive = KeyRange.IsFirstBoundInclusive;
- } else {
- isInclusive = KeysRange_.IsFirstBoundInclusive || KeyRange.IsFirstBoundInclusive;
- }
- KeysRange_.SetFirstKeysBound(std::min(*KeysRange_.FirstKeysBound, *KeyRange.FirstKeysBound), isInclusive);
- }
-
- if (!KeysRange_.IsLastKeySet()) {
- KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive);
- } else {
- bool isInclusive = true;
- if (*KeysRange_.LastKeysBound < *KeyRange.LastKeysBound) {
- isInclusive = KeysRange_.IsLastBoundInclusive;
- } else if (*KeysRange_.LastKeysBound > *KeyRange.LastKeysBound) {
- isInclusive = KeyRange.IsLastBoundInclusive;
- } else {
- isInclusive = KeysRange_.IsLastBoundInclusive && KeyRange.IsLastBoundInclusive;
- }
- KeysRange_.SetLastKeysBound(std::min(*KeysRange_.LastKeysBound, *KeyRange.LastKeysBound), isInclusive);
- }
-}
-
-TSortedPartitioner::TFmrTablesChunkPool::TFmrTablesChunkPool(
+TTaskTableInputRef TSortedPartitioner::CreateTaskInputFromSlices(
+ const std::vector<TSlice>& slices,
const std::vector<TFmrTableRef>& inputTables,
- const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
- const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
- const TSortingColumns& KeyColumns
-)
- : PartIdsForTables_(partIdsForTables)
- , PartIdStats_(partIdStats)
- , KeyColumns_(KeyColumns)
-{
- InitTableInputs(inputTables);
-}
-
-void TSortedPartitioner::TFmrTablesChunkPool::InitTableInputs(const std::vector<TFmrTableRef>& inputTables) {
- TableOrder_.clear();
- TableOrder_.reserve(inputTables.size());
-
- for (const auto& table : inputTables) {
- const TString& tableId = table.FmrTableId.Id;
- Y_ENSURE(
- PartIdsForTables_.contains(tableId),
- "No partitions metadata found for input FMR table: " << tableId);
-
- const auto& partIds = PartIdsForTables_.at(tableId);
- Y_ENSURE(
- !partIds.empty(),
- "SortedPartitioner requires at least one partition for input table: " << tableId);
- TableOrder_.push_back(tableId);
-
- std::vector<TChunkUnit> chunks;
- ui64 tableChunkCount = 0;
- for (const auto& partId : partIds) {
- Y_ENSURE(
- PartIdStats_.contains(partId),
- "No chunk stats found for partId: " << partId << " (table: " << tableId << ")");
-
- const auto& stats = PartIdStats_.at(partId);
- tableChunkCount += stats.size();
- for (ui64 chunkIdx = 0; chunkIdx < stats.size(); ++chunkIdx) {
- const auto& chunk = stats[chunkIdx];
- Y_ENSURE(chunk.SortedChunkStats.IsSorted, "Every FMR chunk inside SortedPartitioner must be sorted");
- if (chunk.SortedChunkStats.FirstRowKeys.IsUndefined() || chunk.SortedChunkStats.LastRowKeys.IsUndefined()) {
- Error_ = TFmrError{
- .Component = EFmrComponent::Coordinator,
- .Reason = EFmrErrorReason::RestartQuery,
- .ErrorMessage = "Every FMR chunk inside SortedPartitioner must have first and last key bounds"
- };
- return;
- }
-
- auto firstBound = MakeKeyBound(chunk.SortedChunkStats.FirstRowKeys, KeyColumns_);
- auto lastBound = MakeKeyBound(chunk.SortedChunkStats.LastRowKeys, KeyColumns_);
- TFmrTableKeysRange KeyRange{.IsEmpty = false};
- KeyRange.SetFirstKeysBound(std::move(firstBound), true);
- KeyRange.SetLastKeysBound(std::move(lastBound), true);
-
- chunks.push_back(TChunkUnit{
- .TableId = tableId,
- .PartId = partId,
- .ChunkIndex = chunkIdx,
- .DataWeight = chunk.DataWeight,
- .KeyRange = KeyRange,
- });
- }
- }
- Y_ENSURE(
- tableChunkCount > 0,
- "SortedPartitioner requires at least one chunk for input table: " << tableId);
-
- std::stable_sort(chunks.begin(), chunks.end(), [](const TChunkUnit& a, const TChunkUnit& b) {
- if (*a.KeyRange.FirstKeysBound != *b.KeyRange.FirstKeysBound) {
- return *a.KeyRange.FirstKeysBound < *b.KeyRange.FirstKeysBound;
- }
- return *a.KeyRange.LastKeysBound < *b.KeyRange.LastKeysBound;
- });
-
- TableInputs_[tableId] = std::deque<TChunkUnit>(chunks.begin(), chunks.end());
-
- if (!chunks.empty()) {
- TSortedPartitionerFilterBoundary FilterBoundary{.FilterBoundary = *chunks.front().KeyRange.FirstKeysBound, .IsInclusive = true};
- FilterBoundaries_[tableId] = FilterBoundary;
- }
- }
-}
-
-void TSortedPartitioner::TFmrTablesChunkPool::SetError(TFmrError error) {
- Error_ = std::move(error);
-}
-
-TMaybe<TFmrError> TSortedPartitioner::TFmrTablesChunkPool::GetError() const {
- return Error_;
-}
-
-bool TSortedPartitioner::TFmrTablesChunkPool::IsNotEmpty() const {
- return std::any_of(TableInputs_.begin(), TableInputs_.end(),
- [](const auto& pair) { return !pair.second.empty(); });
-}
-
-void TSortedPartitioner::TFmrTablesChunkPool::PutBack(TChunkUnit chunk) {
- TableInputs_[chunk.TableId].push_front(std::move(chunk));
-}
-
-void TSortedPartitioner::TFmrTablesChunkPool::UpdateFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary) {
- auto [it, inserted] = FilterBoundaries_.try_emplace(tableId, FilterBoundary);
- if (!inserted) {
- it->second = GetMaxFilterBoundary(tableId, FilterBoundary);
- }
-}
-
-TMaybe<TSortedPartitionerFilterBoundary> TSortedPartitioner::TFmrTablesChunkPool::GetFilterBoundary(const TString& tableId) const {
- auto it = FilterBoundaries_.find(tableId);
- if (it == FilterBoundaries_.end()) {
- return Nothing();
- }
- return TMaybe<TSortedPartitionerFilterBoundary>(it->second);
-}
-
-TFmrTableKeysRange TSortedPartitioner::TFmrTablesChunkPool::GetEffectiveKeysRange(const TChunkUnit& chunk) const {
- auto FilterBoundary = GetFilterBoundary(chunk.TableId);
- if (FilterBoundary.Defined()) {
- auto tmpFilterBoundary = TSortedPartitionerFilterBoundary{.FilterBoundary=*chunk.KeyRange.FirstKeysBound, .IsInclusive=true};
- const auto maxKey = GetMaxFilterBoundary(chunk.TableId, tmpFilterBoundary);
- TFmrTableKeysRange out{.IsEmpty = false};
- out.SetFirstKeysBound(maxKey.FilterBoundary, maxKey.IsInclusive);
- out.SetLastKeysBound(*chunk.KeyRange.LastKeysBound, chunk.KeyRange.IsLastBoundInclusive);
- return out;
- }
- return chunk.KeyRange;
-}
-
-const std::vector<TString>& TSortedPartitioner::TFmrTablesChunkPool::GetTableOrder() const {
- return TableOrder_;
-}
-
-TMaybe<TSortedPartitioner::TChunkUnit> TSortedPartitioner::TFmrTablesChunkPool::ReadNextChunk(const TString& tableId) {
- auto it = TableInputs_.find(tableId);
- if (it == TableInputs_.end()) {
- return Nothing();
- }
- auto& chunkQueue = it->second;
- if (chunkQueue.empty()) {
- return Nothing();
- }
-
- TChunkUnit chunk = chunkQueue.front();
- chunkQueue.pop_front();
- return chunk;
+ bool isLastRange
+) {
+ return CreateTaskInputFromSlicesImpl(slices, inputTables, isLastRange);
}
-TSortedPartitionerFilterBoundary TSortedPartitioner::TFmrTablesChunkPool::GetMaxFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary) const {
- auto it = FilterBoundaries_.find(tableId);
- if (it == FilterBoundaries_.end() || it->second.FilterBoundary < FilterBoundary.FilterBoundary) {
- return FilterBoundary;
- } else if (it->second.FilterBoundary == FilterBoundary.FilterBoundary) {
- if (!FilterBoundary.IsInclusive) {
- return FilterBoundary;
- } else {
- return it->second;
- }
+// Tightens the left key with the table's filter boundary: a greater boundary replaces the key and
+// its inclusiveness; an equal boundary keeps the key inclusive only if both are inclusive.
+void TSortedPartitioner::ChangeLeftKeyBoundaryIfNeeded(
+    TFmrTableKeysBoundary& leftKey,
+    bool& leftInclusive,
+    const TPartitionerFilterBoundary& filterBoundary
+) {
+    const auto& boundaryKey = filterBoundary.FilterBoundary;
+    if (boundaryKey > leftKey) {
+        leftKey = boundaryKey;
+        leftInclusive = filterBoundary.IsInclusive;
+        return;
+    }
+    if (boundaryKey == leftKey) {
+        leftInclusive = leftInclusive && filterBoundary.IsInclusive;
}
- return it->second;
}
-TSortedPartitioner::TReadSliceResult TSortedPartitioner::ReadSlice(TFmrTablesChunkPool& chunkPool) {
- TChunkContainer container;
- std::vector<TFmrTableKeysRange> effectiveRanges;
- effectiveRanges.reserve(chunkPool.GetTableOrder().size());
-
- for (const auto& tableId : chunkPool.GetTableOrder()) {
- auto chunk = chunkPool.ReadNextChunk(tableId);
- if (!chunk.Defined()) {
- continue;
- }
- auto effectiveRange = chunkPool.GetEffectiveKeysRange(*chunk);
- container.Push(std::move(*chunk));
- container.UpdateKeyRange(effectiveRange);
- effectiveRanges.push_back(std::move(effectiveRange));
- }
-
- if (container.IsEmpty()) {
- return TReadSliceResult{};
+void TSortedPartitioner::ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) {
+ if (taskRangeLastKey < rightKey) {
+ rightKey = taskRangeLastKey;
}
- const TFmrTableKeysRange& rangeForRead = container.GetKeysRange();
-
- const TFmrTableKeysBoundary& sepKey = *rangeForRead.LastKeysBound;
-
- TSlice slice;
- slice.RangeForRead = rangeForRead;
-
- const auto& chunks = container.GetChunks();
-
- for (size_t i = 0; i < chunks.size(); ++i) {
- const auto& chunk = chunks[i];
- const auto& effectiveRange = effectiveRanges[i];
-
- slice.PerTableLeft[chunk.TableId] = TSortedPartitionerFilterBoundary{
- .FilterBoundary = *effectiveRange.FirstKeysBound,
- .IsInclusive = effectiveRange.IsFirstBoundInclusive
- };
-
- const TFmrTableKeysRange intersection = rangeForRead.GetIntersection(effectiveRange);
- if (intersection.IsEmpty) {
- chunkPool.PutBack(chunk);
- continue;
- }
-
- slice.ChunksByTable[chunk.TableId].push_back(chunk);
-
- const TFmrTableKeysBoundary& interFirst = *intersection.FirstKeysBound;
- const TFmrTableKeysBoundary& effLast = *effectiveRange.LastKeysBound;
-
- if (interFirst <= sepKey && sepKey < effLast) {
- chunkPool.UpdateFilterBoundary(chunk.TableId, TSortedPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = false});
- chunkPool.PutBack(chunk);
- } else if (interFirst <= sepKey && sepKey == effLast) {
- slice.Weight += chunk.DataWeight;
- chunkPool.UpdateFilterBoundary(chunk.TableId, TSortedPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = true});
- } else {
- YQL_CLOG(WARN, FastMapReduce) << "FMR fallback to YT: undefined behaviour in ReadSlice, intersection doesn't reach separator key";
- return TReadSliceResult{.Error = TFmrError{
- .Component = EFmrComponent::Coordinator,
- .Reason = EFmrErrorReason::FallbackOperation,
- .ErrorMessage = "Undefined behaviour in ReadSlice: intersection doesn't reach separator key"
- }};
- }
- }
-
- return TReadSliceResult{.Slice = std::move(slice)};
}
-TTaskTableInputRef TSortedPartitioner::CreateTaskInputFromSlices(const std::vector<TSlice>& slices, const std::vector<TFmrTableRef>& inputTables) const {
- TFmrTableKeysRange taskRange{.IsEmpty = true};
- std::unordered_map<TString, std::vector<TChunkUnit>> chunksByTable;
- std::unordered_map<TString, TSortedPartitionerFilterBoundary> perTableLeft;
- std::unordered_map<TString, std::unordered_set<TString>> seenChunksByTable;
-
- for (const auto& slice : slices) {
- if (!slice.RangeForRead.IsEmpty) {
- if (taskRange.IsEmpty) {
- taskRange = slice.RangeForRead;
- } else {
- const auto& firstKey = *slice.RangeForRead.FirstKeysBound;
- if (!taskRange.IsFirstKeySet() || firstKey < *taskRange.FirstKeysBound) {
- taskRange.SetFirstKeysBound(firstKey, slice.RangeForRead.IsFirstBoundInclusive);
- } else if (taskRange.IsFirstKeySet() && firstKey == *taskRange.FirstKeysBound) {
- taskRange.IsFirstBoundInclusive = taskRange.IsFirstBoundInclusive || slice.RangeForRead.IsFirstBoundInclusive;
- }
-
- const auto& lastKey = *slice.RangeForRead.LastKeysBound;
- if (!taskRange.IsLastKeySet() || lastKey > *taskRange.LastKeysBound) {
- taskRange.SetLastKeysBound(lastKey, true);
- } else if (taskRange.IsLastKeySet() && lastKey == *taskRange.LastKeysBound) {
- taskRange.IsLastBoundInclusive = true;
- }
- taskRange.IsEmpty = false;
- }
- }
-
- for (const auto& [tableId, sliceChunks] : slice.ChunksByTable) {
- auto& out = chunksByTable[tableId];
- auto& seen = seenChunksByTable[tableId];
- for (const auto& chunk : sliceChunks) {
- TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex;
- if (!seen.insert(chunkKey).second) {
- continue;
- }
- out.push_back(chunk);
- }
- }
-
- for (const auto& [tableId, sliceFilterBoundary] : slice.PerTableLeft) {
- auto it = perTableLeft.find(tableId);
- if (it == perTableLeft.end()) {
- perTableLeft.emplace(tableId, sliceFilterBoundary);
- continue;
- }
- auto& currentFilterBoundary = it->second;
- if (sliceFilterBoundary.FilterBoundary < currentFilterBoundary.FilterBoundary) {
- currentFilterBoundary = sliceFilterBoundary;
- } else if (sliceFilterBoundary.FilterBoundary == currentFilterBoundary.FilterBoundary) {
- currentFilterBoundary.IsInclusive = currentFilterBoundary.IsInclusive || sliceFilterBoundary.IsInclusive;
- }
- }
- }
-
- TTaskTableInputRef taskInput;
- taskInput.Inputs.reserve(inputTables.size());
-
- const TFmrTableKeysBoundary& taskLastKey = *taskRange.LastKeysBound;
-
- for (const auto& t : inputTables) {
- const TString& tableId = t.FmrTableId.Id;
- auto it = chunksByTable.find(tableId);
- if (it == chunksByTable.end() || it->second.empty()) {
- continue;
- }
- const auto& chunks = it->second;
-
- std::vector<TTableRange> tableRanges;
- tableRanges.reserve(chunks.size());
- std::unordered_map<TString, size_t> byPartIndex;
- byPartIndex.reserve(chunks.size());
- std::unordered_map<TString, std::vector<ui64>> chunkIndexesByPart;
- chunkIndexesByPart.reserve(chunks.size());
- for (const auto& c : chunks) {
- auto [it, inserted] = byPartIndex.try_emplace(c.PartId, tableRanges.size());
- if (inserted) {
- tableRanges.push_back(TTableRange{
- .PartId = c.PartId,
- .MinChunk = c.ChunkIndex,
- .MaxChunk = c.ChunkIndex + 1
- });
- } else {
- auto& range = tableRanges[it->second];
- range.MinChunk = std::min(range.MinChunk, c.ChunkIndex);
- range.MaxChunk = std::max(range.MaxChunk, c.ChunkIndex + 1);
- }
- chunkIndexesByPart[c.PartId].push_back(c.ChunkIndex);
- }
-
- TFmrTableKeysBoundary leftKey = *taskRange.FirstKeysBound;
- bool leftInclusive = taskRange.IsFirstBoundInclusive;
- auto FilterBoundaryIt = perTableLeft.find(tableId);
- if (FilterBoundaryIt != perTableLeft.end()) {
- const auto& FilterBoundary = FilterBoundaryIt->second;
- if (FilterBoundary.FilterBoundary > leftKey) {
- leftKey = FilterBoundary.FilterBoundary;
- leftInclusive = FilterBoundary.IsInclusive;
- } else if (FilterBoundary.FilterBoundary == leftKey) {
- leftInclusive = leftInclusive && FilterBoundary.IsInclusive;
- }
- }
-
- TFmrTableKeysBoundary rightKey = *chunks.back().KeyRange.LastKeysBound;
- if (taskLastKey < rightKey) {
- rightKey = taskLastKey;
- }
-
- TFmrTableInputRef inputRef{
- .TableId = tableId,
- .TableRanges = std::move(tableRanges),
- .Columns = t.Columns,
- .SerializedColumnGroups = t.SerializedColumnGroups,
- .IsFirstRowInclusive = leftInclusive,
- .FirstRowKeys = TString(leftKey.Row),
- .LastRowKeys = TString(rightKey.Row),
- };
- taskInput.Inputs.emplace_back(std::move(inputRef));
- }
-
- return taskInput;
+// No-op for the plain sorted partitioner: the per-table chunk lists are left as they are.
+void TSortedPartitioner::ExtendChunksPerTable(std::unordered_map<TString, std::vector<TChunkUnit>>& /*chunksByTable*/) {
}
TSortedPartitioner::TSortedPartitioner(
const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
- TSortingColumns KeyColumns,
+ TSortingColumns keyColumns,
const TSortedPartitionSettings& settings
)
- : PartIdsForTables_(partIdsForTables)
- , PartIdStats_(partIdStats)
- , KeyColumns_(std::move(KeyColumns))
- , Settings_(settings)
+ : TSortedPartitionerBase(partIdsForTables, partIdStats, keyColumns, settings.FmrPartitionSettings)
{
- Y_ENSURE(Settings_.FmrPartitionSettings.MaxParts > 0);
- Y_ENSURE(Settings_.FmrPartitionSettings.MaxDataWeightPerPart > 0);
-}
-
-TPartitionResult TSortedPartitioner::PartitionTablesIntoTasksSorted(
- const std::vector<TOperationTableRef>& inputTables
-) {
- if (inputTables.empty()) {
- return TPartitionResult{};
- }
-
- std::vector<TFmrTableRef> inputFmrTables;
- inputFmrTables.reserve(inputTables.size());
- for (const auto& table : inputTables) {
- if (auto fmrTable = std::get_if<TFmrTableRef>(&table)) {
- inputFmrTables.push_back(*fmrTable);
- } else {
- return TPartitionResult{.Error = TFmrError{
- .Component = EFmrComponent::Coordinator,
- .Reason = EFmrErrorReason::RestartQuery,
- .ErrorMessage = "Unsupported table type for SortedPartitioner: only FMR tables are supported"
- }};
- }
- }
-
- return PartitionFmrTables(inputFmrTables);
-}
-
-TPartitionResult TSortedPartitioner::PartitionFmrTables(const std::vector<TFmrTableRef>& inputTables) {
- Y_ENSURE(Settings_.FmrPartitionSettings.MaxDataWeightPerPart > 0, "MaxDataWeightPerPart must be > 0");
- Y_ENSURE(!KeyColumns_.Columns.empty(), "KeyColumns must be set for SortedPartitioner");
-
- std::vector<TTaskTableInputRef> tasks;
- const ui64 maxParts = Settings_.FmrPartitionSettings.MaxParts;
-
- TFmrTablesChunkPool chunkPool(inputTables, PartIdsForTables_, PartIdStats_, KeyColumns_);
- if (auto error = chunkPool.GetError()) {
- return TPartitionResult{.Error = *error};
- }
- ui64 maxWeight = Settings_.FmrPartitionSettings.MaxDataWeightPerPart;
- if (Settings_.FmrPartitionSettings.AdjustDataWeightPerPartition) {
- ui64 totalWeight = CollectFmrTotalWeight(inputTables);
- ui64 estimatedParts = (totalWeight + maxWeight - 1) / maxWeight;
- if (estimatedParts > maxParts && maxParts > 0) {
- maxWeight = totalWeight / maxParts;
- YQL_CLOG(INFO, FastMapReduce) << "AdjustDataWeightPerPartition (sorted): adjusted MaxDataWeightPerPart from "
- << Settings_.FmrPartitionSettings.MaxDataWeightPerPart << " to " << maxWeight
- << " (totalWeight=" << totalWeight << ", maxParts=" << maxParts << ")";
- }
- }
-
- std::vector<TSlice> currentSlices;
- ui64 currentWeight = 0;
-
- while (chunkPool.IsNotEmpty()) {
- auto sliceResult = ReadSlice(chunkPool);
- if (sliceResult.Error) {
- return TPartitionResult{.Error = *sliceResult.Error};
- }
- if (!sliceResult.Slice.Defined()) {
- break;
- }
- auto& slice = *sliceResult.Slice;
-
- ui64 newWeight = slice.Weight;
-
- if (!currentSlices.empty() && currentWeight + newWeight > maxWeight) {
- tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables));
- currentSlices.clear();
- currentWeight = 0;
- }
- currentWeight += newWeight;
- currentSlices.push_back(std::move(slice));
- }
- if (!currentSlices.empty()) {
- tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables));
- }
-
- if (maxParts < tasks.size()) {
- return TPartitionResult{.Error = TFmrError{
- .Component = EFmrComponent::Coordinator,
- .Reason = EFmrErrorReason::RestartQuery,
- .ErrorMessage = TStringBuilder() << "SortedPartitioner produced " << tasks.size()
- << " tasks which exceeds max_parts=" << maxParts
- }};
- }
-
- return TPartitionResult{.TaskInputs = tasks};
-}
-
-ui64 TSortedPartitioner::CollectFmrTotalWeight(const std::vector<TFmrTableRef>& inputTables) {
- ui64 totalWeight = 0;
-
- for (const auto& table : inputTables) {
- const TString& tableId = table.FmrTableId.Id;
- const auto& partIds = PartIdsForTables_.at(tableId);
- for (const auto& partId : partIds) {
- const auto& stats = PartIdStats_.at(partId);
- for (const auto& chunkStat : stats) {
- totalWeight += chunkStat.DataWeight;
- }
- }
- }
-
- return totalWeight;
-}
-
-TPartitionResult PartitionInputTablesIntoTasksSorted(
- const std::vector<TOperationTableRef>& inputTables,
- TSortedPartitioner& partitioner
-) {
- auto result = partitioner.PartitionTablesIntoTasksSorted(inputTables);
- if (!result.Error) {
- YQL_CLOG(DEBUG, FastMapReduce) << "Successfully partitioned " << inputTables.size()
- << " input tables (sorted) into " << result.TaskInputs.size() << " tasks";
- }
- return result;
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h
index 471fcbd21f4..c537b3add50 100644
--- a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h
+++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner.h
@@ -1,5 +1,7 @@
#pragma once
+#include "yql_yt_sorted_partitioner_base.h"
+
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_sorted_writer.h>
@@ -14,124 +16,29 @@ struct TSortedPartitionSettings {
TFmrPartitionerSettings FmrPartitionSettings;
};
-struct TSortedPartitionerFilterBoundary {
- TFmrTableKeysBoundary FilterBoundary;
- bool IsInclusive;
-};
-
-class TSortedPartitioner {
+class TSortedPartitioner: public TSortedPartitionerBase {
public:
TSortedPartitioner(
const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
- TSortingColumns KeyColumns,
+ TSortingColumns keyColumns,
const TSortedPartitionSettings& settings
);
- TPartitionResult PartitionTablesIntoTasksSorted(
- const std::vector<TOperationTableRef>& inputTables
- );
-
private:
- struct TChunkUnit {
- TString TableId;
- TString PartId;
- ui64 ChunkIndex = 0;
- ui64 DataWeight = 0;
- TFmrTableKeysRange KeyRange;
- };
-
- class TChunkContainer {
- public:
- TChunkContainer() = default;
-
- void Push(TChunkUnit chunk);
- bool IsEmpty() const;
- const std::vector<TChunkUnit>& GetChunks() const;
- void Clear();
-
- const TFmrTableKeysRange& GetKeysRange() const;
- void UpdateKeyRange(const TFmrTableKeysRange& KeyRange);
-
- private:
- std::vector<TChunkUnit> Chunks_;
- TFmrTableKeysRange KeysRange_;
- };
-
- class TFmrTablesChunkPool {
- public:
- TFmrTablesChunkPool(
- const std::vector<TFmrTableRef>& inputTables,
- const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
- const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
- const TSortingColumns& KeyColumns
- );
-
- void PutBack(TChunkUnit chunk);
- void UpdateFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary);
- bool IsNotEmpty() const;
-
- const std::vector<TString>& GetTableOrder() const;
- TMaybe<TChunkUnit> ReadNextChunk(const TString& tableId);
+ TTaskTableInputRef CreateTaskInputFromSlices(const std::vector<TSlice>& slices, const std::vector<TFmrTableRef>& inputTables, bool isLastRange) override;
- TMaybe<TSortedPartitionerFilterBoundary> GetFilterBoundary(const TString& tableId) const;
- TSortedPartitionerFilterBoundary GetMaxFilterBoundary(const TString& tableId, const TSortedPartitionerFilterBoundary& FilterBoundary) const;
- TFmrTableKeysRange GetEffectiveKeysRange(const TChunkUnit& chunk) const;
+ TFmrTableKeysRange GetReadRangeFromSlices(const std::vector<TSlice>& slices, bool isLastRange) override;
- void SetError(TFmrError error);
- TMaybe<TFmrError> GetError() const;
+ void ChangeLeftKeyBoundaryIfNeeded(
+ TFmrTableKeysBoundary& leftKey,
+ bool& isLeftInclusive,
+ const TPartitionerFilterBoundary& filterBoundary
+ ) override;
- private:
- void InitTableInputs(const std::vector<TFmrTableRef>& inputTables);
+ void ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) override;
- std::vector<TString> TableOrder_;
- std::unordered_map<TString, std::deque<TChunkUnit>> TableInputs_;
- std::unordered_map<TString, TSortedPartitionerFilterBoundary> FilterBoundaries_;
- const std::unordered_map<TFmrTableId, std::vector<TString>>& PartIdsForTables_;
- const std::unordered_map<TString, std::vector<TChunkStats>>& PartIdStats_;
- const TSortingColumns& KeyColumns_;
- TMaybe<TFmrError> Error_;
- };
-
- TPartitionResult PartitionFmrTables(
- const std::vector<TFmrTableRef>& inputTables
- );
-
- struct TSlice {
- std::unordered_map<TString, std::vector<TChunkUnit>> ChunksByTable;
- TFmrTableKeysRange RangeForRead;
- std::unordered_map<TString, TSortedPartitionerFilterBoundary> PerTableLeft;
- ui64 Weight = 0;
- };
-
- struct TReadSliceResult {
- TMaybe<TSlice> Slice;
- TMaybe<TFmrError> Error;
- };
-
- TReadSliceResult ReadSlice(TFmrTablesChunkPool& chunkPool);
- TTaskTableInputRef CreateTaskInputFromSlices(const std::vector<TSlice>& slices, const std::vector<TFmrTableRef>& inputTables) const;
-
- std::unordered_map<TString, std::vector<TTableRange>> ProcessChunkForSlice(
- TFmrTablesChunkPool& chunkPool,
- const TChunkContainer& container,
- const TFmrTableKeysRange& taskRange
- );
-
- ui64 CollectFmrTotalWeight(
- const std::vector<TFmrTableRef>& inputTables
- );
-
-private:
- const std::unordered_map<TFmrTableId, std::vector<TString>> PartIdsForTables_;
- const std::unordered_map<TString, std::vector<TChunkStats>> PartIdStats_;
- const TSortingColumns KeyColumns_;
- const TSortedPartitionSettings Settings_;
+ void ExtendChunksPerTable(std::unordered_map<TString, std::vector<TChunkUnit>>& chunksByTable) override;
};
-TPartitionResult PartitionInputTablesIntoTasksSorted(
- const std::vector<TOperationTableRef>& inputTables,
- TSortedPartitioner& partitioner
-);
-
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp
new file mode 100644
index 00000000000..4882972f2c6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.cpp
@@ -0,0 +1,530 @@
+#include "yql_yt_sorted_partitioner_base.h"
+
+#include <yql/essentials/utils/log/log.h>
+
+namespace NYql::NFmr {
+
+
+// Stores the chunk at the end of the container. The tracked key range is not
+// modified here; callers update it separately through UpdateKeyRange().
+void TChunkContainer::Push(TChunkUnit chunk) {
+    Chunks_.emplace_back(std::move(chunk));
+}
+
+bool TChunkContainer::IsEmpty() const { // True when no chunks have been pushed (or all were cleared).
+ return Chunks_.empty();
+}
+
+const std::vector<TChunkUnit>& TChunkContainer::GetChunks() const { // Read-only view of the stored chunks, in push order.
+ return Chunks_;
+}
+
+// Resets the container: the tracked key range is marked empty and every
+// stored chunk is dropped.
+void TChunkContainer::Clear() {
+    KeysRange_ = TFmrTableKeysRange{.IsEmpty = true};
+    Chunks_.clear();
+}
+
+const TFmrTableKeysRange& TChunkContainer::GetKeysRange() const { // Range accumulated so far by UpdateKeyRange().
+ return KeysRange_;
+}
+
+// Merges `KeyRange` into the range tracked by this container.
+//  - An empty incoming range is ignored.
+//  - The tracked first bound becomes the smaller of the two first bounds;
+//    when they are equal the bound is inclusive if either side is inclusive.
+//  - The tracked last bound becomes the smaller of the two last bounds;
+//    when they are equal the bound is inclusive only if both sides are.
+void TChunkContainer::UpdateKeyRange(const TFmrTableKeysRange& KeyRange) {
+    if (KeyRange.IsEmpty) {
+        return;
+    }
+
+    // First non-empty range: adopt its bounds, then fall through to the merge below.
+    if (KeysRange_.IsEmpty) {
+        KeysRange_.IsEmpty = false;
+        KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive);
+        KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive);
+    }
+
+    if (KeysRange_.IsFirstKeySet()) {
+        // Equal bounds are the default case; the strict comparisons override it.
+        bool firstInclusive = KeysRange_.IsFirstBoundInclusive || KeyRange.IsFirstBoundInclusive;
+        if (*KeysRange_.FirstKeysBound < *KeyRange.FirstKeysBound) {
+            firstInclusive = KeysRange_.IsFirstBoundInclusive;
+        } else if (*KeysRange_.FirstKeysBound > *KeyRange.FirstKeysBound) {
+            firstInclusive = KeyRange.IsFirstBoundInclusive;
+        }
+        KeysRange_.SetFirstKeysBound(std::min(*KeysRange_.FirstKeysBound, *KeyRange.FirstKeysBound), firstInclusive);
+    } else {
+        KeysRange_.SetFirstKeysBound(*KeyRange.FirstKeysBound, KeyRange.IsFirstBoundInclusive);
+    }
+
+    if (KeysRange_.IsLastKeySet()) {
+        bool lastInclusive = KeysRange_.IsLastBoundInclusive && KeyRange.IsLastBoundInclusive;
+        if (*KeysRange_.LastKeysBound < *KeyRange.LastKeysBound) {
+            lastInclusive = KeysRange_.IsLastBoundInclusive;
+        } else if (*KeysRange_.LastKeysBound > *KeyRange.LastKeysBound) {
+            lastInclusive = KeyRange.IsLastBoundInclusive;
+        }
+        KeysRange_.SetLastKeysBound(std::min(*KeysRange_.LastKeysBound, *KeyRange.LastKeysBound), lastInclusive);
+    } else {
+        KeysRange_.SetLastKeysBound(*KeyRange.LastKeysBound, KeyRange.IsLastBoundInclusive);
+    }
+}
+
+TFmrTablesChunkPool::TFmrTablesChunkPool( // Binds the metadata maps and key columns, then fills the per-table chunk queues.
+ const std::vector<TFmrTableRef>& inputTables,
+ const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+ const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+ const TSortingColumns& KeyColumns
+)
+ : PartIdsForTables_(partIdsForTables)
+ , PartIdStats_(partIdStats)
+ , KeyColumns_(KeyColumns)
+{
+ InitTableInputs(inputTables); // Failures surface either through Y_ENSURE or through the stored error (see GetError()).
+}
+
+// Builds, for every input table, a queue of its chunks ordered by
+// (first key, last key) and seeds the table's filter boundary with the first
+// key of the earliest chunk (inclusive).
+//
+// Missing metadata, a table without partitions/chunks or an unsorted chunk
+// fail a Y_ENSURE. A chunk without first/last key bounds is reported through
+// Error_ instead, and initialization stops at that point.
+void TFmrTablesChunkPool::InitTableInputs(const std::vector<TFmrTableRef>& inputTables) {
+    TableOrder_.clear();
+    TableOrder_.reserve(inputTables.size());
+
+    for (const auto& table : inputTables) {
+        const TString& tableId = table.FmrTableId.Id;
+        Y_ENSURE(
+            PartIdsForTables_.contains(tableId),
+            "No partitions metadata found for input FMR table: " << tableId);
+
+        const auto& partIds = PartIdsForTables_.at(tableId);
+        Y_ENSURE(
+            !partIds.empty(),
+            "SortedPartitioner requires at least one partition for input table: " << tableId);
+        TableOrder_.push_back(tableId);
+
+        std::vector<TChunkUnit> chunks;
+        ui64 tableChunkCount = 0;
+        for (const auto& partId : partIds) {
+            Y_ENSURE(
+                PartIdStats_.contains(partId),
+                "No chunk stats found for partId: " << partId << " (table: " << tableId << ")");
+
+            const auto& stats = PartIdStats_.at(partId);
+            tableChunkCount += stats.size();
+            for (ui64 chunkIdx = 0; chunkIdx < stats.size(); ++chunkIdx) {
+                const auto& chunk = stats[chunkIdx];
+                Y_ENSURE(chunk.SortedChunkStats.IsSorted, "Every FMR chunk inside SortedPartitioner must be sorted");
+                if (chunk.SortedChunkStats.FirstRowKeys.IsUndefined() || chunk.SortedChunkStats.LastRowKeys.IsUndefined()) {
+                    Error_ = TFmrError{
+                        .Component = EFmrComponent::Coordinator,
+                        .Reason = EFmrErrorReason::RestartQuery,
+                        .ErrorMessage = "Every FMR chunk inside SortedPartitioner must have first and last key bounds"
+                    };
+                    return;
+                }
+
+                auto firstBound = MakeKeyBound(chunk.SortedChunkStats.FirstRowKeys, KeyColumns_);
+                auto lastBound = MakeKeyBound(chunk.SortedChunkStats.LastRowKeys, KeyColumns_);
+                TFmrTableKeysRange keyRange{.IsEmpty = false};
+                keyRange.SetFirstKeysBound(std::move(firstBound), true);
+                keyRange.SetLastKeysBound(std::move(lastBound), true);
+
+                // keyRange is not used after this point, so hand it over instead of copying.
+                chunks.push_back(TChunkUnit{
+                    .TableId = tableId,
+                    .PartId = partId,
+                    .ChunkIndex = chunkIdx,
+                    .DataWeight = chunk.DataWeight,
+                    .KeyRange = std::move(keyRange),
+                });
+            }
+        }
+        Y_ENSURE(
+            tableChunkCount > 0,
+            "SortedPartitioner requires at least one chunk for input table: " << tableId);
+
+        // stable_sort keeps the original (partition, chunk index) order among equal key ranges.
+        std::stable_sort(chunks.begin(), chunks.end(), [](const TChunkUnit& a, const TChunkUnit& b) {
+            if (*a.KeyRange.FirstKeysBound != *b.KeyRange.FirstKeysBound) {
+                return *a.KeyRange.FirstKeysBound < *b.KeyRange.FirstKeysBound;
+            }
+            return *a.KeyRange.LastKeysBound < *b.KeyRange.LastKeysBound;
+        });
+
+        // Read the seed boundary before the chunks are moved out of the vector.
+        if (!chunks.empty()) {
+            FilterBoundaries_[tableId] = TPartitionerFilterBoundary{
+                .FilterBoundary = *chunks.front().KeyRange.FirstKeysBound,
+                .IsInclusive = true
+            };
+        }
+
+        // Move (not copy) the sorted chunks into the table's queue; assignment
+        // replaces any queue previously stored for the same table id.
+        std::deque<TChunkUnit> queue;
+        for (auto& sortedChunk : chunks) {
+            queue.push_back(std::move(sortedChunk));
+        }
+        TableInputs_[tableId] = std::move(queue);
+    }
+}
+
+void TFmrTablesChunkPool::SetError(TFmrError error) { // Stores the error, overwriting any previously stored one.
+ Error_ = std::move(error);
+}
+
+TMaybe<TFmrError> TFmrTablesChunkPool::GetError() const { // Returns a copy of the stored error, if one was recorded.
+ return Error_;
+}
+
+// Reports whether at least one table still has chunks waiting in its queue.
+bool TFmrTablesChunkPool::IsNotEmpty() const {
+    for (const auto& tableQueue : TableInputs_) {
+        if (!tableQueue.second.empty()) {
+            return true;
+        }
+    }
+    return false;
+}
+
+void TFmrTablesChunkPool::PutBack(TChunkUnit chunk) { // Re-queues the chunk at the front of its table's queue, so it is the next one read for that table.
+ TableInputs_[chunk.TableId].push_front(std::move(chunk));
+}
+
+// Raises the filter boundary of `tableId`: a table without a boundary gets
+// `filterBoundary` as is, otherwise the stored boundary is replaced by the
+// result of GetMaxFilterBoundary() for the stored and the new value.
+void TFmrTablesChunkPool::UpdateFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary) {
+    const auto existing = FilterBoundaries_.find(tableId);
+    if (existing == FilterBoundaries_.end()) {
+        FilterBoundaries_.emplace(tableId, filterBoundary);
+        return;
+    }
+    existing->second = GetMaxFilterBoundary(tableId, filterBoundary);
+}
+
+// Looks up the filter boundary stored for `tableId`; yields Nothing() when
+// the table has no boundary yet.
+TMaybe<TPartitionerFilterBoundary> TFmrTablesChunkPool::GetFilterBoundary(const TString& tableId) const {
+    if (const auto found = FilterBoundaries_.find(tableId); found != FilterBoundaries_.end()) {
+        return TMaybe<TPartitionerFilterBoundary>(found->second);
+    }
+    return Nothing();
+}
+
+// Returns the part of the chunk's key range that is still to be read.
+// Without a filter boundary for the chunk's table this is the chunk's own
+// range. Otherwise the first bound is the larger of the chunk's first key
+// (inclusive) and the table's filter boundary; the last bound is the chunk's.
+TFmrTableKeysRange TFmrTablesChunkPool::GetEffectiveKeysRange(const TChunkUnit& chunk) const {
+    if (!GetFilterBoundary(chunk.TableId).Defined()) {
+        return chunk.KeyRange;
+    }
+
+    const auto chunkStart = TPartitionerFilterBoundary{.FilterBoundary=*chunk.KeyRange.FirstKeysBound, .IsInclusive=true};
+    const auto lowerBound = GetMaxFilterBoundary(chunk.TableId, chunkStart);
+
+    TFmrTableKeysRange effective{.IsEmpty = false};
+    effective.SetFirstKeysBound(lowerBound.FilterBoundary, lowerBound.IsInclusive);
+    effective.SetLastKeysBound(*chunk.KeyRange.LastKeysBound, chunk.KeyRange.IsLastBoundInclusive);
+    return effective;
+}
+
+const std::vector<TString>& TFmrTablesChunkPool::GetTableOrder() const { // Table ids in the order the input tables were passed to the constructor.
+ return TableOrder_;
+}
+
+// Removes and returns the chunk at the front of the queue of `tableId`.
+// Returns Nothing() when the table is unknown or its queue is empty.
+TMaybe<TChunkUnit> TFmrTablesChunkPool::ReadNextChunk(const TString& tableId) {
+    auto it = TableInputs_.find(tableId);
+    if (it == TableInputs_.end() || it->second.empty()) {
+        return Nothing();
+    }
+    auto& chunkQueue = it->second;
+
+    // The front element is popped right away, so move it out instead of
+    // copying its strings and key range.
+    TChunkUnit chunk = std::move(chunkQueue.front());
+    chunkQueue.pop_front();
+    return chunk;
+}
+
+// Picks the stricter lower boundary between the one stored for `tableId` and
+// `filterBoundary`: the larger key wins, and on equal keys an exclusive
+// `filterBoundary` wins over the stored value. With nothing stored for the
+// table, `filterBoundary` is returned unchanged.
+TPartitionerFilterBoundary TFmrTablesChunkPool::GetMaxFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary) const {
+    const auto storedIt = FilterBoundaries_.find(tableId);
+    if (storedIt == FilterBoundaries_.end()) {
+        return filterBoundary;
+    }
+
+    const auto& stored = storedIt->second;
+    if (stored.FilterBoundary < filterBoundary.FilterBoundary) {
+        return filterBoundary;
+    }
+    if (stored.FilterBoundary == filterBoundary.FilterBoundary && !filterBoundary.IsInclusive) {
+        return filterBoundary;
+    }
+    return stored;
+}
+
+TSortedPartitionerBase::TSortedPartitionerBase( // Copies the metadata maps, key columns and partition settings into the partitioner; no validation happens here.
+ const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+ const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+ const TSortingColumns& keyColumns,
+ const TFmrPartitionerSettings& fmrPartitionSettings
+)
+ : PartIdsForTables_(partIdsForTables)
+ , PartIdStats_(partIdStats)
+ , KeyColumns_(keyColumns)
+ , FmrPartitionSettings_(fmrPartitionSettings)
+{
+}
+
+TReadSliceResult TSortedPartitionerBase::ReadSlice(TFmrTablesChunkPool& chunkPool) { // Takes the next chunk of every table, merges their effective ranges and builds one slice from them.
+ TChunkContainer container;
+ std::vector<TFmrTableKeysRange> effectiveRanges; // effectiveRanges[i] belongs to container.GetChunks()[i].
+ effectiveRanges.reserve(chunkPool.GetTableOrder().size());
+
+ for (const auto& tableId : chunkPool.GetTableOrder()) { // At most one candidate chunk per table, in table order.
+ auto chunk = chunkPool.ReadNextChunk(tableId);
+ if (!chunk.Defined()) {
+ continue; // This table has no chunks left.
+ }
+ auto effectiveRange = chunkPool.GetEffectiveKeysRange(*chunk);
+ container.Push(std::move(*chunk));
+ container.UpdateKeyRange(effectiveRange);
+ effectiveRanges.push_back(std::move(effectiveRange));
+ }
+
+ if (container.IsEmpty()) {
+ return TReadSliceResult{}; // Nothing was read: neither Slice nor Error is set.
+ }
+ const TFmrTableKeysRange& rangeForRead = container.GetKeysRange();
+
+ const TFmrTableKeysBoundary& sepKey = *rangeForRead.LastKeysBound; // Separator key: last bound of the merged range (see UpdateKeyRange).
+
+ TSlice slice;
+ slice.RangeForRead = rangeForRead;
+
+ const auto& chunks = container.GetChunks();
+
+ for (size_t i = 0; i < chunks.size(); ++i) {
+ const auto& chunk = chunks[i];
+ const auto& effectiveRange = effectiveRanges[i];
+
+ slice.PerTableLeft[chunk.TableId] = TPartitionerFilterBoundary{ // Recorded for every candidate, even one that is put back below.
+ .FilterBoundary = *effectiveRange.FirstKeysBound,
+ .IsInclusive = effectiveRange.IsFirstBoundInclusive
+ };
+
+ const TFmrTableKeysRange intersection = rangeForRead.GetIntersection(effectiveRange);
+ if (intersection.IsEmpty) {
+ chunkPool.PutBack(chunk); // No overlap with the slice range: return the chunk to the pool untouched.
+ continue;
+ }
+
+ slice.ChunksByTable[chunk.TableId].push_back(chunk);
+
+ const TFmrTableKeysBoundary& interFirst = *intersection.FirstKeysBound;
+ const TFmrTableKeysBoundary& effLast = *effectiveRange.LastKeysBound;
+
+ if (interFirst <= sepKey && sepKey < effLast) { // Chunk extends past the separator: it stays in the slice and is re-queued with an exclusive boundary; its weight is not counted here.
+ chunkPool.UpdateFilterBoundary(chunk.TableId, TPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = false});
+ chunkPool.PutBack(chunk);
+ } else if (interFirst <= sepKey && sepKey == effLast) { // Chunk ends exactly at the separator: it is consumed, so its weight is counted.
+ slice.Weight += chunk.DataWeight;
+ chunkPool.UpdateFilterBoundary(chunk.TableId, TPartitionerFilterBoundary{.FilterBoundary = sepKey, .IsInclusive = true});
+ } else { // Any other relation between the bounds is unexpected and is reported as a fallback error.
+ YQL_CLOG(WARN, FastMapReduce) << "FMR fallback to YT: undefined behaviour in ReadSlice, intersection doesn't reach separator key";
+ return TReadSliceResult{.Error = TFmrError{
+ .Component = EFmrComponent::Coordinator,
+ .Reason = EFmrErrorReason::FallbackOperation,
+ .ErrorMessage = "Undefined behaviour in ReadSlice: intersection doesn't reach separator key"
+ }};
+ }
+ }
+
+ return TReadSliceResult{.Slice = std::move(slice)};
+}
+
+// Adds up DataWeight over every chunk of every partition of the given tables.
+// Lookups use .at(), so a table or partition that is absent from the metadata
+// maps results in an exception.
+ui64 TSortedPartitionerBase::CollectFmrTotalWeight(const std::vector<TFmrTableRef>& inputTables) {
+    ui64 weightSum = 0;
+    for (const auto& inputTable : inputTables) {
+        const TString& tableId = inputTable.FmrTableId.Id;
+        for (const auto& partId : PartIdsForTables_.at(tableId)) {
+            for (const auto& stat : PartIdStats_.at(partId)) {
+                weightSum += stat.DataWeight;
+            }
+        }
+    }
+    return weightSum;
+}
+
+// Builds the input description of one task from a group of slices.
+//
+// Steps:
+//  1. The task key range comes from GetReadRangeFromSlices().
+//  2. Chunks of all slices are gathered per table; a chunk that appears in
+//     several slices (identified by "partId#chunkIndex") is kept once.
+//  3. Per table, the smallest left boundary seen across the slices is kept
+//     (on equal keys it is inclusive if any slice had it inclusive).
+//  4. ExtendChunksPerTable() lets the derived class adjust the chunk lists.
+//  5. For every input table that has chunks, one TFmrTableInputRef is emitted
+//     with one [MinChunk, MaxChunk) range per partition and with key bounds
+//     adjusted by ChangeLeftKeyBoundaryIfNeeded / ChangeRightKeyBoundaryIfNeeded.
+TTaskTableInputRef TSortedPartitionerBase::CreateTaskInputFromSlicesImpl(
+    const std::vector<TSlice>& slices,
+    const std::vector<TFmrTableRef>& inputTables,
+    bool isLastRange
+) {
+    TFmrTableKeysRange taskRange = GetReadRangeFromSlices(slices, isLastRange);
+    std::unordered_map<TString, std::vector<TChunkUnit>> chunksByTable;
+    std::unordered_map<TString, TPartitionerFilterBoundary> perTableLeft;
+    std::unordered_map<TString, std::unordered_set<TString>> seenChunksByTable;
+
+    for (const auto& slice : slices) {
+        for (const auto& [tableId, sliceChunks] : slice.ChunksByTable) {
+            auto& out = chunksByTable[tableId];
+            auto& seen = seenChunksByTable[tableId];
+            for (const auto& chunk : sliceChunks) {
+                // A chunk cut by a separator key is listed in consecutive slices; keep it once.
+                TString chunkKey = TStringBuilder() << chunk.PartId << "#" << chunk.ChunkIndex;
+                if (!seen.insert(std::move(chunkKey)).second) {
+                    continue;
+                }
+                out.push_back(chunk);
+            }
+        }
+
+        for (const auto& [tableId, sliceFilterBoundary] : slice.PerTableLeft) {
+            auto it = perTableLeft.find(tableId);
+            if (it == perTableLeft.end()) {
+                perTableLeft.emplace(tableId, sliceFilterBoundary);
+                continue;
+            }
+            auto& currentFilterBoundary = it->second;
+            if (sliceFilterBoundary.FilterBoundary < currentFilterBoundary.FilterBoundary) {
+                currentFilterBoundary = sliceFilterBoundary;
+            } else if (sliceFilterBoundary.FilterBoundary == currentFilterBoundary.FilterBoundary) {
+                currentFilterBoundary.IsInclusive = currentFilterBoundary.IsInclusive || sliceFilterBoundary.IsInclusive;
+            }
+        }
+    }
+
+    ExtendChunksPerTable(chunksByTable);
+
+    TTaskTableInputRef taskInput;
+    taskInput.Inputs.reserve(inputTables.size());
+
+    const TFmrTableKeysBoundary& taskLastKey = *taskRange.LastKeysBound;
+
+    for (const auto& t : inputTables) {
+        const TString& tableId = t.FmrTableId.Id;
+        auto it = chunksByTable.find(tableId);
+        if (it == chunksByTable.end() || it->second.empty()) {
+            continue;
+        }
+        const auto& chunks = it->second;
+
+        // One range per partition, widened to cover every chunk index seen for it.
+        std::vector<TTableRange> tableRanges;
+        tableRanges.reserve(chunks.size());
+        std::unordered_map<TString, size_t> byPartIndex;
+        byPartIndex.reserve(chunks.size());
+        for (const auto& c : chunks) {
+            auto [rangeIt, inserted] = byPartIndex.try_emplace(c.PartId, tableRanges.size());
+            if (inserted) {
+                tableRanges.push_back(TTableRange{
+                    .PartId = c.PartId,
+                    .MinChunk = c.ChunkIndex,
+                    .MaxChunk = c.ChunkIndex + 1
+                });
+            } else {
+                auto& range = tableRanges[rangeIt->second];
+                range.MinChunk = std::min(range.MinChunk, c.ChunkIndex);
+                range.MaxChunk = std::max(range.MaxChunk, c.ChunkIndex + 1);
+            }
+        }
+
+        TFmrTableKeysBoundary leftKey = *taskRange.FirstKeysBound;
+        bool leftInclusive = taskRange.IsFirstBoundInclusive;
+        auto filterBoundaryIt = perTableLeft.find(tableId);
+        if (filterBoundaryIt != perTableLeft.end()) {
+            ChangeLeftKeyBoundaryIfNeeded(leftKey, leftInclusive, filterBoundaryIt->second);
+        }
+
+        TFmrTableKeysBoundary rightKey = *chunks.back().KeyRange.LastKeysBound;
+        bool rightInclusive = taskRange.IsLastBoundInclusive;
+        ChangeRightKeyBoundaryIfNeeded(rightKey, taskLastKey);
+
+        TFmrTableInputRef inputRef{
+            .TableId = tableId,
+            .TableRanges = std::move(tableRanges),
+            .Columns = t.Columns,
+            .SerializedColumnGroups = t.SerializedColumnGroups,
+            .IsFirstRowInclusive = leftInclusive,
+            .IsLastRowInclusive = rightInclusive,
+            .FirstRowKeys = TString(leftKey.Row),
+            .LastRowKeys = TString(rightKey.Row),
+        };
+        taskInput.Inputs.emplace_back(std::move(inputRef));
+    }
+
+    return taskInput;
+}
+
+// Public entry point: accepts generic operation tables, requires all of them
+// to be FMR tables and delegates to PartitionFmrTables().
+// An empty input yields an empty result; any non-FMR table yields a
+// RestartQuery error without partitioning anything.
+TPartitionResult TSortedPartitionerBase::PartitionTablesIntoTasks(
+    const std::vector<TOperationTableRef>& inputTables
+) {
+    if (inputTables.empty()) {
+        return TPartitionResult{};
+    }
+
+    std::vector<TFmrTableRef> fmrTables;
+    fmrTables.reserve(inputTables.size());
+    for (const auto& table : inputTables) {
+        const auto* asFmrTable = std::get_if<TFmrTableRef>(&table);
+        if (!asFmrTable) {
+            return TPartitionResult{.Error = TFmrError{
+                .Component = EFmrComponent::Coordinator,
+                .Reason = EFmrErrorReason::RestartQuery,
+                .ErrorMessage = "Unsupported table type for partitioner: only FMR tables are supported"
+            }};
+        }
+        fmrTables.push_back(*asFmrTable);
+    }
+
+    auto partitioned = PartitionFmrTables(fmrTables);
+    if (!partitioned.Error) {
+        YQL_CLOG(DEBUG, FastMapReduce) << "Successfully partitioned " << inputTables.size()
+            << " input tables (sorted) into " << partitioned.TaskInputs.size() << " tasks";
+    }
+    return partitioned;
+}
+
+// Splits the given FMR tables into tasks.
+//
+// Slices are read from a chunk pool one at a time and accumulated; a task is
+// emitted whenever adding the next slice would push the accumulated weight
+// above the per-task limit. The last task is created with isLastRange = true.
+//
+// Errors:
+//  - a pool construction error or a ReadSlice error is returned as is;
+//  - an exception thrown while reading slices or creating task inputs becomes
+//    a FallbackOperation error;
+//  - more tasks than MaxParts becomes a RestartQuery error.
+// Exceptions thrown before the slice loop (Y_ENSURE, pool construction,
+// CollectFmrTotalWeight) are not caught here.
+TPartitionResult TSortedPartitionerBase::PartitionFmrTables(const std::vector<TFmrTableRef>& inputTables) {
+    Y_ENSURE(FmrPartitionSettings_.MaxDataWeightPerPart > 0, "MaxDataWeightPerPart must be > 0");
+    Y_ENSURE(!KeyColumns_.Columns.empty(), "KeyColumns must be set");
+
+    std::vector<TTaskTableInputRef> tasks;
+    const ui64 maxParts = FmrPartitionSettings_.MaxParts;
+
+    TFmrTablesChunkPool chunkPool(inputTables, PartIdsForTables_, PartIdStats_, KeyColumns_);
+    if (auto error = chunkPool.GetError()) {
+        return TPartitionResult{.Error = *error};
+    }
+
+    ui64 maxWeight = FmrPartitionSettings_.MaxDataWeightPerPart;
+    if (FmrPartitionSettings_.AdjustDataWeightPerPartition) {
+        ui64 totalWeight = CollectFmrTotalWeight(inputTables);
+        ui64 estimatedParts = (totalWeight + maxWeight - 1) / maxWeight;
+        if (estimatedParts > maxParts && maxParts > 0) {
+            // NOTE(review): this division rounds down, so even a perfect packing can
+            // need maxParts + 1 tasks when totalWeight is not a multiple of maxParts.
+            // Rounding up looks closer to the intent - confirm before changing.
+            maxWeight = totalWeight / maxParts;
+            YQL_CLOG(INFO, FastMapReduce) << "AdjustDataWeightPerPartition (sorted): adjusted MaxDataWeightPerPart from "
+                << FmrPartitionSettings_.MaxDataWeightPerPart << " to " << maxWeight
+                << " (totalWeight=" << totalWeight << ", maxParts=" << maxParts << ")";
+        }
+    }
+
+    std::vector<TSlice> currentSlices;
+    ui64 currentWeight = 0;
+
+    try {
+        while (chunkPool.IsNotEmpty()) {
+            auto sliceResult = ReadSlice(chunkPool);
+            if (sliceResult.Error) {
+                return TPartitionResult{.Error = *sliceResult.Error};
+            }
+            if (!sliceResult.Slice.Defined()) {
+                break;
+            }
+            auto& slice = *sliceResult.Slice;
+            // Close the current task before it would exceed the weight limit;
+            // a single oversized slice still forms a task of its own.
+            if (!currentSlices.empty() && currentWeight + slice.Weight > maxWeight) {
+                tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables, false));
+                currentSlices.clear();
+                currentWeight = 0;
+            }
+            currentWeight += slice.Weight;
+            currentSlices.push_back(std::move(slice));
+        }
+        if (!currentSlices.empty()) {
+            tasks.emplace_back(CreateTaskInputFromSlices(currentSlices, inputTables, true));
+        }
+    } catch (...) {
+        return TPartitionResult{
+            .Error = TFmrError {
+                .Component = EFmrComponent::Coordinator,
+                .Reason = EFmrErrorReason::FallbackOperation,
+                .ErrorMessage = CurrentExceptionMessage()
+            }
+        };
+    }
+
+    if (maxParts < tasks.size()) {
+        return TPartitionResult{.Error = TFmrError{
+            .Component = EFmrComponent::Coordinator,
+            .Reason = EFmrErrorReason::RestartQuery,
+            .ErrorMessage = TStringBuilder() << "Partitioner produced " << tasks.size()
+                << " tasks which exceeds max_parts=" << maxParts
+        }};
+    }
+
+    // `tasks` is not used afterwards: hand it over instead of copying every task input.
+    return TPartitionResult{.TaskInputs = std::move(tasks)};
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h
new file mode 100644
index 00000000000..494bf4041b2
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_sorted_partitioner_base.h
@@ -0,0 +1,143 @@
+#pragma once
+
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_sorted_writer.h>
+#include <yt/yql/providers/yt/fmr/coordinator/partitioner/yql_yt_fmr_partitioner.h>
+#include <yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h>
+
+namespace NYql::NFmr {
+
+// Lower key boundary used when filtering a table's rows: the boundary key and
+// a flag telling whether the key itself is included.
+struct TPartitionerFilterBoundary {
+    TFmrTableKeysBoundary FilterBoundary;
+    // Defaulted so that a default-initialized boundary never carries an indeterminate flag.
+    bool IsInclusive = false;
+};
+
+struct TChunkUnit { // One chunk of one partition of an input table, as handled by the sorted partitioner.
+ TString TableId; // Id of the table the chunk belongs to.
+ TString PartId; // Id of the partition inside that table.
+ ui64 ChunkIndex = 0; // Index of the chunk within the partition's chunk stats.
+ ui64 DataWeight = 0; // Weight taken from the chunk's stats.
+ TFmrTableKeysRange KeyRange; // Key range built from the chunk's first and last row keys.
+};
+
+class TChunkContainer { // Holds chunk units together with a key range merged from the ranges passed to UpdateKeyRange().
+public:
+ TChunkContainer() = default;
+
+ void Push(TChunkUnit chunk); // Appends a chunk; the tracked key range is not changed.
+ bool IsEmpty() const; // True when no chunks are stored.
+ const std::vector<TChunkUnit>& GetChunks() const; // Stored chunks in push order.
+ void Clear(); // Drops all chunks and marks the tracked key range as empty.
+
+ const TFmrTableKeysRange& GetKeysRange() const; // Range accumulated so far.
+ void UpdateKeyRange(const TFmrTableKeysRange& KeyRange); // Merges a non-empty range into the tracked one; empty ranges are ignored.
+
+private:
+ std::vector<TChunkUnit> Chunks_;
+ TFmrTableKeysRange KeysRange_;
+};
+
+class TFmrTablesChunkPool { // Per-table queues of sorted chunks plus a per-table filter boundary.
+public:
+ TFmrTablesChunkPool( // The maps and key columns are stored by reference and must outlive the pool.
+ const std::vector<TFmrTableRef>& inputTables,
+ const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+ const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+ const TSortingColumns& KeyColumns
+ );
+
+ void PutBack(TChunkUnit chunk); // Re-queues a chunk at the front of its table's queue.
+ void UpdateFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary); // Stores the boundary, or raises an already stored one (see GetMaxFilterBoundary).
+ bool IsNotEmpty() const; // True while any table still has queued chunks.
+
+ const std::vector<TString>& GetTableOrder() const; // Table ids in input order.
+ TMaybe<TChunkUnit> ReadNextChunk(const TString& tableId); // Pops the front chunk of the table; Nothing() if unknown or exhausted.
+
+ TMaybe<TPartitionerFilterBoundary> GetFilterBoundary(const TString& tableId) const; // Stored boundary of the table, if any.
+ TPartitionerFilterBoundary GetMaxFilterBoundary(const TString& tableId, const TPartitionerFilterBoundary& filterBoundary) const; // Larger of the stored and the given boundary; on equal keys an exclusive given boundary wins.
+ TFmrTableKeysRange GetEffectiveKeysRange(const TChunkUnit& chunk) const; // Chunk range with its first bound raised to the table's filter boundary.
+
+ void SetError(TFmrError error);
+ TMaybe<TFmrError> GetError() const; // Error recorded during initialization or via SetError(), if any.
+
+private:
+ void InitTableInputs(const std::vector<TFmrTableRef>& inputTables); // Fills TableOrder_, TableInputs_ and the initial FilterBoundaries_.
+
+ std::vector<TString> TableOrder_;
+ std::unordered_map<TString, std::deque<TChunkUnit>> TableInputs_; // tableId -> remaining chunks, sorted by (first key, last key).
+ std::unordered_map<TString, TPartitionerFilterBoundary> FilterBoundaries_; // tableId -> current lower boundary.
+ const std::unordered_map<TFmrTableId, std::vector<TString>>& PartIdsForTables_;
+ const std::unordered_map<TString, std::vector<TChunkStats>>& PartIdStats_;
+ const TSortingColumns& KeyColumns_;
+ TMaybe<TFmrError> Error_;
+};
+
+struct TSlice { // Result of one ReadSlice() step.
+ std::unordered_map<TString, std::vector<TChunkUnit>> ChunksByTable; // tableId -> chunks overlapping the slice range.
+ TFmrTableKeysRange RangeForRead; // Merged key range of the slice.
+ std::unordered_map<TString, TPartitionerFilterBoundary> PerTableLeft; // tableId -> left boundary of that table's effective range in this slice.
+ ui64 Weight = 0; // Sum of DataWeight of the chunks that end exactly at the slice's last key.
+};
+
+struct TReadSliceResult { // Outcome of ReadSlice(): a slice, an error, or neither when there was nothing to read.
+ TMaybe<TSlice> Slice;
+ TMaybe<TFmrError> Error;
+};
+
+class TSortedPartitionerBase: public TThrRefBase { // Shared slice-based partitioning algorithm; derived classes supply range and boundary policies through the pure virtual hooks.
+public:
+ using TPtr = TIntrusivePtr<TSortedPartitionerBase>;
+
+ virtual ~TSortedPartitionerBase() = default;
+
+ TSortedPartitionerBase( // All arguments are copied into the partitioner.
+ const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+ const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+ const TSortingColumns& keyColumns,
+ const TFmrPartitionerSettings& fmrPartitionSettings
+ );
+
+ TPartitionResult PartitionTablesIntoTasks(const std::vector<TOperationTableRef>& inputTables); // Entry point; only FMR tables are accepted.
+
+protected:
+ ui64 CollectFmrTotalWeight(const std::vector<TFmrTableRef>& inputTables); // Sum of DataWeight over all chunks of the given tables.
+
+ TReadSliceResult ReadSlice(TFmrTablesChunkPool& chunkPool); // Reads the next slice from the pool.
+
+ virtual TTaskTableInputRef CreateTaskInputFromSlices( // Hook called by PartitionFmrTables() for every group of slices that forms a task.
+ const std::vector<TSlice>& slices,
+ const std::vector<TFmrTableRef>& inputTables,
+ bool isLastRange
+ ) = 0;
+
+ TPartitionResult PartitionFmrTables( // Groups slices into tasks limited by weight and checks the MaxParts limit.
+ const std::vector<TFmrTableRef>& inputTables
+ );
+
+ virtual TFmrTableKeysRange GetReadRangeFromSlices(const std::vector<TSlice>& slices, bool isLastRange) = 0; // Hook: key range of a task built from these slices.
+
+ virtual void ChangeLeftKeyBoundaryIfNeeded( // Hook: may adjust a table's left key and its inclusivity given the table's filter boundary.
+ TFmrTableKeysBoundary& leftKey,
+ bool& isLeftInclusive,
+ const TPartitionerFilterBoundary& filterBoundary
+ ) = 0;
+
+ virtual void ChangeRightKeyBoundaryIfNeeded(TFmrTableKeysBoundary& rightKey, const TFmrTableKeysBoundary& taskRangeLastKey) = 0; // Hook: may adjust a table's right key given the task's last key.
+
+ TTaskTableInputRef CreateTaskInputFromSlicesImpl( // Common implementation that builds a task input using the hooks above.
+ const std::vector<TSlice>& slices,
+ const std::vector<TFmrTableRef>& inputTables,
+ bool isLastRange
+ );
+
+ virtual void ExtendChunksPerTable(std::unordered_map<TString, std::vector<TChunkUnit>>& chunksByTable) = 0; // Hook: may modify the per-table chunk lists before table ranges are built.
+
+private:
+ const std::unordered_map<TFmrTableId, std::vector<TString>> PartIdsForTables_; // Owned copy: table id -> partition ids.
+ const std::unordered_map<TString, std::vector<TChunkStats>> PartIdStats_; // Owned copy: partition id -> chunk stats.
+ TSortingColumns KeyColumns_;
+ TFmrPartitionerSettings FmrPartitionSettings_;
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make
index 0f2075ad1b3..e22b59f1b73 100644
--- a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make
@@ -6,6 +6,8 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file
+ yt/yql/providers/yt/fmr/utils
+ yt/yql/providers/yt/fmr/utils/comparator
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make
index c2c9aef24c7..dd113355f5b 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ya.make
@@ -6,6 +6,7 @@ SRCS(
yql_yt_job_impl.cpp
yql_yt_raw_table_queue_reader.cpp
yql_yt_raw_table_queue_writer.cpp
+ yql_yt_reduce_reader.cpp
yql_yt_sorted_merge_reader.cpp
yql_yt_table_data_service_reader.cpp
yql_yt_table_data_service_base_writer.cpp
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index 8339056abe8..56fac989de3 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -133,6 +133,7 @@ public:
neededColumns,
columnGroups,
input.IsFirstRowInclusive,
+ input.IsLastRowInclusive,
input.FirstRowKeys,
input.LastRowKeys,
Settings_.FmrReaderSettings.ReadAheadChunks
@@ -240,6 +241,7 @@ public:
fmrInput->Columns,
fmrInput->SerializedColumnGroups,
fmrInput->IsFirstRowInclusive,
+ fmrInput->IsLastRowInclusive,
fmrInput->FirstRowKeys,
fmrInput->LastRowKeys,
Settings_.FmrReaderSettings.ReadAheadChunks
@@ -316,6 +318,7 @@ public:
fmrInput->Columns,
fmrInput->SerializedColumnGroups,
fmrInput->IsFirstRowInclusive,
+ fmrInput->IsLastRowInclusive,
fmrInput->FirstRowKeys,
fmrInput->LastRowKeys,
Settings_.FmrReaderSettings.ReadAheadChunks
@@ -341,6 +344,28 @@ public:
return HandleFmrJob(localSortJobFunc, ETaskType::LocalSort);
}
+    // Runs one reduce task: restores the serialized user job from the task params,
+    // fills it with the task's inputs/outputs and worker-level services, and hands it
+    // to the job launcher. The cancel flag is accepted for interface parity but unused.
+    std::variant<TFmrError, TStatistics> Reduce(
+        const TReduceTaskParams& params,
+        const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+        std::shared_ptr<std::atomic<bool>> /* cancelFlag */,
+        const TMaybe<TString>& jobEnvironmentDir,
+        const std::vector<TFileInfo>& jobFiles,
+        const std::vector<TYtResourceInfo>& jobYtResources,
+        const std::vector<TFmrResourceTaskInfo>& jobFmrResources
+    ) override {
+        auto launchReduceJob = [&, this] () {
+            // Restore the user job from the state shipped inside the task params.
+            TStringStream jobStateInput(params.SerializedReduceJobState);
+            TFmrUserJob userJob;
+            userJob.Load(jobStateInput);
+
+            // Attach per-task tables and the services owned by this job object.
+            TFmrUserJobSettings jobSettings = Settings_.FmrUserJobSettings;
+            FillReduceFmrJob(userJob, params, clusterConnections, Discovery_, VanillaInfo_, jobSettings, YtJobService_);
+            userJob.SetTvmSettings(TvmSettings_);
+
+            return JobLauncher_->LaunchJob(userJob, jobEnvironmentDir, jobFiles, jobYtResources, jobFmrResources);
+        };
+        return HandleFmrJob(launchReduceJob, ETaskType::Reduce);
+    }
+
private:
std::variant<TFmrError, TStatistics> HandleFmrJob(auto fmrJobFunc, ETaskType fmrJobType) {
TString errorLogMessage;
@@ -419,6 +444,8 @@ TJobResult RunJob(
return job->SortedMerge(taskParams, task->ClusterConnections, cancelFlag);
} else if constexpr (std::is_same_v<T, TLocalSortTaskParams>) {
return job->LocalSort(taskParams, task->ClusterConnections, cancelFlag);
+ } else if constexpr (std::is_same_v<T, TReduceTaskParams>) {
+ return job->Reduce(taskParams, task->ClusterConnections, cancelFlag, task->JobEnvironmentDir, task->Files, task->YtResources, task->FmrResources);
} else {
ythrow yexception() << "Unsupported task type";
}
@@ -451,7 +478,29 @@ void FillMapFmrJob(
mapJob.SetTaskFmrOutputTables(mapTaskParams.Output);
mapJob.SetClusterConnections(clusterConnections);
mapJob.SetYtJobService(jobService);
- mapJob.SetIsOrdered(mapTaskParams.IsOrdered);
+ mapJob.SetFmrJobType(mapTaskParams.MapJobType);
+}
+
+// Configures a deserialized user job for a reduce task: settings, optional vanilla
+// info, table data service discovery, task input/output tables, cluster connections,
+// YT job service, and finally the reduce job type together with its operation spec.
+void FillReduceFmrJob(
+    TFmrUserJob& reduceJob,
+    const TReduceTaskParams& reduceTaskParams,
+    const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+    ITableDataServiceDiscovery::TPtr discovery,
+    TMaybe<TVanillaInfo> vanillaInfo,
+    const TFmrUserJobSettings& userJobSettings,
+    IYtJobService::TPtr jobService
+) {
+    // Worker-level configuration.
+    reduceJob.SetSettings(userJobSettings);
+    if (vanillaInfo) {
+        reduceJob.SetVanillaInfo(vanillaInfo.GetRef());
+    }
+    reduceJob.SetTableDataServiceDiscovery(std::move(discovery));
+
+    // Task-level tables and connections.
+    reduceJob.SetTaskInputTables(reduceTaskParams.Input);
+    reduceJob.SetTaskFmrOutputTables(reduceTaskParams.Output);
+    reduceJob.SetClusterConnections(clusterConnections);
+    reduceJob.SetYtJobService(jobService);
+
+    // Mark the job as a reduce and pass the reduce spec along.
+    reduceJob.SetFmrJobType(EFmrJobType::Reduce);
+    reduceJob.SetReduceOperationSpec(reduceTaskParams.ReduceOperationSpec);
}
TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) {
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
index b72bb54ecc7..f99dfa7b552 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
@@ -69,4 +69,14 @@ void FillMapFmrJob(
IYtJobService::TPtr jobService
);
+// Fills an already deserialized user job with everything a reduce task needs:
+// user job settings, vanilla info (only when defined), table data service discovery,
+// the task's input and output tables, cluster connections and the YT job service.
+// Also switches the job type to EFmrJobType::Reduce and stores the task's reduce
+// operation spec in the job.
+void FillReduceFmrJob(
+ TFmrUserJob& reduceJob,
+ const TReduceTaskParams& reduceTaskParams,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+ ITableDataServiceDiscovery::TPtr discovery,
+ TMaybe<TVanillaInfo> vanillaInfo,
+ const TFmrUserJobSettings& userJobSettings,
+ IYtJobService::TPtr jobService
+);
+
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp
new file mode 100644
index 00000000000..465a86fdef9
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.cpp
@@ -0,0 +1,133 @@
+#include "yql_yt_reduce_reader.h"
+
+#include <util/generic/array_ref.h>
+#include <yt/yql/providers/yt/fmr/utils/yql_yt_column_group_helpers.h>
+
+namespace NYql::NFmr {
+
+// Positions every input on its first row and builds a heap of the indices of all
+// inputs that actually have a row to offer.
+TReduceReader::TReduceReader(
+    const std::vector<IBlockIterator::TPtr>& inputs,
+    const TSortingColumns& reduceBy
+)
+    : TFmrIndexedBlockReader(inputs)
+    , ReduceBy_(reduceBy)
+{
+    // The arguments are swapped on purpose: std heap algorithms keep the "largest"
+    // element on top, so inverting operator< puts the lowest-ordered source there.
+    HeapComparator_ = [this](ui32 lhs, ui32 rhs) {
+        return Sources_[rhs] < Sources_[lhs];
+    };
+
+    for (ui32 sourceIndex = 0; sourceIndex < Sources_.size(); ++sourceIndex) {
+        auto& source = Sources_[sourceIndex];
+        source.EnsureRow();
+        if (!source.Valid()) {
+            continue;
+        }
+        Heap_.push_back(sourceIndex);
+    }
+
+    std::make_heap(Heap_.begin(), Heap_.end(), HeapComparator_);
+}
+
+// Builds a byte string identifying the reduce key of the current row of the active source.
+//
+// The row is parsed into per-column markup and the raw key/value bytes of every
+// reduce-by column present in the row are concatenated inside a YSON map. Columns are
+// emitted in ReduceBy_ order rather than in physical row order, so two rows holding the
+// same key values produce the same string even if their columns are stored in a
+// different order. Reduce-by columns absent from the row are skipped.
+//
+// The result is meant only for equality comparison between consecutive rows.
+TString TReduceReader::GetReduceKeyFromRow() {
+    auto& s = Sources_[ActiveSource_];
+    TStringBuf rowData = s.Block.GetRowBytes(s.RowIndex);
+    TParserFragmentListIndex parser(rowData);
+    parser.Parse();
+    auto parsedRows = parser.MoveAllColumnsRows();
+    // Guard the [0] access below: an empty parse result would otherwise be undefined behaviour.
+    Y_ENSURE(!parsedRows.empty(), "Failed to parse row markup while extracting reduce key");
+    const auto& rowMarkup = parsedRows[0];
+
+    TString result;
+    result.append(1, NYson::NDetail::BeginMapSymbol);
+    bool firstColumn = true;
+
+    for (const auto& reduceColumn : ReduceBy_.Columns) {
+        const TStringBuf reduceColumnName(reduceColumn);
+        for (const auto& col : rowMarkup.Columns) {
+            if (TStringBuf(col.Name) != reduceColumnName) {
+                continue;
+            }
+
+            if (!firstColumn) {
+                result.append(1, NYson::NDetail::KeyedItemSeparatorSymbol);
+            }
+            firstColumn = false;
+
+            // Copy the serialized "name=value" bytes of the column verbatim.
+            const auto& kvRange = col.KeyValueRange;
+            result.append(rowData.data() + kvRange.StartOffset, kvRange.EndOffset - kvRange.StartOffset);
+            break;
+        }
+    }
+    result.append(1, NYson::NDetail::EndMapSymbol);
+    result.append(1, NYson::NDetail::ListItemSeparatorSymbol);
+    return result;
+}
+
+
+// Fills buf with up to len bytes of the merged input stream and returns the number of
+// bytes written (0 once every source is exhausted).
+//
+// Rows are taken from the sources in heap order. Whenever the reduce key of the next
+// row differs from the key of the previously emitted row, KeySwitchMarker is written
+// before that row. Both the marker and a row may be split across several calls;
+// KeyMarkerOffset_ and ActiveOffset_ remember how much has already been written.
+size_t TReduceReader::DoRead(void* buf, size_t len) {
+    auto out = MakeArrayRef(static_cast<char*>(buf), len);
+    size_t total = 0;
+    size_t remaining = len;
+
+    while (remaining > 0) {
+        if (IsKeyMarkerActive_) {
+            auto keySwitchMarker = TStringBuf(KeySwitchMarker);
+            Y_ENSURE(KeyMarkerOffset_ <= keySwitchMarker.size(), "key marker offset out of bounds");
+            const size_t avail = keySwitchMarker.size() - KeyMarkerOffset_;
+            const size_t toCopy = std::min(avail, remaining);
+
+            std::copy_n(keySwitchMarker.begin() + KeyMarkerOffset_, toCopy, out.begin() + total);
+            remaining -= toCopy;
+            total += toCopy;
+            KeyMarkerOffset_ += toCopy;
+
+            if (KeyMarkerOffset_ < keySwitchMarker.size()) {
+                // The output buffer filled up in the middle of the marker;
+                // the rest is written on the next call.
+                break;
+            }
+
+            // finished writing marker, should write rows with fixed reduce key.
+            IsKeyMarkerActive_ = false;
+            KeyMarkerOffset_ = 0;
+            continue;
+        }
+
+        if (HasActive_) {
+            auto& s = Sources_[ActiveSource_];
+            TStringBuf row = s.Block.GetRowBytes(s.RowIndex);
+
+            // A key switch can only happen at the start of a row. A partially emitted
+            // row (ActiveOffset_ > 0) has already passed this check, so re-parsing it
+            // when the read resumes would be wasted work.
+            if (ActiveOffset_ == 0) {
+                auto reduceKey = GetReduceKeyFromRow();
+                if (reduceKey != CurrentReduceKey_) {
+                    CurrentReduceKey_.swap(reduceKey);
+                    IsKeyMarkerActive_ = true;
+                    continue;
+                }
+            }
+
+            Y_ENSURE(ActiveOffset_ <= row.size(), "Active offset out of bounds");
+
+            const size_t avail = row.size() - ActiveOffset_;
+            const size_t toCopy = std::min(avail, remaining);
+
+            std::copy_n(row.begin() + ActiveOffset_, toCopy, out.begin() + total);
+            remaining -= toCopy;
+            total += toCopy;
+            ActiveOffset_ += toCopy;
+
+            if (ActiveOffset_ == row.size()) {
+                // Row fully emitted: advance the source and return it to the heap
+                // if it still has rows.
+                s.Next();
+                if (s.Valid()) {
+                    Heap_.push_back(ActiveSource_);
+                    std::push_heap(Heap_.begin(), Heap_.end(), HeapComparator_);
+                }
+                HasActive_ = false;
+            }
+
+            continue;
+        }
+
+        if (Heap_.empty()) {
+            // All sources are exhausted.
+            break;
+        }
+
+        // Pick the next source from the top of the heap.
+        std::pop_heap(Heap_.begin(), Heap_.end(), HeapComparator_);
+        ActiveSource_ = Heap_.back();
+        Heap_.pop_back();
+        ActiveOffset_ = 0;
+        HasActive_ = true;
+    }
+    return total;
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h
new file mode 100644
index 00000000000..81ac1ff1331
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "yql_yt_fmr_indexed_block_reader.h"
+#include <yt/yql/providers/yt/fmr/test_tools/yson/yql_yt_yson_helpers.h>
+
+namespace NYql::NFmr {
+
+// Raw reader that merges several block iterators into one byte stream for a reduce job.
+// Rows are emitted source by source in heap order, and a key switch marker is written
+// into the stream before a row whose reduce key differs from the previously emitted one.
+class TReduceReader final: public TFmrIndexedBlockReader {
+public:
+ // inputs   - block iterators to merge.
+ // reduceBy - columns that form the reduce key used for key switch detection.
+ TReduceReader(
+ const std::vector<IBlockIterator::TPtr>& inputs,
+ const TSortingColumns& reduceBy
+ );
+
+private:
+ // Writes up to len bytes of merged rows and key switch markers into buf.
+ size_t DoRead(void* buf, size_t len) final;
+
+ // Serializes the reduce-by columns of the current row of the active source.
+ TString GetReduceKeyFromRow();
+
+private:
+ std::vector<ui32> Heap_; // Indices into Sources_, kept as a heap ordered by HeapComparator_.
+ std::function<bool(ui32, ui32)> HeapComparator_; // Compares two sources by index through their operator<.
+ bool HasActive_ = false; // True while the source at ActiveSource_ has a row being emitted.
+ ui64 ActiveSource_ = 0; // Index of the source most recently popped from Heap_.
+ TSortingColumns ReduceBy_;
+ TString CurrentReduceKey_; // Key of the last row for which a marker decision was made; empty before the first row.
+
+ // Bytes written before rows of a new key.
+ // NOTE(review): GetBinaryYson comes from the test_tools include above; confirm that a
+ // production header is meant to depend on test helpers.
+ const TString KeySwitchMarker = GetBinaryYson("<\"key_switch\"=%true;>#;\n");
+ bool IsKeyMarkerActive_ = false; // True while the marker is (partially) pending in the output.
+ ui64 KeyMarkerOffset_ = 0; // Number of marker bytes already written.
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
index bab6367a676..54e5d54bcf8 100644
--- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
+++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
@@ -31,6 +31,16 @@ public:
virtual std::variant<TFmrError, TStatistics> SortedUpload(const TSortedUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
virtual std::variant<TFmrError, TStatistics> LocalSort(const TLocalSortTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
+
+ // Runs a reduce task described by params and returns either an error or job statistics.
+ // Besides the connections and cancel flag taken by the other task methods, it receives
+ // the job environment directory and the file / YT / FMR resources for the user job.
+ virtual std::variant<TFmrError, TStatistics> Reduce(
+ const TReduceTaskParams& params,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {},
+ std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr,
+ const TMaybe<TString>& jobEnvironmentDir = Nothing(),
+ const std::vector<TFileInfo>& jobFiles = {},
+ const std::vector<TYtResourceInfo>& jobYtResources = {},
+ const std::vector<TFmrResourceTaskInfo>& jobFmrResources = {}
+ ) = 0;
};
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
index 3beaf40ddfc..600c60efa05 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
@@ -7,6 +7,8 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/fmr/job_factory/interface
yt/yql/providers/yt/fmr/job_factory/impl
+ yt/yql/providers/yt/fmr/utils/comparator
+ yt/yql/providers/yt/fmr/utils
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp b/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp
index 47b152e39c0..1e7a3c4d959 100644
--- a/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp
+++ b/yt/yql/providers/yt/fmr/job_launcher/yql_yt_job_launcher.cpp
@@ -68,8 +68,8 @@ std::variant<TFmrError, TStatistics> TFmrUserJobLauncher::LaunchJob(
InitializeJobEnvironment(*jobEnvironmentDir, jobFiles, jobYtResources, jobFmrResources);
- TFile jobStateFile(jobtmpDir.Child("fmrjob.bin"), CreateNew | RdWr);
- TFile mapResultStatsFile(jobtmpDir.Child("stats.bin"), CreateNew | RdWr);
+ TFile jobStateFile(jobtmpDir.Child("fmrjob.bin"), CreateAlways | RdWr);
+ TFile jobResultStatsFile(jobtmpDir.Child("stats.bin"), CreateAlways | RdWr);
if (!TableDataServiceDiscoveryFilePath_.empty()) {
TString tmpDirTableDataServiceDiscoveryPath = "tds_discovery.txt";
@@ -85,7 +85,7 @@ std::variant<TFmrError, TStatistics> TFmrUserJobLauncher::LaunchJob(
job.Save(jobStateFileOutputStream);
jobStateFileOutputStream.Flush();
- // execute map in separate process
+ // execute fmrJob in separate process
TShellCommandOptions opts;
TStringStream fmrJobOutputStream, fmrJobErrorStream;
opts.SetUseShell(false).SetDetachSession(false).SetOutputStream(&fmrJobOutputStream).SetErrorStream(&fmrJobErrorStream);
@@ -107,7 +107,7 @@ std::variant<TFmrError, TStatistics> TFmrUserJobLauncher::LaunchJob(
YQL_CLOG(DEBUG, FastMapReduce) << "Process cerr: " << fmrJobErrorStream.Str();
- TFileInput statsStream(mapResultStatsFile);
+ TFileInput statsStream(jobResultStatsFile);
auto serializedProtoStats = statsStream.ReadAll();
NProto::TStatistics protoStats;
protoStats.ParseFromStringOrThrow(serializedProtoStats);
diff --git a/yt/yql/providers/yt/fmr/process/ya.make b/yt/yql/providers/yt/fmr/process/ya.make
index 92a55453f9a..b05d7bb930e 100644
--- a/yt/yql/providers/yt/fmr/process/ya.make
+++ b/yt/yql/providers/yt/fmr/process/ya.make
@@ -15,6 +15,7 @@ PEERDIR(
yt/yql/providers/yt/fmr/yt_job_service/file
yt/yql/providers/yt/fmr/yt_job_service/impl
yt/yql/providers/yt/fmr/utils
+ yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl
yt/yql/providers/yt/job
)
diff --git a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp
index e452acb5997..6fe4ccb1fde 100644
--- a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp
+++ b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp
@@ -1,10 +1,12 @@
#include "yql_yt_job_fmr.h"
#include <util/thread/pool.h>
#include <yt/yql/providers/yt/common/yql_configuration.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_reduce_reader.h>
#include <yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h>
#include <yt/yql/providers/yt/fmr/tvm/impl/yql_yt_fmr_tvm_impl.h>
#include <yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h>
#include <yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.h>
+#include <yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_sorted_writer.h>
#include <yt/yql/providers/yt/fmr/vanilla/tds_discovery/yql_yt_vanilla_tds_discovery.h>
#include <yql/essentials/utils/log/log.h>
@@ -24,10 +26,11 @@ void TFmrUserJob::Save(IOutputStream& s) const {
ClusterConnections_,
TableDataServiceDiscoveryFilePath_,
YtJobServiceType_,
- IsOrdered_,
+ FmrJobType_,
Settings_,
TvmSettings_,
- VanillaInfo_
+ VanillaInfo_,
+ ReduceOperationSpec_
);
}
@@ -39,10 +42,11 @@ void TFmrUserJob::Load(IInputStream& s) {
ClusterConnections_,
TableDataServiceDiscoveryFilePath_,
YtJobServiceType_,
- IsOrdered_,
+ FmrJobType_,
Settings_,
TvmSettings_,
- VanillaInfo_
+ VanillaInfo_,
+ ReduceOperationSpec_
);
}
@@ -132,6 +136,45 @@ void TFmrUserJob::FillQueueFromInputTablesUnordered() {
}
}
+// Feeds the union input queue for a reduce job. A block iterator is created for every
+// FMR input table (YT inputs are rejected), then a thread pool task merges them through
+// TReduceReader and pushes the parsed records into the queue.
+void TFmrUserJob::FillQueueFromReduceInput() {
+    YQL_ENSURE(ReduceOperationSpec_.Defined());
+    auto sortColumns = ReduceOperationSpec_->SortBy;
+
+    std::vector<IBlockIterator::TPtr> inputIterators;
+    for (const auto& tableInput : InputTables_.Inputs) {
+        auto fmrTable = std::get_if<TFmrTableInputRef>(&tableInput);
+        if (!fmrTable) {
+            ythrow TFmrNonRetryableJobException() << "YtTables unsupported inside Reduce task for now";
+        }
+        inputIterators.push_back(MakeIntrusive<TTableDataServiceBlockIterator>(
+            fmrTable->TableId,
+            fmrTable->TableRanges,
+            TableDataService_,
+            sortColumns.Columns,
+            sortColumns.SortOrders,
+            fmrTable->Columns,
+            fmrTable->SerializedColumnGroups,
+            fmrTable->IsFirstRowInclusive,
+            fmrTable->IsLastRowInclusive,
+            fmrTable->FirstRowKeys,
+            fmrTable->LastRowKeys
+        ));
+    }
+
+    // The task owns copies of the iterators and of the reduce-by columns.
+    ThreadPool_->SafeAddFunc([this, iterators = inputIterators, reduceKeyColumns = ReduceOperationSpec_->ReduceBy]() mutable {
+        try {
+            NYT::TRawTableReaderPtr mergedReader = MakeIntrusive<TReduceReader>(iterators, reduceKeyColumns);
+            auto queueWriter = MakeIntrusive<TFmrRawTableQueueWriter>(UnionInputTablesQueue_);
+            ParseRecords(mergedReader, queueWriter, 1, 1000000, CancelFlag_);
+            queueWriter->Flush();
+            // All inputs went through the single merged reader, so each of them is reported as finished.
+            for (ui64 inputIndex = 0; inputIndex < InputTables_.Inputs.size(); ++inputIndex) {
+                UnionInputTablesQueue_->NotifyInputFinished(inputIndex);
+            }
+        } catch (...) {
+            UnionInputTablesQueue_->SetException(CurrentExceptionMessage());
+        }
+    });
+}
+
void TFmrUserJob::InitializeFmrUserJob() {
if (!YtJobService_) {
YQL_ENSURE(YtJobServiceType_ == "native" || YtJobServiceType_ == "file");
@@ -214,10 +257,12 @@ TStatistics TFmrUserJob::GetStatistics(const TFmrUserJobOptions& options) {
TStatistics TFmrUserJob::DoFmrJob(const TFmrUserJobOptions& options) {
InitializeFmrUserJob();
- if (IsOrdered_) {
+ if (FmrJobType_ == EFmrJobType::OrderedMap) {
FillQueueFromInputTablesOrdered();
- } else {
+ } else if (FmrJobType_ == EFmrJobType::Map) {
FillQueueFromInputTablesUnordered();
+ } else if (FmrJobType_ == EFmrJobType::Reduce) {
+ FillQueueFromReduceInput();
}
TYqlUserJobBase::Do();
return GetStatistics(options);
diff --git a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h
index 1956f99b0cd..c4c60c1ece4 100644
--- a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h
+++ b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h
@@ -40,7 +40,6 @@ struct TFmrUserJobOptions {
bool WriteStatsToFile = false;
};
-
class TFmrUserJob: public TYqlUserJobBase {
public:
TFmrUserJob();
@@ -88,8 +87,8 @@ public:
VanillaInfo_ = std::move(info);
}
- void SetIsOrdered(bool isOrdered) {
- IsOrdered_ = isOrdered;
+ // Selects how DoFmrJob fills the input queue: Map, OrderedMap or Reduce.
+ void SetFmrJobType(EFmrJobType jobType) {
+ FmrJobType_ = jobType;
}
void SetSettings(const TFmrUserJobSettings& settings) {
@@ -100,6 +99,10 @@ public:
TvmSettings_ = tvmSettings;
}
+ // Stores the reduce spec (reduce-by / sort-by columns); it must be set for Reduce jobs,
+ // FillQueueFromReduceInput checks that it is defined.
+ void SetReduceOperationSpec(const TReduceOperationSpec& reduceOperationSpec) {
+ ReduceOperationSpec_ = reduceOperationSpec;
+ }
+
void Save(IOutputStream& s) const override;
void Load(IInputStream& s) override;
@@ -116,6 +119,7 @@ private:
void FillQueueFromSingleInputTable(ui64 tableIndex);
void FillQueueFromInputTablesUnordered();
void FillQueueFromInputTablesOrdered();
+ void FillQueueFromReduceInput();
void InitializeFmrUserJob();
@@ -127,10 +131,11 @@ private:
std::unordered_map<TFmrTableId, TClusterConnection> ClusterConnections_;
TString TableDataServiceDiscoveryFilePath_;
TString YtJobServiceType_; // file or native
- bool IsOrdered_ = false;
+ EFmrJobType FmrJobType_ = EFmrJobType::Map;
TFmrUserJobSettings Settings_ = TFmrUserJobSettings();
TMaybe<TFmrTvmJobSettings> TvmSettings_ = Nothing();
TMaybe<TVanillaInfo> VanillaInfo_ = Nothing();
+ TMaybe<TReduceOperationSpec> ReduceOperationSpec_;
// End of serializable part
// Non-serialized: set only for in-process execution via SetTableDataServiceDiscovery.
diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto
index d670ad03e3f..9f4712adcb5 100644
--- a/yt/yql/providers/yt/fmr/proto/request_options.proto
+++ b/yt/yql/providers/yt/fmr/proto/request_options.proto
@@ -107,8 +107,9 @@ message TFmrTableInputRef {
repeated string Columns = 3;
optional string ColumnGroups = 4;
optional bool IsFirstRowInclusive = 5;
optional bytes FirstRowKeys = 6;
optional bytes LastRowKeys = 7;
+ optional bool IsLastRowInclusive = 8; // Appended under a fresh tag: tags 6 and 7 keep their existing wire meaning, so peers built before this change still decode the key bounds.
}
message TSortingColumns {
@@ -233,18 +234,49 @@ message TMergeTaskParams {
TFmrTableOutputRef Output = 2;
}
+// Kind of user job executed by TFmrUserJob.
+// The C++ proto helpers convert this enum with static_cast, so the numeric values must
+// stay in sync with the C++ EFmrJobType (its definition is not visible here - confirm).
+enum EFmrJobType {
+ MAP = 0;
+ ORDERED_MAP = 1;
+ REDUCE = 2;
+}
+
+// Flavour of a reduce operation. Converted to the C++ EReduceType with static_cast
+// in the proto helpers, so numeric values must match on both sides.
+enum EReduceType {
+ SORTED_REDUCE = 0;
+ JOIN_REDUCE = 1;
+}
+
message TMapOperationParams {
repeated TOperationTableRef Input = 1;
repeated TFmrTableRef Output = 2;
bytes SerializedMapJobState = 3;
- bool IsOrdered = 4;
+ EFmrJobType MapJobType = 4; // Reuses tag 4 of the former bool: both are varint-encoded, so old false/true decode as MAP/ORDERED_MAP.
}
}
message TMapTaskParams {
TTaskTableInputRef Input = 1;
repeated TFmrTableOutputRef Output = 2;
bytes SerializedMapJobState = 3;
- bool IsOrdered = 4;
+ EFmrJobType MapJobType = 4; // Reuses tag 4 of the former bool: both are varint-encoded, so old false/true decode as MAP/ORDERED_MAP.
+}
+
+// Reduce-specific settings shared by the operation and each of its tasks.
+message TReduceOperationSpec {
+ TSortingColumns ReduceBy = 1; // Columns forming the reduce key.
+ TSortingColumns SortBy = 2; // Columns (with sort orders) used to order the task's input rows.
+ EReduceType ReduceType = 3;
+}
+
+// Operation-level description of a reduce: input/output tables, the serialized user job
+// state and the reduce spec.
+message TReduceOperationParams {
+ repeated TOperationTableRef Input = 1;
+ repeated TFmrTableRef Output = 2;
+ bytes SerializedReduceJobState = 3;
+ TReduceOperationSpec ReduceOperationSpec = 4;
+}
+
+// Task-level description of a reduce: one task's input, its output tables,
+// the serialized user job state and the reduce spec.
+message TReduceTaskParams {
+ TTaskTableInputRef Input = 1;
+ repeated TFmrTableOutputRef Output = 2;
+ bytes SerializedReduceJobState = 3;
+ TReduceOperationSpec ReduceOperationSpec = 4;
}
message TSortOperationParams {
@@ -266,6 +298,7 @@ message TOperationParams {
TSortedUploadOperationParams SortedUploadOperationParams = 5;
TSortedMergeOperationParams SortedMergeOperationParams = 6;
TSortOperationParams SortOperationParams = 7;
+ TReduceOperationParams ReduceOperationParams = 8;
}
}
@@ -278,6 +311,7 @@ message TTaskParams {
TSortedUploadTaskParams SortedUploadTaskParams = 5;
TSortedMergeTaskParams SortedMergeTaskParams = 6;
TLocalSortTaskParams LocalSortTaskParams = 7;
+ TReduceTaskParams ReduceTaskParams = 8;
}
}
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
index 43cf0963546..a2311538ac6 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -183,6 +183,9 @@ NProto::TFmrTableInputRef FmrTableInputRefToProto(const TFmrTableInputRef& fmrTa
if (fmrTableInputRef.IsFirstRowInclusive) {
protoFmrTableInputRef.SetIsFirstRowInclusive(*fmrTableInputRef.IsFirstRowInclusive);
}
+ if (fmrTableInputRef.IsLastRowInclusive) {
+ protoFmrTableInputRef.SetIsLastRowInclusive(*fmrTableInputRef.IsLastRowInclusive);
+ }
if (fmrTableInputRef.FirstRowKeys) {
protoFmrTableInputRef.SetFirstRowKeys(*fmrTableInputRef.FirstRowKeys);
}
@@ -208,6 +211,9 @@ TFmrTableInputRef FmrTableInputRefFromProto(const NProto::TFmrTableInputRef& pro
fmrTableInputRef.IsFirstRowInclusive = protoFmrTableInputRef.HasIsFirstRowInclusive()
? TMaybe<bool>(protoFmrTableInputRef.GetIsFirstRowInclusive())
: Nothing();
+ fmrTableInputRef.IsLastRowInclusive = protoFmrTableInputRef.HasIsLastRowInclusive()
+ ? TMaybe<bool>(protoFmrTableInputRef.GetIsLastRowInclusive())
+ : Nothing();
fmrTableInputRef.FirstRowKeys = protoFmrTableInputRef.HasFirstRowKeys() ? TMaybe<TString>(protoFmrTableInputRef.GetFirstRowKeys()) : Nothing();
fmrTableInputRef.LastRowKeys = protoFmrTableInputRef.HasLastRowKeys() ? TMaybe<TString>(protoFmrTableInputRef.GetLastRowKeys()) : Nothing();
return fmrTableInputRef;
@@ -594,7 +600,7 @@ NProto::TMapOperationParams MapOperationParamsToProto(const TMapOperationParams&
protoMapOperationParams.AddOutput()->Swap(&protoFmrTableRef);
}
protoMapOperationParams.SetSerializedMapJobState(mapOperationParams.SerializedMapJobState);
- protoMapOperationParams.SetIsOrdered(mapOperationParams.IsOrdered);
+ protoMapOperationParams.SetMapJobType(static_cast<NProto::EFmrJobType>(mapOperationParams.MapJobType));
return protoMapOperationParams;
}
@@ -607,7 +613,7 @@ TMapOperationParams MapOperationParamsFromProto(const NProto::TMapOperationParam
for (auto& protoFmrTableRef: protoMapOperationParams.GetOutput()) {
outputTables.emplace_back(FmrTableRefFromProto(protoFmrTableRef));
}
- return TMapOperationParams{.Input = inputTables, .Output = outputTables, .SerializedMapJobState = protoMapOperationParams.GetSerializedMapJobState(), .IsOrdered = protoMapOperationParams.GetIsOrdered()};
+ return TMapOperationParams{.Input = inputTables, .Output = outputTables, .SerializedMapJobState = protoMapOperationParams.GetSerializedMapJobState(), .MapJobType = static_cast<EFmrJobType>(protoMapOperationParams.GetMapJobType())};
}
NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) {
@@ -619,7 +625,7 @@ NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams)
protoMapTaskParams.AddOutput()->Swap(&protoFmrTableOutputRef);
}
protoMapTaskParams.SetSerializedMapJobState(mapTaskParams.SerializedMapJobState);
- protoMapTaskParams.SetIsOrdered(mapTaskParams.IsOrdered);
+ protoMapTaskParams.SetMapJobType(static_cast<NProto::EFmrJobType>(mapTaskParams.MapJobType));
return protoMapTaskParams;
}
@@ -632,10 +638,88 @@ TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTask
}
mapTaskParams.Output = outputTables;
mapTaskParams.SerializedMapJobState = protoMapTaskParams.GetSerializedMapJobState();
- mapTaskParams.IsOrdered = protoMapTaskParams.GetIsOrdered();
+ mapTaskParams.MapJobType = static_cast<EFmrJobType>(protoMapTaskParams.GetMapJobType());
return mapTaskParams;
}
+// Converts a reduce operation spec into its protobuf form.
+NProto::TReduceOperationSpec ReduceOperationSpecToProto(const TReduceOperationSpec& reduceOperationSpec) {
+    NProto::TReduceOperationSpec result;
+    *result.MutableReduceBy() = SortingColumnsToProto(reduceOperationSpec.ReduceBy);
+    *result.MutableSortBy() = SortingColumnsToProto(reduceOperationSpec.SortBy);
+    // The C++ and proto enums are mapped by numeric value.
+    result.SetReduceType(static_cast<NProto::EReduceType>(reduceOperationSpec.ReduceType));
+    return result;
+}
+
+// Restores a reduce operation spec from its protobuf form.
+TReduceOperationSpec ReduceOperationSpecFromProto (const NProto::TReduceOperationSpec& protoReduceOperationSpec) {
+    TReduceOperationSpec spec;
+    // The proto and C++ enums are mapped by numeric value.
+    spec.ReduceType = static_cast<EReduceType>(protoReduceOperationSpec.GetReduceType());
+    spec.SortBy = SortingColumnsFromProto(protoReduceOperationSpec.GetSortBy());
+    spec.ReduceBy = SortingColumnsFromProto(protoReduceOperationSpec.GetReduceBy());
+    return spec;
+}
+
+// Converts reduce operation params (tables, serialized job state, spec) into protobuf.
+NProto::TReduceOperationParams ReduceOperationParamsToProto(const TReduceOperationParams& reduceOperationParams) {
+    NProto::TReduceOperationParams result;
+    for (const auto& inputTable : reduceOperationParams.Input) {
+        *result.AddInput() = OperationTableRefToProto(inputTable);
+    }
+    for (const auto& outputTable : reduceOperationParams.Output) {
+        *result.AddOutput() = FmrTableRefToProto(outputTable);
+    }
+    result.SetSerializedReduceJobState(reduceOperationParams.SerializedReduceJobState);
+    *result.MutableReduceOperationSpec() = ReduceOperationSpecToProto(reduceOperationParams.ReduceOperationSpec);
+    return result;
+}
+
+// Restores reduce operation params from protobuf.
+TReduceOperationParams ReduceOperationParamsFromProto(const NProto::TReduceOperationParams& protoReduceOperationParams) {
+    std::vector<TOperationTableRef> inputs;
+    for (const auto& protoInput : protoReduceOperationParams.GetInput()) {
+        inputs.push_back(OperationTableRefFromProto(protoInput));
+    }
+
+    std::vector<TFmrTableRef> outputs;
+    for (const auto& protoOutput : protoReduceOperationParams.GetOutput()) {
+        outputs.push_back(FmrTableRefFromProto(protoOutput));
+    }
+
+    return TReduceOperationParams{
+        .Input = inputs,
+        .Output = outputs,
+        .SerializedReduceJobState = protoReduceOperationParams.GetSerializedReduceJobState(),
+        .ReduceOperationSpec = ReduceOperationSpecFromProto(protoReduceOperationParams.GetReduceOperationSpec())
+    };
+}
+
+// Converts reduce task params (input, outputs, serialized job state, spec) into protobuf.
+NProto::TReduceTaskParams ReduceTaskParamsToProto(const TReduceTaskParams& reduceTaskParams) {
+    NProto::TReduceTaskParams result;
+    *result.MutableInput() = TaskTableInputRefToProto(reduceTaskParams.Input);
+    for (const auto& outputTable : reduceTaskParams.Output) {
+        *result.AddOutput() = FmrTableOutputRefToProto(outputTable);
+    }
+    result.SetSerializedReduceJobState(reduceTaskParams.SerializedReduceJobState);
+    *result.MutableReduceOperationSpec() = ReduceOperationSpecToProto(reduceTaskParams.ReduceOperationSpec);
+    return result;
+}
+
+// Restores reduce task params from protobuf.
+TReduceTaskParams ReduceTaskParamsFromProto(const NProto::TReduceTaskParams& protoReduceTaskParams) {
+    std::vector<TFmrTableOutputRef> outputs;
+    for (const auto& protoOutput : protoReduceTaskParams.GetOutput()) {
+        outputs.push_back(FmrTableOutputRefFromProto(protoOutput));
+    }
+
+    TReduceTaskParams result;
+    result.Input = TaskTableInputRefFromProto(protoReduceTaskParams.GetInput());
+    result.Output = outputs;
+    result.SerializedReduceJobState = protoReduceTaskParams.GetSerializedReduceJobState();
+    result.ReduceOperationSpec = ReduceOperationSpecFromProto(protoReduceTaskParams.GetReduceOperationSpec());
+    return result;
+}
+
NProto::TSortOperationParams SortOperationParamsToProto(const TSortOperationParams& sortOperationParams) {
NProto::TSortOperationParams protoSortOperationParams;
for (size_t i = 0; i < sortOperationParams.Input.size(); ++i) {
@@ -699,6 +783,9 @@ NProto::TOperationParams OperationParamsToProto(const TOperationParams& operatio
} else if (auto* SortOperationParamsPtr = std::get_if<TSortOperationParams>(&operationParams)) {
NProto::TSortOperationParams protoSortOperationParams = SortOperationParamsToProto(*SortOperationParamsPtr);
protoOperationParams.MutableSortOperationParams()->Swap(&protoSortOperationParams);
+ } else if (auto* ReduceOperationParamsPtr = std::get_if<TReduceOperationParams>(&operationParams)) {
+ NProto::TReduceOperationParams protoReduceOperationParams = ReduceOperationParamsToProto(*ReduceOperationParamsPtr);
+ protoOperationParams.MutableReduceOperationParams()->Swap(&protoReduceOperationParams);
}
return protoOperationParams;
}
@@ -718,6 +805,8 @@ TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoO
return SortedMergeOperationParamsFromProto(protoOperationParams.GetSortedMergeOperationParams());
} else if (protoOperationParams.HasSortOperationParams()) {
return SortOperationParamsFromProto(protoOperationParams.GetSortOperationParams());
+ } else if (protoOperationParams.HasReduceOperationParams()) {
+ return ReduceOperationParamsFromProto(protoOperationParams.GetReduceOperationParams());
}
return TOperationParams();
}
@@ -771,6 +860,9 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) {
} else if (auto* LocalSortTaskParamsPtr = std::get_if<TLocalSortTaskParams>(&taskParams)) {
NProto::TLocalSortTaskParams protoLocalSortTaskParams = LocalSortTaskParamsToProto(*LocalSortTaskParamsPtr);
protoTaskParams.MutableLocalSortTaskParams()->Swap(&protoLocalSortTaskParams);
+ } else if (auto* reduceTaskParamsPtr = std::get_if<TReduceTaskParams>(&taskParams)) {
+ NProto::TReduceTaskParams protoReduceTaskParams = ReduceTaskParamsToProto(*reduceTaskParamsPtr);
+ protoTaskParams.MutableReduceTaskParams()->Swap(&protoReduceTaskParams);
}
return protoTaskParams;
}
@@ -791,6 +883,8 @@ TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
taskParams = SortedMergeTaskParamsFromProto(protoTaskParams.GetSortedMergeTaskParams());
} else if (protoTaskParams.HasLocalSortTaskParams()) {
taskParams = LocalSortTaskParamsFromProto(protoTaskParams.GetLocalSortTaskParams());
+ } else if (protoTaskParams.HasReduceTaskParams()) {
+ taskParams = ReduceTaskParamsFromProto(protoTaskParams.GetReduceTaskParams());
}
return taskParams;
}
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
index 85794aa467b..9d0257fa923 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
@@ -109,6 +109,18 @@ NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams)
TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTaskParams);
+NProto::TReduceOperationSpec ReduceOperationSpecToProto(const TReduceOperationSpec& reduceOperationSpec);
+
+TReduceOperationSpec ReduceOperationSpecFromProto (const NProto::TReduceOperationSpec& protoReduceOperationSpec);
+
+NProto::TReduceOperationParams ReduceOperationParamsToProto(const TReduceOperationParams& reduceOperationParams);
+
+TReduceOperationParams ReduceOperationParamsFromProto(const NProto::TReduceOperationParams& protoReduceOperationParams);
+
+NProto::TReduceTaskParams ReduceTaskParamsToProto(const TReduceTaskParams& reduceTaskParams);
+
+TReduceTaskParams ReduceTaskParamsFromProto(const NProto::TReduceTaskParams& protoReduceTaskParams);
+
NProto::TSortOperationParams SortOperationParamsToProto(const TSortOperationParams& sortOperationParams);
TSortOperationParams SortOperationParamsFromProto(const NProto::TSortOperationParams& protoSortOperationParams);
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index 10c638d1693..dd462570908 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -166,6 +166,7 @@ void TFmrTableInputRef::Save(IOutputStream* buffer) const {
Columns,
SerializedColumnGroups,
IsFirstRowInclusive,
+ IsLastRowInclusive,
FirstRowKeys,
LastRowKeys
);
@@ -179,6 +180,7 @@ void TFmrTableInputRef::Load(IInputStream* buffer) {
Columns,
SerializedColumnGroups,
IsFirstRowInclusive,
+ IsLastRowInclusive,
FirstRowKeys,
LastRowKeys
);
@@ -287,6 +289,24 @@ NYT::TRichYPath DeserializeRichPath(const TString& serializedRichPath) {
return richPath;
}
+// Writes ReduceBy, SortBy and ReduceType to the stream, in exactly this order.
+// TReduceOperationSpec::Load reads the fields back in the same order.
+void TReduceOperationSpec::Save(IOutputStream* buffer) const {
+    ::SaveMany(buffer, ReduceBy, SortBy, ReduceType);
+}
+
+// Restores ReduceBy, SortBy and ReduceType from the stream, in the order
+// in which TReduceOperationSpec::Save wrote them.
+void TReduceOperationSpec::Load(IInputStream* buffer) {
+    ::LoadMany(buffer, ReduceBy, SortBy, ReduceType);
+}
+
} // namespace NYql::NFmr
//////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index a3976cba323..b1f37585424 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -39,7 +39,8 @@ enum EOperationType {
Map = 4,
SortedUpload = 5,
SortedMerge = 6,
- Sort = 7
+ Sort = 7,
+ Reduce = 8
};
enum class ETaskType {
@@ -50,7 +51,8 @@ enum class ETaskType {
Map = 4,
SortedUpload = 5,
SortedMerge = 6,
- LocalSort = 7
+ LocalSort = 7,
+ Reduce = 8
};
enum class EFmrComponent {
@@ -202,6 +204,7 @@ struct TFmrTableInputRef {
TString SerializedColumnGroups = TString();
TMaybe<bool> IsFirstRowInclusive;
+ TMaybe<bool> IsLastRowInclusive;
TMaybe<TString> FirstRowKeys; // Binary YSON MAP
TMaybe<TString> LastRowKeys; // Binary YSON MAP
@@ -416,18 +419,24 @@ struct TSortedMergeTaskParams {
TFmrTableOutputRef Output;
};
+// Kind of user job; stored in the MapJobType field of TMapOperationParams / TMapTaskParams
+// and passed to TFmrUserJob::SetFmrJobType by the gateway.
+enum class EFmrJobType {
+ Map,        // chosen by the gateway's DoMap when the map is not ordered
+ OrderedMap, // chosen by the gateway's DoMap when the map is ordered
+ Reduce
+};
+
+// Operation-level parameters of a map: input tables, output tables, the serialized user job
+// and the kind of job to run.
struct TMapOperationParams {
std::vector<TOperationTableRef> Input;
std::vector<TFmrTableRef> Output;
TString SerializedMapJobState;
- bool IsOrdered = false;
+ // Defaults to an unordered map, matching the former `IsOrdered = false`, so that a
+ // default-constructed value never carries an indeterminate enum.
+ EFmrJobType MapJobType = EFmrJobType::Map;
};
+// Task-level parameters of a map: one task input, the task outputs, the serialized user job
+// and the kind of job to run.
struct TMapTaskParams {
TTaskTableInputRef Input;
std::vector<TFmrTableOutputRef> Output;
TString SerializedMapJobState;
- bool IsOrdered;
+ // Initialized explicitly so that a default-constructed value is never indeterminate.
+ EFmrJobType MapJobType = EFmrJobType::Map;
};
struct TSortOperationParams {
@@ -440,10 +449,38 @@ struct TLocalSortTaskParams {
TFmrTableOutputRef Output;
};
+// Flavour of a reduce operation; stored in TReduceOperationSpec::ReduceType.
+enum EReduceType {
+ SortedReduce,
+ JoinReduce // NOTE(review): presumably set when the reduce carries the JoinReduce setting - confirm
+};
+
+// Key description of a reduce operation: reduce-by columns, sort-by columns and the reduce
+// flavour. Serialized with Save / Load.
+struct TReduceOperationSpec {
+    TSortingColumns ReduceBy;
+    TSortingColumns SortBy;
+    // Initialized explicitly: Save() serializes this field unconditionally, so it must never
+    // be left indeterminate in a default-constructed spec.
+    EReduceType ReduceType = EReduceType::SortedReduce;
+
+    void Save(IOutputStream* buffer) const;
+    void Load(IInputStream* buffer);
+};
+
+// Operation-level parameters of a reduce; one of the alternatives of TOperationParams.
+struct TReduceOperationParams {
+ std::vector<TOperationTableRef> Input;
+ std::vector<TFmrTableRef> Output;
+ // NOTE(review): presumably the Save()d state of the TFmrUserJob, as for map - confirm.
+ TString SerializedReduceJobState;
+ TReduceOperationSpec ReduceOperationSpec;
+};
+
+// Task-level parameters of a reduce; one of the alternatives of TTaskParams.
+struct TReduceTaskParams {
+ //std::vector<TFmrTableInputRef> Input; // all reduce inputs should be in fmr.
+ // The generic task input type is used for now instead of the fmr-only list commented out above.
+ TTaskTableInputRef Input;
+ std::vector<TFmrTableOutputRef> Output;
+ TString SerializedReduceJobState;
+ TReduceOperationSpec ReduceOperationSpec;
+};
-using TOperationParams = std::variant<TUploadOperationParams, TDownloadOperationParams, TMergeOperationParams, TSortedMergeOperationParams, TMapOperationParams, TSortedUploadOperationParams, TSortOperationParams>;
+using TOperationParams = std::variant<TUploadOperationParams, TDownloadOperationParams, TMergeOperationParams, TSortedMergeOperationParams, TMapOperationParams, TSortedUploadOperationParams, TSortOperationParams, TReduceOperationParams>;
-using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams, TSortedMergeTaskParams, TMapTaskParams, TSortedUploadTaskParams, TLocalSortTaskParams>;
+using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams, TSortedMergeTaskParams, TMapTaskParams, TSortedUploadTaskParams, TLocalSortTaskParams, TReduceTaskParams>;
struct TFileInfo {
TString LocalPath; // Path to local file, filled in worker.
diff --git a/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp b/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp
index a125458adfd..1c3f899a088 100644
--- a/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp
+++ b/yt/yql/providers/yt/fmr/test_tools/sorted_partitioner/yql_yt_sorted_partitioner_test_tools.cpp
@@ -197,6 +197,7 @@ TFingerprintCounts CountTaskFingerprintsFromFiles(
TYtBlockIteratorSettings{},
TVector<ESortOrder>(keyColumns.size(), ESortOrder::Ascending),
fmr.IsFirstRowInclusive,
+ fmr.IsLastRowInclusive,
fmr.FirstRowKeys,
fmr.LastRowKeys
));
diff --git a/yt/yql/providers/yt/fmr/utils/comparator/ya.make b/yt/yql/providers/yt/fmr/utils/comparator/ya.make
index 335fcd32530..f253af0d001 100644
--- a/yt/yql/providers/yt/fmr/utils/comparator/ya.make
+++ b/yt/yql/providers/yt/fmr/utils/comparator/ya.make
@@ -12,6 +12,9 @@ PEERDIR(
yql/essentials/utils
)
- YQL_LAST_ABI_VERSION()
+YQL_LAST_ABI_VERSION()
+
+GENERATE_ENUM_SERIALIZATION(yql_yt_binary_yson_compare_impl.h)
+
END()
diff --git a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp
index 386b623ac4c..be69a701d0a 100644
--- a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp
+++ b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.cpp
@@ -7,6 +7,22 @@ int CompareKeyRows(const TFmrTableKeysBoundary& lhs, const TFmrTableKeysBoundary
return CompareKeyRowsAcrossYsonBlocks(lhs.Row, lhs.Markup, rhs.Row, rhs.Markup, lhs.SortOrders);
}
+// Writes Columns followed by SortOrders; TSortingColumns::Load reads them in the same order.
+void TSortingColumns::Save(IOutputStream* buffer) const {
+    ::SaveMany(buffer, Columns, SortOrders);
+}
+
+// Restores Columns followed by SortOrders, in the order TSortingColumns::Save wrote them.
+void TSortingColumns::Load(IInputStream* buffer) {
+    ::LoadMany(buffer, Columns, SortOrders);
+}
+
int TBinaryYsonComparator::CompareYsonValues(TColumnOffsetRange lhs, TColumnOffsetRange rhs) const {
Y_ENSURE(lhs.IsValid(), "Invalid column offset range");
Y_ENSURE(rhs.IsValid(), "Invalid column offset range");
diff --git a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h
index 09d40be169d..e665370c5ea 100644
--- a/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h
+++ b/yt/yql/providers/yt/fmr/utils/comparator/yql_yt_binary_yson_comparator.h
@@ -14,6 +14,9 @@ struct TSortingColumns {
std::vector<TString> Columns;
std::vector<ESortOrder> SortOrders;
bool operator==(const TSortingColumns&) const = default;
+
+ void Save(IOutputStream* buffer) const;
+ void Load(IInputStream* buffer);
};
struct TFmrTableKeysBoundary {
diff --git a/yt/yql/providers/yt/fmr/utils/ya.make b/yt/yql/providers/yt/fmr/utils/ya.make
index 2d110be7237..b5c5ed74de6 100644
--- a/yt/yql/providers/yt/fmr/utils/ya.make
+++ b/yt/yql/providers/yt/fmr/utils/ya.make
@@ -29,6 +29,7 @@ PEERDIR(
yt/yql/providers/yt/codec
yt/yql/providers/yt/lib/yson_helpers
yql/essentials/utils
+ yt/yql/providers/yt/fmr/test_tools/yson
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp
index 761d61fbaa7..3ef22f42b2e 100644
--- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp
+++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.cpp
@@ -34,7 +34,7 @@ void ParseRecordsImpl(
break;
}
curYsonRow.clear();
- CopyYson(cmd, inputBuf, curYsonRow);
+ CopyYsonWithAttrs(cmd, inputBuf, curYsonRow);
bool needBreak = false;
if (!inputBuf.TryRead(cmd)) {
needBreak = true;
diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h
index b5aedb6e877..4c1fd4e03c6 100644
--- a/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h
+++ b/yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h
@@ -13,7 +13,8 @@ void ParseRecords(
ui64 blockCount,
ui64 blockSize,
std::shared_ptr<std::atomic<bool>> cancelFlag,
- const TMaybe<TMutex>& writeMutex = Nothing());
+ const TMaybe<TMutex>& writeMutex = Nothing()
+);
void StreamBulkToYtDistributed(
NYT::TRawTableReaderPtr reader,
diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp
index b96c7939681..164e83fed5d 100644
--- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp
+++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.cpp
@@ -18,6 +18,7 @@ TTableDataServiceBlockIterator::TTableDataServiceBlockIterator(
std::vector<TString> neededColumns,
TString serializedColumnGroupsSpec,
TMaybe<bool> isFirstRowKeysInclusive,
+ TMaybe<bool> isLastRowKeysInclusive,
TMaybe<TString> firstRowKeys,
TMaybe<TString> lastRowKeys,
ui64 readAheadChunks
@@ -45,6 +46,8 @@ TTableDataServiceBlockIterator::TTableDataServiceBlockIterator(
}
if (lastRowKeys) {
LastBoundary_ = TFmrTableKeysBoundary(*lastRowKeys, KeyColumns_, SortOrders_);
+ Y_ENSURE(isLastRowKeysInclusive.Defined(), "isLastRowKeysInclusive must be defined for Last Boundary");
+ IsLastBoundInclusive_ = *isLastRowKeysInclusive;
}
if (SerializedColumnGroupsSpec_.empty()) {
@@ -152,6 +155,8 @@ bool TTableDataServiceBlockIterator::RowInKeyBounds(const TString& blob, const T
);
if (c > 0) { // if row > last boundary
return false;
+ } else if (!IsLastBoundInclusive_ && c == 0) {
+ return false;
}
}
return true;
diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h
index 0bf0a4e0268..4ab202869fc 100644
--- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h
+++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_tds_block_iterator.h
@@ -24,6 +24,7 @@ public:
std::vector<TString> neededColumns,
TString serializedColumnGroupsSpec = {},
TMaybe<bool> isFirstRowKeysInclusive = Nothing(),
+ TMaybe<bool> isLastRowKeysInclusive = Nothing(),
TMaybe<TString> firstRowKeys = Nothing(),
TMaybe<TString> lastRowKeys = Nothing(),
ui64 readAheadChunks = 4
@@ -67,6 +68,7 @@ private:
TMaybe<TFmrTableKeysBoundary> FirstBoundary_;
TMaybe<TFmrTableKeysBoundary> LastBoundary_;
bool IsFirstBoundInclusive_ = true;
+ bool IsLastBoundInclusive_ = true;
std::vector<TString> GroupNamesToRead_;
diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp
index f3ae9cb8a54..70f256aa45f 100644
--- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp
+++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.cpp
@@ -12,6 +12,7 @@ TYtBlockIterator::TYtBlockIterator(
TYtBlockIteratorSettings settings,
std::vector<ESortOrder> sortOrders,
TMaybe<bool> isFirstRowKeysInclusive,
+ TMaybe<bool> isLastRowKeysInclusive,
TMaybe<TString> firstRowKeys,
TMaybe<TString> lastRowKeys
)
@@ -34,6 +35,8 @@ TYtBlockIterator::TYtBlockIterator(
}
if (lastRowKeys) {
LastBoundary_ = TFmrTableKeysBoundary(*lastRowKeys, KeyColumns_, SortOrders_);
+ Y_ENSURE(isLastRowKeysInclusive.Defined(), "isLastRowKeysInclusive must be defined for Last Boundary");
+ IsLastBoundInclusive_ = *isLastRowKeysInclusive;
}
}
@@ -64,6 +67,8 @@ bool TYtBlockIterator::RowInKeyBounds(const TString& blob, const TRowIndexMarkup
);
if (c > 0) { // if row > last bound
return false;
+ } else if (!IsLastBoundInclusive_ && c == 0) { // if row == last bound and the bound is exclusive
+ return false;
}
}
return true;
diff --git a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h
index d9cc9c21e52..dd788cc93fe 100644
--- a/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h
+++ b/yt/yql/providers/yt/fmr/utils/yson_block_iterator/impl/yql_yt_yson_yt_block_iterator.h
@@ -27,6 +27,7 @@ public:
TYtBlockIteratorSettings settings,
std::vector<ESortOrder> sortOrders = {},
TMaybe<bool> isFirstRowKeysInclusive = Nothing(),
+ TMaybe<bool> isLastRowKeysInclusive = Nothing(),
TMaybe<TString> firstRowKeys = Nothing(),
TMaybe<TString> lastRowKeys = Nothing()
);
@@ -55,6 +56,7 @@ private:
TMaybe<TFmrTableKeysBoundary> FirstBoundary_;
TMaybe<TFmrTableKeysBoundary> LastBoundary_;
bool IsFirstBoundInclusive_ = true;
+ bool IsLastBoundInclusive_ = true;
};
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make
index 884cda6f788..a7e0c544d1e 100644
--- a/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make
@@ -7,6 +7,8 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/fmr/yt_job_service/file
yt/yql/providers/yt/gateway/file
+ yt/yql/providers/yt/fmr/utils/comparator
+ yt/yql/providers/yt/fmr/utils
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index 333d8aa457a..b016e92ff5f 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -8,6 +8,7 @@
#include <yt/yql/providers/yt/gateway/lib/exec_ctx.h>
#include <yt/yql/providers/yt/gateway/lib/yt_attrs.h>
#include <yt/yql/providers/yt/gateway/lib/map_builder.h>
+#include <yt/yql/providers/yt/gateway/lib/reduce_builder.h>
#include <yt/yql/providers/yt/gateway/lib/yt_helpers.h>
#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h>
@@ -504,6 +505,8 @@ public:
future = DoMap(op.Cast(), execCtx, ctx);
} else if (auto op = opBase.Maybe<TYtSort>()) {
future = DoSort(execCtx);
+ } else if (auto op = opBase.Maybe<TYtReduce>()) {
+ future = DoReduce(op.Cast(), execCtx, ctx);
} else {
// We don't support this operation
return UploadFmrInputsAndForwardToUnderlyingGateway(execCtx, node, ctx, std::move(options), nodePos);
@@ -2113,8 +2116,6 @@ private:
auto outputTable = outputTables[0];
TString outputCluster = execCtx->Cluster_;
-
-
auto outputTableColumnGroups = GetOutputTablesColumnGroups(execCtx);
TFmrTableId outputTableFmrId(outputCluster, outputTable.Path);
auto columnGroupSpec = outputTableColumnGroups[0];
@@ -2223,6 +2224,93 @@ private:
return GetRunningOperationFuture(sortedMergeOperationRequest, sessionId);
}
+    // Sets the job lambda via the job builder, collects every resource the user job needs and
+    // starts uploading local files and UDFs to the distributed cache.
+    //
+    // The three lists are out-parameters and are only appended to:
+    //   filesToUpload - local files and deferred UDF files reported by the job builder;
+    //   ytResources   - remote paths which are not in fmr and have to be downloaded in jobs;
+    //   fmrResources  - remote paths which are already present in fmr as tables.
+    //
+    // When no FileUploadService is configured, returns MakeFuture() right away: nothing is
+    // collected and the job builder is not invoked.
+    TFuture<void> GetUploadFilesToDistributedCacheFuture(
+        const TExecContextSimple<TRunOptions>::TPtr& execCtx,
+        std::shared_ptr<TFmrUserJob> fmrJob,
+        TString& lambdaCode,
+        std::vector<TFileInfo>& filesToUpload,
+        // Taken by reference, like the other two lists: with a by-value parameter every entry
+        // appended below would be lost and the caller's vector would stay empty.
+        std::vector<TYtResourceInfo>& ytResources,
+        std::vector<TFmrResourceOperationInfo>& fmrResources
+    ) {
+        if (!FmrServices_->FileUploadService) {
+            // For now, logic for file gateway is not implemented yet, and fileUpload service is not set.
+            // TODO (@cdzyura171) - all udfs should be executed locally, use TFileLambdaBuilder and file transformer.
+            return MakeFuture();
+        }
+        YQL_ENSURE(UrlMapper_ && Clusters_);
+
+        TString sessionId = execCtx->GetSessionId();
+
+        execCtx->MakeUserFiles();
+        auto tmpFiles = MakeIntrusive<TTempFiles>(execCtx->FileStorage_->GetTemp());
+
+        auto client = execCtx->CreateYtClient(execCtx->Options_.Config());
+
+        auto downloader = MakeYtNativeFileDownloader(execCtx->Gateway, sessionId, execCtx->Cluster_, execCtx->Options_.Config(), client, tmpFiles);
+        TTransformerFiles transformerFiles;
+        {
+            // The allocator lives only for the duration of the lambda update.
+            TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), execCtx->FunctionRegistry_->SupportsSizedAllocators());
+            alloc.SetLimit(execCtx->Options_.Config()->DefaultCalcMemoryLimit.Get().GetOrElse(0));
+            TMapJobBuilder jobBuilder;
+            // TODO - this function is the same for map and reduce, make function with template builder argument instead of method.
+            transformerFiles = jobBuilder.UpdateAndSetMapLambda(alloc, execCtx, downloader, lambdaCode, fmrJob.get());
+        }
+
+        for (auto& fileInfo: transformerFiles.LocalFiles) {
+            filesToUpload.emplace_back(TFileInfo{
+                .LocalPath = fileInfo.first,
+                .Md5Key = fileInfo.second.Hash,
+                .Alias = TFsPath(fileInfo.first).GetName() // uniqueId
+            });
+        }
+        // Deferred UDF files get no alias here; it is assigned from JobUdfs at the end.
+        for (auto& fileInfo: transformerFiles.DeferredUdfFiles) {
+            filesToUpload.emplace_back(TFileInfo{.LocalPath = fileInfo.first, .Md5Key = fileInfo.second.Hash});
+        }
+
+        auto remoteFilesClusterConnection = GetTableClusterConnection(execCtx->Cluster_, sessionId, execCtx->Options_.Config());
+
+        for (auto& richPath: transformerFiles.RemoteFiles) {
+            // Remote files all should have the same cluster, and GatewayTransformer clears it from richPaths, so we need to fill it.
+            richPath.Cluster(execCtx->Cluster_);
+
+            // Checking in case remotePath is a table which is already inserted in fmr.
+            TFmrTableId fmrTableId(richPath);
+            auto fmrTablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId);
+            if (fmrTablePresenceStatus == ETablePresenceStatus::OnlyInFmr || fmrTablePresenceStatus == ETablePresenceStatus::Both) {
+                TFmrTableRef fmrTableRef{.FmrTableId = fmrTableId};
+                fmrTableRef.SerializedColumnGroups = GetColumnGroupSpec(fmrTableRef.FmrTableId, sessionId);
+                if (!richPath.Columns_.Empty()) {
+                    std::vector<TString> neededColumns(richPath.Columns_->Parts_.begin(), richPath.Columns_->Parts_.end());
+                    fmrTableRef.Columns = neededColumns;
+                }
+
+                YQL_ENSURE(richPath.FileName_.Defined()); // uniqueId, filled in transformer.
+                fmrResources.emplace_back(TFmrResourceOperationInfo{.FmrTable = fmrTableRef, .Alias = *richPath.FileName_});
+                continue;
+            }
+
+            // adding remotePath info to list of ytResources to download in jobs.
+            // NOTE(review): before this logic was moved out of DoMap, richPath.TransactionId was
+            // filled here from remoteFilesClusterConnection.TransactionId when it was not already
+            // set. Confirm that no longer doing so is intentional.
+
+            TYtResourceInfo ytResourceInfo{.RichPath = richPath};
+            ytResourceInfo.YtServerName = remoteFilesClusterConnection.YtServerName;
+            if (remoteFilesClusterConnection.Token.Defined()) {
+                ytResourceInfo.Token = *remoteFilesClusterConnection.Token;
+            }
+            ytResources.emplace_back(ytResourceInfo);
+        }
+
+        // Files still without an alias (the deferred UDF files) take the name of the UDF module
+        // whose name, minus its first two characters, is a suffix of the local path.
+        for (auto& fileInfo: filesToUpload) {
+            for (auto& [udfModule, udfPrefix]: transformerFiles.JobUdfs) {
+                if (fileInfo.Alias.empty() && fileInfo.LocalPath.EndsWith(udfModule.substr(2))) {
+                    YQL_CLOG(DEBUG, FastMapReduce) << "Setting file alias " << udfModule << " for udf with path " << fileInfo.LocalPath;
+                    fileInfo.Alias = udfModule;
+                }
+            }
+        }
+
+        return UploadFilesToDistributedCache(filesToUpload);
+    }
+
TFuture<TFmrOperationResult> DoMap(
TYtMap map,
const TExecContextSimple<TRunOptions>::TPtr& execCtx,
@@ -2237,7 +2325,6 @@ private:
auto [mapInputTables, clusterConnections] = GetInputTablesAndConnections(execCtx->InputTables_, sessionId, execCtx->Options_.Config());
-
auto mapJob = std::make_shared<TFmrUserJob>();
TMapJobBuilder mapJobBuilder;
@@ -2251,96 +2338,15 @@ private:
bool useSkiff = false;
bool forceYsonInputFormat = true;
mapJobBuilder.SetMapJobParams(mapJob.get(), execCtx,remapperMap, remapperAllFiles, useSkiff, forceYsonInputFormat, false);
- mapJob->SetIsOrdered(ordered);
+ auto mapJobType = ordered ? EFmrJobType::OrderedMap : EFmrJobType::Map;
+ mapJob->SetFmrJobType(mapJobType);
mapJob->SetSettings(TFmrUserJobSettings());
- TFuture<void> uploadFilesToDistributedCacheIfNeededFuture;
std::vector<TFileInfo> filesToUpload; // Udfs and local files to upload to dist cache.
std::vector<TYtResourceInfo> ytResources; // Yt files and small tables which we need to download as files in jobs.
std::vector<TFmrResourceOperationInfo> fmrResources; // Yt small tables, which are already in fmr and we need to download as files in jobs.
- if (!FmrServices_->FileUploadService) {
- uploadFilesToDistributedCacheIfNeededFuture = MakeFuture();
- } else {
- YQL_ENSURE(UrlMapper_ && Clusters_);
-
- execCtx->MakeUserFiles();
- auto tmpFiles = MakeIntrusive<TTempFiles>(execCtx->FileStorage_->GetTemp());
-
- auto client = execCtx->CreateYtClient(execCtx->Options_.Config());
-
- auto downloader = MakeYtNativeFileDownloader(execCtx->Gateway, sessionId, execCtx->Cluster_, execCtx->Options_.Config(), client, tmpFiles);
- TTransformerFiles transformerFiles;
- {
- TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),execCtx->FunctionRegistry_->SupportsSizedAllocators());
- alloc.SetLimit(execCtx->Options_.Config()->DefaultCalcMemoryLimit.Get().GetOrElse(0));
- transformerFiles = mapJobBuilder.UpdateAndSetMapLambda(alloc, execCtx, downloader, mapLambda, mapJob.get(), useSkiff);
- }
-
- for (auto& fileInfo: transformerFiles.LocalFiles) {
- filesToUpload.emplace_back(TFileInfo{
- .LocalPath = fileInfo.first,
- .Md5Key = fileInfo.second.Hash,
- .Alias = TFsPath(fileInfo.first).GetName() // uniqueId
- });
- }
- for (auto& fileInfo: transformerFiles.DeferredUdfFiles) {
- filesToUpload.emplace_back(TFileInfo{.LocalPath = fileInfo.first, .Md5Key = fileInfo.second.Hash});
- }
-
- auto remoteFilesClusterConnection = GetTableClusterConnection(execCtx->Cluster_, sessionId, execCtx->Options_.Config());
-
- for (auto& richPath: transformerFiles.RemoteFiles) {
- // Remote files all should have the same cluster, and GatewayTransformer clears it from richPaths, so we need to fill it.
- richPath.Cluster(execCtx->Cluster_);
-
- // Checking in case remotePath is a table which is already inserted in fmr.
- TFmrTableId fmrTableId(richPath);
- auto fmrTablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId);
- if (fmrTablePresenceStatus == ETablePresenceStatus::OnlyInFmr || fmrTablePresenceStatus == ETablePresenceStatus::Both) {
- TFmrTableRef fmrTableRef{.FmrTableId = fmrTableId};
- fmrTableRef.SerializedColumnGroups = GetColumnGroupSpec(fmrTableRef.FmrTableId, sessionId);
- if (!richPath.Columns_.Empty()) {
- std::vector<TString> neededColumns(richPath.Columns_->Parts_.begin(), richPath.Columns_->Parts_.end());
- fmrTableRef.Columns = neededColumns;
- }
-
- YQL_ENSURE(richPath.FileName_.Defined()); // uniqueId, filled in transformer.
- fmrResources.emplace_back(TFmrResourceOperationInfo{.FmrTable = fmrTableRef, .Alias = *richPath.FileName_});
- continue;
- }
-
- // adding remotePath info to list of ytResources to download in jobs.
-
- if (!richPath.TransactionId_.Defined() && remoteFilesClusterConnection.TransactionId) {
- richPath.TransactionId(GetGuid(remoteFilesClusterConnection.TransactionId));
- }
-
- TYtResourceInfo ytResourceInfo{.RichPath = richPath};
- ytResourceInfo.YtServerName = remoteFilesClusterConnection.YtServerName;
- if (remoteFilesClusterConnection.Token.Defined()) {
- ytResourceInfo.Token = *remoteFilesClusterConnection.Token;
- }
- ytResources.emplace_back(ytResourceInfo);
- }
-
- for (auto& fileInfo: filesToUpload) {
- for (auto& [udfModule, udfPrefix]: transformerFiles.JobUdfs) {
- if (fileInfo.Alias.empty() && fileInfo.LocalPath.EndsWith(udfModule.substr(2))) {
- YQL_CLOG(DEBUG, FastMapReduce) << "Setting file alias " << udfModule << " for udf with path " << fileInfo.LocalPath;
- fileInfo.Alias = udfModule;
- }
- }
- }
-
- if (!filesToUpload.empty()) {
- YQL_ENSURE(FmrServices_->FileUploadService, "FileUploadService is not configured, but map operation requires uploading "
- << filesToUpload.size() << " files (UDFs/local files) to distributed cache. "
- << "Please configure FileRemoteCacheName in FmrConfigurations and FileCacheConfigurations in gateways.conf");
- }
-
- uploadFilesToDistributedCacheIfNeededFuture = UploadFilesToDistributedCache(filesToUpload);
- }
+ TFuture<void> uploadFilesToDistributedCacheIfNeededFuture = GetUploadFilesToDistributedCacheFuture(execCtx, mapJob, mapLambda, filesToUpload, ytResources, fmrResources);
return uploadFilesToDistributedCacheIfNeededFuture.Apply([=, this] (const auto& f) mutable {
f.GetValue();
@@ -2348,12 +2354,7 @@ private:
TStringStream jobStateStream;
mapJob->Save(jobStateStream);
- TMapOperationParams mapOperationParams{
- .Input = mapInputTables,
- .Output = fmrOutputTables,
- .SerializedMapJobState = jobStateStream.Str(),
- .IsOrdered = ordered
- };
+ TMapOperationParams mapOperationParams{.Input = mapInputTables,.Output = fmrOutputTables, .SerializedMapJobState = jobStateStream.Str(), .MapJobType = mapJobType};
TStartOperationRequest mapOperationRequest{
.OperationType = EOperationType::Map,
.OperationParams = mapOperationParams,
@@ -2430,6 +2431,158 @@ private:
return GetRunningOperationFuture(sortOperationRequest, sessionId);
}
+ // Converts a list of (column name, ascending flag) pairs into TSortingColumns.
+ // For every input pair, in order, the name is appended to Columns and the matching
+ // ESortOrder (Ascending when the flag is true, Descending otherwise) to SortOrders.
+ TSortingColumns GetSortingColumnsFromColumnPairList(const TVector<std::pair<TString, bool>>& sortColumns) {
+     TSortingColumns result;
+     for (const auto& column : sortColumns) {
+         result.Columns.emplace_back(column.first);
+         result.SortOrders.emplace_back(column.second ? ESortOrder::Ascending : ESortOrder::Descending);
+     }
+     return result;
+ }
+
+ // Launches an FMR reduce operation for the given TYtReduce node.
+ //
+ // Reads the reduce settings, configures a TFmrUserJob through TReduceJobBuilder and, once
+ // the future from GetUploadFilesToDistributedCacheFuture is ready, serializes the job state
+ // and submits a TStartOperationRequest with OperationType = EOperationType::Reduce.
+ //
+ // Returns an already-set future holding a single EFmrErrorReason::FallbackOperation error
+ // when JoinReduce is requested or when any input table is held as a TYtTableRef.
+ TFuture<TFmrOperationResult> DoReduce(
+     TYtReduce reduce,
+     const TExecContextSimple<TRunOptions>::TPtr& execCtx,
+     TExprContext& ctx
+ ) {
+     TString sessionId = execCtx->GetSessionId();
+     YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+     YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+
+     // Settings attached to the reduce node.
+     auto reduceBy = NYql::GetSettingAsColumnPairList(reduce.Settings().Ref(), EYtSettingType::ReduceBy);
+     auto sortBy = NYql::GetSettingAsColumnPairList(reduce.Settings().Ref(), EYtSettingType::SortBy);
+     const bool isJoinReduce = NYql::HasSetting(reduce.Settings().Ref(), EYtSettingType::JoinReduce);
+     const bool firstIsPrimary = NYql::HasSetting(reduce.Settings().Ref(), EYtSettingType::FirstAsPrimary);
+
+     // NOTE(review): sortLimitBy / limit are computed here but not consumed further down in this function.
+     TVector<TString> sortLimitBy = NYql::GetSettingAsColumnList(reduce.Settings().Ref(), EYtSettingType::SortLimitBy);
+     TMaybe<ui64> limit = GetLimit(reduce.Settings().Ref());
+     if (limit && !sortLimitBy.empty()) {
+         const auto topSortMaxLimit = execCtx->Options_.Config()->TopSortMaxLimit.Get().GetOrElse(DEFAULT_TOP_SORT_LIMIT);
+         if (*limit > topSortMaxLimit) {
+             limit.Clear();
+         }
+     }
+
+     auto [inputTables, connections] = GetInputTablesAndConnections(execCtx->InputTables_, sessionId, execCtx->Options_.Config());
+     auto outputTables = GetOutputTables(execCtx);
+
+     auto userJob = std::make_shared<TFmrUserJob>();
+     TReduceJobBuilder jobBuilder;
+
+     // Per-input-table descriptors handed to SetReduceJobParams below; entry i describes input table i.
+     TVector<ui32> tableGroups;
+     TVector<TString> tableNames;
+     TVector<ui64> tableRowOffsets;
+     ui64 rowOffsetInGroup = 0;
+     // NOTE(review): filled only for JoinReduce, which currently returns a fallback result below.
+     std::vector<NYT::TRichYPath> primaryInputTablesPaths;
+
+     YQL_ENSURE(!execCtx->InputTables_.empty());
+     const ui32 primaryGroup = firstIsPrimary ? execCtx->InputTables_.front().Group : execCtx->InputTables_.back().Group;
+     for (const auto& inputTable : execCtx->InputTables_) {
+         if (isJoinReduce && inputTable.Group == primaryGroup) {
+             auto primaryPath = inputTable.Path;
+             primaryInputTablesPaths.emplace_back(primaryPath);
+         }
+
+         // Row offsets restart from zero whenever the group differs from the previous table's group.
+         const bool groupChanged = !tableGroups.empty() && tableGroups.back() != inputTable.Group;
+         if (groupChanged) {
+             rowOffsetInGroup = 0;
+         }
+
+         tableGroups.push_back(inputTable.Group);
+         tableNames.push_back(inputTable.Temp ? TString() : inputTable.Name);
+         tableRowOffsets.push_back(rowOffsetInGroup);
+         rowOffsetInGroup += inputTable.Records;
+     }
+
+     // Auxiliary columns: the reduce-by columns plus the explicitly requested sort-by columns.
+     // This is collected before sortBy may be defaulted to reduceBy just below.
+     THashSet<TString> auxColumns;
+     for (const auto& column : reduceBy) {
+         auxColumns.insert(column.first);
+     }
+     for (const auto& column : sortBy) {
+         auxColumns.insert(column.first);
+     }
+
+     if (sortBy.empty() && !isJoinReduce) {
+         sortBy = reduceBy;
+     }
+
+     // Template result used for every reduce flavour that is not supported here.
+     auto fallbackResult = TFmrOperationResult{
+         .Errors = {
+             TFmrError{
+                 .Component = EFmrComponent::Gateway,
+                 .Reason = EFmrErrorReason::FallbackOperation,
+             }
+         }
+     };
+
+     if (isJoinReduce) {
+         fallbackResult.Errors[0].ErrorMessage = "Join Reduce is not supported yet, falling back to underlying gateway";
+         return MakeFuture(fallbackResult);
+     }
+
+     // Any input still held as a TYtTableRef triggers the fallback as well.
+     for (auto& inputTable : inputTables) {
+         if (!std::holds_alternative<TYtTableRef>(inputTable)) {
+             continue;
+         }
+         auto ytTable = std::get<TYtTableRef>(inputTable);
+         fallbackResult.Errors[0].ErrorMessage = TStringBuilder() << "Table " << ytTable.GetPath() << " is not in fmr - falling back to underlying gateway";
+         return MakeFuture(fallbackResult);
+     }
+
+     // TODO - add JoinReduceSpec, for now not supported.
+     TReduceOperationSpec operationSpec{
+         .ReduceBy = GetSortingColumnsFromColumnPairList(reduceBy),
+         .SortBy = GetSortingColumnsFromColumnPairList(sortBy),
+         .ReduceType = isJoinReduce ? EReduceType::JoinReduce : EReduceType::SortedReduce
+     };
+
+     // Configure the user job; the call order is kept exactly as the builder is driven in the original code.
+     jobBuilder.SetInputType(userJob.get(), reduce);
+     jobBuilder.SetReduceJobParams(userJob.get(), execCtx, tableGroups, tableNames, tableRowOffsets, auxColumns);
+     TString lambdaCode = jobBuilder.SetReduceLambdaCode(userJob.get(), reduce, execCtx, ctx);
+
+     userJob->SetSettings(TFmrUserJobSettings());
+
+     // Udfs and local files to upload to dist cache.
+     std::vector<TFileInfo> filesToUpload;
+     // Yt files and small tables which we need to download as files in jobs.
+     std::vector<TYtResourceInfo> ytResources;
+     // Yt small tables, which are already in fmr and we need to download as files in jobs.
+     std::vector<TFmrResourceOperationInfo> fmrResources;
+
+     TFuture<void> uploadFuture = GetUploadFilesToDistributedCacheFuture(execCtx, userJob, lambdaCode, filesToUpload, ytResources, fmrResources);
+
+     return uploadFuture.Apply([=, this] (const auto& uploadResult) mutable {
+         // Touch the result first; presumably this surfaces an upload failure - TODO confirm.
+         uploadResult.GetValue();
+
+         // Serialize the configured job state so it can travel inside the operation params.
+         TStringStream serializedJob;
+         userJob->Save(serializedJob);
+
+         TReduceOperationParams operationParams{
+             .Input = inputTables,
+             .Output = outputTables,
+             .SerializedReduceJobState = serializedJob.Str(),
+             .ReduceOperationSpec = operationSpec
+         };
+         TStartOperationRequest request{
+             .OperationType = EOperationType::Reduce,
+             .OperationParams = operationParams,
+             .SessionId = sessionId,
+             .IdempotencyKey = GenerateId(),
+             .NumRetries = 1,
+             .ClusterConnections = connections,
+             .FmrOperationSpec = execCtx->Options_.Config()->FmrOperationSpec.Get(execCtx->Cluster_),
+             .Files = filesToUpload,
+             .YtResources = ytResources,
+             .FmrResources = fmrResources
+         };
+
+         // Human-readable "<cluster>.<table>" names, used only for the log line below.
+         std::vector<TString> inputPaths;
+         std::vector<TString> outputPaths;
+         for (const auto& table : execCtx->InputTables_) {
+             inputPaths.push_back(table.Cluster + "." + table.Name);
+         }
+         for (const auto& table : execCtx->OutTables_) {
+             outputPaths.push_back(execCtx->Cluster_ + "." + table.Path);
+         }
+
+         YQL_CLOG(INFO, FastMapReduce) << "Starting reduce from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end()) << " to yt tables: " << JoinRange(' ', outputPaths.begin(), outputPaths.end());
+         return GetRunningOperationFuture(request, sessionId);
+     });
+ }
+
TFuture<TFmrOperationResult> GetSuccessfulFmrOperationResult() {
TFmrOperationResult fmrOperationResult = TFmrOperationResult();
fmrOperationResult.SetSuccess();