aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-21 14:41:35 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-21 14:57:09 +0300
commitd8084877032fb2ef042227048eddd1cd51313537 (patch)
tree329b84824944e7c622faa137053dbd83ad7c55b6
parent362e06fbca53f6c9d264a0c67eede0d10e2fc926 (diff)
downloadydb-d8084877032fb2ef042227048eddd1cd51313537.tar.gz
KIKIMR-19802: split throught another way withno useless convertation
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp14
-rw-r--r--ydb/core/formats/arrow/permutations.cpp10
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.cpp9
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.h84
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt20
-rw-r--r--ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt21
-rw-r--r--ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt20
-rw-r--r--ydb/core/tx/columnshard/engines/changes/counters/general.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/changes/counters/ya.make13
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp21
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp6
19 files changed, 253 insertions, 22 deletions
diff --git a/.mapping.json b/.mapping.json
index 4da05c1d3a..0d2c687713 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -5353,6 +5353,11 @@
"ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.linux-x86_64.txt":"",
"ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.txt":"",
"ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt":"",
"ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt":"",
"ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 2b8c23cad2..568ea59f6a 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -171,7 +171,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr
auto srcSchema = srcBatch->schema();
for (auto& name : columnNames) {
- int pos = srcSchema->GetFieldIndex(name);
+ const int pos = srcSchema->GetFieldIndex(name);
AFL_VERIFY(pos >= 0)("field_name", name);
fields.push_back(srcSchema->field(pos));
columns.push_back(srcBatch->column(pos));
@@ -206,7 +206,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
auto srcField = srcBatch->schema()->GetFieldByName(field->name());
Y_ABORT_UNLESS(srcField);
if (!field->Equals(srcField)) {
- AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())
+ AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
return nullptr;
}
@@ -214,7 +214,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
Y_ABORT_UNLESS(columns.back());
if (!columns.back()->type()->Equals(field->type())) {
- AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())
+ AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
return nullptr;
}
@@ -696,7 +696,11 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);
- return Reorder(batch, sortPermutation, andUnique);
+ if (sortPermutation) {
+ return Reorder(batch, sortPermutation, andUnique);
+ } else {
+ return batch;
+ }
}
std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec) {
@@ -862,7 +866,7 @@ NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 head
resultFull.InsertValue("tail", tail);
auto& result = resultFull.InsertValue("data", NJson::JSON_ARRAY);
for (int i = 0; i < column.length(); ++i) {
- if (i >= (int)head && i + (int)tail <= column.length()) {
+ if (i >= (int)head && i + (int)tail < column.length()) {
continue;
}
if constexpr (arrow::has_string_view<typename TWrap::T>()) {
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index ae03d1dbec..ef11ab2c49 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -71,6 +71,8 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
TStatusValidator::Validate(builder.Reserve(points.size()));
TRawReplaceKey* predKey = nullptr;
+ int predPosition = -1;
+ bool isTrivial = true;
for (auto& point : points) {
if (andUnique) {
if (predKey) {
@@ -83,10 +85,18 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
}
}
}
+ if (point.GetPosition() != predPosition + 1) {
+ isTrivial = false;
+ }
+ predPosition = point.GetPosition();
TStatusValidator::Validate(builder.Append(point.GetPosition()));
predKey = &point;
}
+ if (isTrivial && builder.length() == (i64)points.size()) {
+ return nullptr;
+ }
+
std::shared_ptr<arrow::UInt64Array> out;
TStatusValidator::Validate(builder.Finish(&out));
return out;
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.cpp b/ydb/core/formats/arrow/reader/read_filter_merger.cpp
index 1dd756701f..dbcc795104 100644
--- a/ydb/core/formats/arrow/reader/read_filter_merger.cpp
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.cpp
@@ -15,7 +15,7 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
return result;
}
-std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater) {
+std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) {
if (!batch || !batch->num_rows()) {
return {};
}
@@ -25,6 +25,9 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
if (forFound.IsReverseSort()) {
std::swap(posStart, posFinish);
}
+ if (includedStartPosition) {
+ posStart = *includedStartPosition;
+ }
TSortableBatchPosition position = forFound.BuildSame(batch, posStart);
{
position.InitPosition(posStart);
@@ -64,7 +67,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
}
TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) {
- auto pos = FindPosition(Batch, forFound, true);
+ auto pos = FindPosition(Batch, forFound, true, Position);
AFL_VERIFY(pos)("batch", NArrow::DebugJson(Batch, 1, 1))("found", forFound.DebugJson());
if (ReverseSort) {
AFL_VERIFY(pos->GetPosition() <= Position)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true);
@@ -75,7 +78,7 @@ TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const
return *pos;
}
-TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) {
+TSortableScanData::TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns) {
for (auto&& i : columns) {
auto c = batch->GetColumnByName(i);
AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns));
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h
index c004316bcc..c5f9de0fdd 100644
--- a/ydb/core/formats/arrow/reader/read_filter_merger.h
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.h
@@ -3,8 +3,8 @@
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/switch/switch_type.h>
-#include <ydb/core/formats/arrow/replace_key.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <library/cpp/actors/core/log.h>
#include <util/generic/hash.h>
#include <util/string/join.h>
#include <set>
@@ -19,7 +19,7 @@ private:
YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);
public:
TSortableScanData() = default;
- TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns);
+ TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns);
bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const {
if (Fields.size() != (size_t)schema->num_fields()) {
@@ -103,13 +103,85 @@ public:
}
};
- static std::optional<TFoundPosition> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater);
- TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);
+ template <class TContainer>
+ class TAssociatedContainerIterator {
+ private:
+ typename TContainer::const_iterator Current;
+ typename TContainer::const_iterator End;
+ public:
+ TAssociatedContainerIterator(const TContainer& container)
+ : Current(container.begin())
+ , End(container.end())
+ {
+ }
+
+ bool IsValid() const {
+ return Current != End;
+ }
+
+ void Next() {
+ ++Current;
+ }
+
+ const auto& CurrentPosition() const {
+ return Current->first;
+ }
+ };
+
+ // (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf)
+ template <class TBordersIterator>
+ static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBorders(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, TBordersIterator& it) {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> result;
+ if (!batch || batch->num_rows() == 0) {
+ while (it.IsValid()) {
+ result.emplace_back(nullptr);
+ }
+ result.emplace_back(nullptr);
+ return result;
+ }
+ TSortableBatchPosition pos(batch, 0, columnNames, {}, false);
+ bool batchFinished = false;
+ i64 recordsCountSplitted = 0;
+ for (; it.IsValid() && !batchFinished; it.Next()) {
+ const ui32 startPos = pos.GetPosition();
+ auto posFound = pos.SkipToLower(it.CurrentPosition());
+ if (posFound.IsGreater() || posFound.IsEqual()) {
+ if (posFound.GetPosition() == startPos) {
+ result.emplace_back(nullptr);
+ } else {
+ result.emplace_back(batch->Slice(startPos, posFound.GetPosition() - startPos));
+ recordsCountSplitted += result.back()->num_rows();
+ }
+ } else {
+ result.emplace_back(batch->Slice(startPos, posFound.GetPosition() - startPos + 1));
+ recordsCountSplitted += result.back()->num_rows();
+ batchFinished = true;
+ }
+ }
+ if (batchFinished) {
+ for (; it.IsValid(); it.Next()) {
+ result.emplace_back(nullptr);
+ }
+ result.emplace_back(nullptr);
+ } else {
+ AFL_VERIFY(!it.IsValid());
+ result.emplace_back(batch->Slice(pos.GetPosition()));
+ AFL_VERIFY(result.back()->num_rows());
+ recordsCountSplitted += result.back()->num_rows();
+ }
+ AFL_VERIFY(batch->num_rows() == recordsCountSplitted);
+ return result;
+ }
- NArrow::TReplaceKey BuildReplaceKey() const {
- return NArrow::TReplaceKey::FromBatch(Batch, Position);
+ template <class TContainer>
+ static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInAssociativeContainer(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TContainer& container) {
+ TAssociatedContainerIterator<TContainer> it(container);
+ return SplitByBorders(batch, columnNames, it);
}
+ static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition);
+ TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);
+
const TSortableScanData& GetData() const {
return *Data;
}
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
index 971b6148ff..bab0224d42 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(compaction)
+add_subdirectory(counters)
add_library(columnshard-engines-changes)
target_link_libraries(columnshard-engines-changes PUBLIC
@@ -18,6 +19,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
columnshard-engines-insert_table
engines-changes-abstract
engines-changes-compaction
+ engines-changes-counters
tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
index de77580057..96d8d37c04 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(compaction)
+add_subdirectory(counters)
add_library(columnshard-engines-changes)
target_link_libraries(columnshard-engines-changes PUBLIC
@@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
columnshard-engines-insert_table
engines-changes-abstract
engines-changes-compaction
+ engines-changes-counters
tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
index de77580057..96d8d37c04 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(compaction)
+add_subdirectory(counters)
add_library(columnshard-engines-changes)
target_link_libraries(columnshard-engines-changes PUBLIC
@@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
columnshard-engines-insert_table
engines-changes-abstract
engines-changes-compaction
+ engines-changes-counters
tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
index 971b6148ff..bab0224d42 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(compaction)
+add_subdirectory(counters)
add_library(columnshard-engines-changes)
target_link_libraries(columnshard-engines-changes PUBLIC
@@ -18,6 +19,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
columnshard-engines-insert_table
engines-changes-abstract
engines-changes-compaction
+ engines-changes-counters
tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..e1f276a65d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-changes-counters)
+target_link_libraries(engines-changes-counters PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(engines-changes-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..ceccef447f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-changes-counters)
+target_link_libraries(engines-changes-counters PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(engines-changes-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..ceccef447f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-changes-counters)
+target_link_libraries(engines-changes-counters PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(engines-changes-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..e1f276a65d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-changes-counters)
+target_link_libraries(engines-changes-counters PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(engines-changes-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/changes/counters/general.cpp b/ydb/core/tx/columnshard/engines/changes/counters/general.cpp
new file mode 100644
index 0000000000..250a050496
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/counters/general.cpp
@@ -0,0 +1,5 @@
+#include "general.h"
+
+namespace NKikimr::NOlap::NChanges {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/counters/ya.make b/ydb/core/tx/columnshard/engines/changes/counters/ya.make
new file mode 100644
index 0000000000..66ca3166da
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/counters/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ general.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ library/cpp/actors/core
+ ydb/core/tablet_flat
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 6677379763..2335394095 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -109,6 +109,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
}
Y_ABORT_UNLESS(Blobs.empty());
+ const std::vector<std::string> comparableColumns = resultSchema->GetIndexInfo().GetReplaceKey()->field_names();
for (auto& [pathId, batches] : pathBatches) {
auto newGranuleId = AddPathIfNotExists(pathId);
NIndexedReader::TMergePartialStream stream(resultSchema->GetIndexInfo().GetReplaceKey(), resultSchema->GetIndexInfo().ArrowSchemaWithSpecials(), false);
@@ -126,16 +127,24 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
stream.SetPossibleSameVersion(true);
stream.DrainAll(builder);
- std::map<NArrow::TReplaceKey, ui64> markers;
- for (auto&& i : PathToGranule[pathId]) {
- markers[i.first.BuildReplaceKey()] = i.second;
- }
THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> batchChunks;
- if (markers.empty()) {
+ if (PathToGranule[pathId].empty()) {
AFL_VERIFY(newGranuleId);
batchChunks[*newGranuleId].emplace_back(builder.Finalize());
} else {
- batchChunks = TMarksGranules::SliceIntoGranules(builder.Finalize(), markers, resultSchema->GetIndexInfo());
+ auto& markers = PathToGranule[pathId];
+ std::vector<std::shared_ptr<arrow::RecordBatch>> result = NIndexedReader::TSortableBatchPosition::SplitByBordersInAssociativeContainer(builder.Finalize(), comparableColumns, markers);
+ AFL_VERIFY(result.size() == markers.size() + 1)("result", result.size())("markers", markers.size());
+ ui32 idx = 0;
+ for (auto&& i : markers) {
+ auto chunk = result[++idx];
+ if (chunk) {
+ batchChunks[i.second].emplace_back(chunk);
+ }
+ }
+ if (result.front()) {
+ batchChunks[markers.begin()->second].emplace_back(result.front());
+ }
}
for (auto&& g : batchChunks) {
for (auto&& b : g.second) {
diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make
index acb243b789..fb399e43a8 100644
--- a/ydb/core/tx/columnshard/engines/changes/ya.make
+++ b/ydb/core/tx/columnshard/engines/changes/ya.make
@@ -16,6 +16,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/insert_table
ydb/core/tx/columnshard/engines/changes/abstract
ydb/core/tx/columnshard/engines/changes/compaction
+ ydb/core/tx/columnshard/engines/changes/counters
ydb/core/tx/columnshard/splitter
ydb/core/tablet_flat
ydb/core/tx/tiering
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index 6bbdd37c8a..71c086aed9 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -1,5 +1,7 @@
#include "read_filter_merger.h"
+#include <ydb/core/formats/arrow/permutations.h>
#include <library/cpp/actors/core/log.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -132,8 +134,8 @@ bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
}
std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
- Y_DEBUG_ABORT_UNLESS(SortHeap.Size());
- Y_DEBUG_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
+ Y_ABORT_UNLESS(SortHeap.Size());
+ Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
TSortableBatchPosition result = SortHeap.Current().GetKeyColumns();
TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns();
bool isFirst = true;