diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-21 14:41:35 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-21 14:57:09 +0300 |
commit | d8084877032fb2ef042227048eddd1cd51313537 (patch) | |
tree | 329b84824944e7c622faa137053dbd83ad7c55b6 | |
parent | 362e06fbca53f6c9d264a0c67eede0d10e2fc926 (diff) | |
download | ydb-d8084877032fb2ef042227048eddd1cd51313537.tar.gz |
KIKIMR-19802: split through another way with no useless conversion
19 files changed, 253 insertions, 22 deletions
diff --git a/.mapping.json b/.mapping.json index 4da05c1d3a..0d2c687713 100644 --- a/.mapping.json +++ b/.mapping.json @@ -5353,6 +5353,11 @@ "ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.linux-x86_64.txt":"", "ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.txt":"", "ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt":"", + "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt":"", "ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt":"", "ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt":"", "ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 2b8c23cad2..568ea59f6a 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -171,7 +171,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr auto srcSchema = srcBatch->schema(); for (auto& name : columnNames) { - int pos = srcSchema->GetFieldIndex(name); + const int pos = srcSchema->GetFieldIndex(name); AFL_VERIFY(pos >= 0)("field_name", name); fields.push_back(srcSchema->field(pos)); columns.push_back(srcBatch->column(pos)); @@ -206,7 +206,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: auto srcField = srcBatch->schema()->GetFieldByName(field->name()); Y_ABORT_UNLESS(srcField); if (!field->Equals(srcField)) { - AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", 
"invalid_column_type")("column", field->name()) + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name()) ("column_type", field->ToString(true))("incoming_type", srcField->ToString(true)); return nullptr; } @@ -214,7 +214,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: Y_ABORT_UNLESS(columns.back()); if (!columns.back()->type()->Equals(field->type())) { - AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name()) + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name()) ("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString()); return nullptr; } @@ -696,7 +696,11 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) { auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique); - return Reorder(batch, sortPermutation, andUnique); + if (sortPermutation) { + return Reorder(batch, sortPermutation, andUnique); + } else { + return batch; + } } std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec) { @@ -862,7 +866,7 @@ NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 head resultFull.InsertValue("tail", tail); auto& result = resultFull.InsertValue("data", NJson::JSON_ARRAY); for (int i = 0; i < column.length(); ++i) { - if (i >= (int)head && i + (int)tail <= column.length()) { + if (i >= (int)head && i + (int)tail < column.length()) { continue; } if constexpr (arrow::has_string_view<typename TWrap::T>()) { diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp index 
ae03d1dbec..ef11ab2c49 100644 --- a/ydb/core/formats/arrow/permutations.cpp +++ b/ydb/core/formats/arrow/permutations.cpp @@ -71,6 +71,8 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar TStatusValidator::Validate(builder.Reserve(points.size())); TRawReplaceKey* predKey = nullptr; + int predPosition = -1; + bool isTrivial = true; for (auto& point : points) { if (andUnique) { if (predKey) { @@ -83,10 +85,18 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar } } } + if (point.GetPosition() != predPosition + 1) { + isTrivial = false; + } + predPosition = point.GetPosition(); TStatusValidator::Validate(builder.Append(point.GetPosition())); predKey = &point; } + if (isTrivial && builder.length() == (i64)points.size()) { + return nullptr; + } + std::shared_ptr<arrow::UInt64Array> out; TStatusValidator::Validate(builder.Finish(&out)); return out; diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.cpp b/ydb/core/formats/arrow/reader/read_filter_merger.cpp index 1dd756701f..dbcc795104 100644 --- a/ydb/core/formats/arrow/reader/read_filter_merger.cpp +++ b/ydb/core/formats/arrow/reader/read_filter_merger.cpp @@ -15,7 +15,7 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const { return result; } -std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater) { +std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) { if (!batch || !batch->num_rows()) { return {}; } @@ -25,6 +25,9 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi if (forFound.IsReverseSort()) { std::swap(posStart, posFinish); } + if (includedStartPosition) { + posStart = 
*includedStartPosition; + } TSortableBatchPosition position = forFound.BuildSame(batch, posStart); { position.InitPosition(posStart); @@ -64,7 +67,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi } TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) { - auto pos = FindPosition(Batch, forFound, true); + auto pos = FindPosition(Batch, forFound, true, Position); AFL_VERIFY(pos)("batch", NArrow::DebugJson(Batch, 1, 1))("found", forFound.DebugJson()); if (ReverseSort) { AFL_VERIFY(pos->GetPosition() <= Position)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true); @@ -75,7 +78,7 @@ TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const return *pos; } -TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) { +TSortableScanData::TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns) { for (auto&& i : columns) { auto c = batch->GetColumnByName(i); AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h index c004316bcc..c5f9de0fdd 100644 --- a/ydb/core/formats/arrow/reader/read_filter_merger.h +++ b/ydb/core/formats/arrow/reader/read_filter_merger.h @@ -3,8 +3,8 @@ #include <ydb/core/formats/arrow/arrow_filter.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/formats/arrow/switch/switch_type.h> -#include <ydb/core/formats/arrow/replace_key.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <library/cpp/actors/core/log.h> #include <util/generic/hash.h> #include <util/string/join.h> #include <set> @@ -19,7 +19,7 @@ private: YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields); public: TSortableScanData() = default; - 
TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns); + TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns); bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const { if (Fields.size() != (size_t)schema->num_fields()) { @@ -103,13 +103,85 @@ public: } }; - static std::optional<TFoundPosition> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater); - TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound); + template <class TContainer> + class TAssociatedContainerIterator { + private: + typename TContainer::const_iterator Current; + typename TContainer::const_iterator End; + public: + TAssociatedContainerIterator(const TContainer& container) + : Current(container.begin()) + , End(container.end()) + { + } + + bool IsValid() const { + return Current != End; + } + + void Next() { + ++Current; + } + + const auto& CurrentPosition() const { + return Current->first; + } + }; + + // (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf) + template <class TBordersIterator> + static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBorders(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, TBordersIterator& it) { + std::vector<std::shared_ptr<arrow::RecordBatch>> result; + if (!batch || batch->num_rows() == 0) { + while (it.IsValid()) { + result.emplace_back(nullptr); + } + result.emplace_back(nullptr); + return result; + } + TSortableBatchPosition pos(batch, 0, columnNames, {}, false); + bool batchFinished = false; + i64 recordsCountSplitted = 0; + for (; it.IsValid() && !batchFinished; it.Next()) { + const ui32 startPos = pos.GetPosition(); + auto posFound = pos.SkipToLower(it.CurrentPosition()); + if (posFound.IsGreater() || posFound.IsEqual()) { + if (posFound.GetPosition() == startPos) { + 
result.emplace_back(nullptr); + } else { + result.emplace_back(batch->Slice(startPos, posFound.GetPosition() - startPos)); + recordsCountSplitted += result.back()->num_rows(); + } + } else { + result.emplace_back(batch->Slice(startPos, posFound.GetPosition() - startPos + 1)); + recordsCountSplitted += result.back()->num_rows(); + batchFinished = true; + } + } + if (batchFinished) { + for (; it.IsValid(); it.Next()) { + result.emplace_back(nullptr); + } + result.emplace_back(nullptr); + } else { + AFL_VERIFY(!it.IsValid()); + result.emplace_back(batch->Slice(pos.GetPosition())); + AFL_VERIFY(result.back()->num_rows()); + recordsCountSplitted += result.back()->num_rows(); + } + AFL_VERIFY(batch->num_rows() == recordsCountSplitted); + return result; + } - NArrow::TReplaceKey BuildReplaceKey() const { - return NArrow::TReplaceKey::FromBatch(Batch, Position); + template <class TContainer> + static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInAssociativeContainer(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TContainer& container) { + TAssociatedContainerIterator<TContainer> it(container); + return SplitByBorders(batch, columnNames, it); } + static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition); + TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound); + const TSortableScanData& GetData() const { return *Data; } diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt index 971b6148ff..bab0224d42 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) 
add_subdirectory(compaction) +add_subdirectory(counters) add_library(columnshard-engines-changes) target_link_libraries(columnshard-engines-changes PUBLIC @@ -18,6 +19,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC columnshard-engines-insert_table engines-changes-abstract engines-changes-compaction + engines-changes-counters tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt index de77580057..96d8d37c04 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) add_subdirectory(compaction) +add_subdirectory(counters) add_library(columnshard-engines-changes) target_link_libraries(columnshard-engines-changes PUBLIC @@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC columnshard-engines-insert_table engines-changes-abstract engines-changes-compaction + engines-changes-counters tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt index de77580057..96d8d37c04 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) add_subdirectory(compaction) +add_subdirectory(counters) add_library(columnshard-engines-changes) target_link_libraries(columnshard-engines-changes PUBLIC @@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC columnshard-engines-insert_table engines-changes-abstract engines-changes-compaction + engines-changes-counters tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering diff --git 
a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt index 971b6148ff..bab0224d42 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) add_subdirectory(compaction) +add_subdirectory(counters) add_library(columnshard-engines-changes) target_link_libraries(columnshard-engines-changes PUBLIC @@ -18,6 +19,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC columnshard-engines-insert_table engines-changes-abstract engines-changes-compaction + engines-changes-counters tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..e1f276a65d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(engines-changes-counters) +target_link_libraries(engines-changes-counters PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(engines-changes-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp +) diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..ceccef447f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(engines-changes-counters) +target_link_libraries(engines-changes-counters PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(engines-changes-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp +) diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..ceccef447f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). 
These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(engines-changes-counters) +target_link_libraries(engines-changes-counters PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(engines-changes-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp +) diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..e1f276a65d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(engines-changes-counters) +target_link_libraries(engines-changes-counters PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(engines-changes-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp +) diff --git a/ydb/core/tx/columnshard/engines/changes/counters/general.cpp b/ydb/core/tx/columnshard/engines/changes/counters/general.cpp new file mode 100644 index 0000000000..250a050496 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/counters/general.cpp @@ -0,0 +1,5 @@ +#include "general.h" + +namespace NKikimr::NOlap::NChanges { + +} diff --git a/ydb/core/tx/columnshard/engines/changes/counters/ya.make b/ydb/core/tx/columnshard/engines/changes/counters/ya.make new file mode 100644 index 0000000000..66ca3166da --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/counters/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + general.cpp +) + +PEERDIR( + ydb/core/protos + library/cpp/actors/core + ydb/core/tablet_flat +) + +END() diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 6677379763..2335394095 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -109,6 +109,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont } Y_ABORT_UNLESS(Blobs.empty()); + const std::vector<std::string> comparableColumns = resultSchema->GetIndexInfo().GetReplaceKey()->field_names(); for (auto& [pathId, batches] : pathBatches) { auto newGranuleId = AddPathIfNotExists(pathId); NIndexedReader::TMergePartialStream stream(resultSchema->GetIndexInfo().GetReplaceKey(), resultSchema->GetIndexInfo().ArrowSchemaWithSpecials(), false); @@ -126,16 +127,24 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont stream.SetPossibleSameVersion(true); 
stream.DrainAll(builder); - std::map<NArrow::TReplaceKey, ui64> markers; - for (auto&& i : PathToGranule[pathId]) { - markers[i.first.BuildReplaceKey()] = i.second; - } THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> batchChunks; - if (markers.empty()) { + if (PathToGranule[pathId].empty()) { AFL_VERIFY(newGranuleId); batchChunks[*newGranuleId].emplace_back(builder.Finalize()); } else { - batchChunks = TMarksGranules::SliceIntoGranules(builder.Finalize(), markers, resultSchema->GetIndexInfo()); + auto& markers = PathToGranule[pathId]; + std::vector<std::shared_ptr<arrow::RecordBatch>> result = NIndexedReader::TSortableBatchPosition::SplitByBordersInAssociativeContainer(builder.Finalize(), comparableColumns, markers); + AFL_VERIFY(result.size() == markers.size() + 1)("result", result.size())("markers", markers.size()); + ui32 idx = 0; + for (auto&& i : markers) { + auto chunk = result[++idx]; + if (chunk) { + batchChunks[i.second].emplace_back(chunk); + } + } + if (result.front()) { + batchChunks[markers.begin()->second].emplace_back(result.front()); + } } for (auto&& g : batchChunks) { for (auto&& b : g.second) { diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make index acb243b789..fb399e43a8 100644 --- a/ydb/core/tx/columnshard/engines/changes/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/ya.make @@ -16,6 +16,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/insert_table ydb/core/tx/columnshard/engines/changes/abstract ydb/core/tx/columnshard/engines/changes/compaction + ydb/core/tx/columnshard/engines/changes/counters ydb/core/tx/columnshard/splitter ydb/core/tablet_flat ydb/core/tx/tiering diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index 6bbdd37c8a..71c086aed9 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ 
b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -1,5 +1,7 @@ #include "read_filter_merger.h" +#include <ydb/core/formats/arrow/permutations.h> #include <library/cpp/actors/core/log.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h> namespace NKikimr::NOlap::NIndexedReader { @@ -132,8 +134,8 @@ bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { } std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() { - Y_DEBUG_ABORT_UNLESS(SortHeap.Size()); - Y_DEBUG_ABORT_UNLESS(!SortHeap.Current().IsControlPoint()); + Y_ABORT_UNLESS(SortHeap.Size()); + Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint()); TSortableBatchPosition result = SortHeap.Current().GetKeyColumns(); TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns(); bool isFirst = true; |