diff options
author | ivanmorozov <[email protected]> | 2023-10-04 16:11:44 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-10-04 16:44:34 +0300 |
commit | f09b8bf859a6a17ef94825f878606458b2488e71 (patch) | |
tree | 5678269c09f571a9ef825ac7c85d272ce9ab915d | |
parent | e65261efc674e38a075188ba2278d042a33e169c (diff) |
KIKIMR-19211: min/max snapshot
37 files changed, 604 insertions, 318 deletions
diff --git a/.mapping.json b/.mapping.json index bcd78fb5638..8c7d862863b 100644 --- a/.mapping.json +++ b/.mapping.json @@ -3976,6 +3976,11 @@ "ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt":"", "ydb/core/formats/arrow/dictionary/CMakeLists.txt":"", "ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt":"", + "ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt":"", + "ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt":"", + "ydb/core/formats/arrow/reader/CMakeLists.txt":"", + "ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt":"", "ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt":"", "ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt":"", "ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt index 9e497a26d76..59de4d93dfc 100644 --- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt @@ -9,6 +9,7 @@ add_subdirectory(common) add_subdirectory(compression) add_subdirectory(dictionary) +add_subdirectory(reader) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) @@ -31,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + formats-arrow-reader cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt index 6dffeca31e0..abcf2a17694 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt @@ -9,6 +9,7 @@ add_subdirectory(common) add_subdirectory(compression) add_subdirectory(dictionary) +add_subdirectory(reader) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) @@ -32,6 +33,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + formats-arrow-reader cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt index 6dffeca31e0..abcf2a17694 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt @@ -9,6 +9,7 @@ add_subdirectory(common) add_subdirectory(compression) add_subdirectory(dictionary) +add_subdirectory(reader) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) @@ -32,6 +33,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + formats-arrow-reader cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt index 618a9d0bd73..44d27d39280 100644 --- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt @@ -9,6 +9,7 @@ add_subdirectory(common) add_subdirectory(compression) add_subdirectory(dictionary) +add_subdirectory(reader) add_subdirectory(serializer) add_subdirectory(simple_builder) add_subdirectory(switch) @@ -32,6 +33,7 @@ target_link_libraries(core-formats-arrow PUBLIC formats-arrow-simple_builder formats-arrow-dictionary formats-arrow-transformer + formats-arrow-reader cpp-actors-core ydb-library-arrow_kernels ydb-library-binary_json diff --git a/ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..161c7752003 --- /dev/null +++ b/ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-reader) +target_link_libraries(formats-arrow-reader PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch + cpp-actors-core +) +target_sources(formats-arrow-reader PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp +) diff --git a/ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..6f2cffc30c5 --- /dev/null +++ b/ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-reader) +target_link_libraries(formats-arrow-reader PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch + cpp-actors-core +) +target_sources(formats-arrow-reader PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp +) diff --git a/ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..6f2cffc30c5 --- /dev/null +++ b/ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-reader) +target_link_libraries(formats-arrow-reader PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch + cpp-actors-core +) +target_sources(formats-arrow-reader PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp +) diff --git a/ydb/core/formats/arrow/reader/CMakeLists.txt b/ydb/core/formats/arrow/reader/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/formats/arrow/reader/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..161c7752003 --- /dev/null +++ b/ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(formats-arrow-reader) +target_link_libraries(formats-arrow-reader PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + formats-arrow-simple_builder + formats-arrow-switch + cpp-actors-core +) +target_sources(formats-arrow-reader PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp +) diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.cpp b/ydb/core/formats/arrow/reader/read_filter_merger.cpp new file mode 100644 index 00000000000..29a35d5aa56 --- /dev/null +++ b/ydb/core/formats/arrow/reader/read_filter_merger.cpp @@ -0,0 +1,99 @@ +#include "read_filter_merger.h" +#include <library/cpp/actors/core/log.h> + +namespace NKikimr::NOlap::NIndexedReader { + +NJson::TJsonValue TSortableBatchPosition::DebugJson() const { + NJson::TJsonValue result; + result["reverse"] = ReverseSort; + result["records_count"] = RecordsCount; + result["position"] = Position; + result["sorting"] = Sorting->DebugJson(Position); + if (Data) { + result["data"] = Data->DebugJson(Position); + } + return result; +} + +std::optional<ui64> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater, const bool include) { + if (!batch || !batch->num_rows()) { + return {}; + } + + const auto checkEqualBorder = [batch, greater, include](const i64 position) ->std::optional<i64> { + if (include) { + return position; + } else if (greater) { + if (batch->num_rows() > position + 1) { + return position + 1; + } else { + return {}; + } + } else { + if (position) { + return position - 1; + } else { + return {}; + } + } + }; + + i64 posStart = 0; + i64 posFinish = batch->num_rows() - 1; + TSortableBatchPosition position = forFound.BuildSame(batch, posStart); + { + position.InitPosition(posStart); + auto cmp = position.Compare(forFound); + if (cmp == std::partial_ordering::greater) { + if (greater) { + return posStart; + } else { + return {}; + } + } else if (cmp == std::partial_ordering::equivalent) { + return checkEqualBorder(posStart); + } + } + { + position.InitPosition(posFinish); + auto cmp = position.Compare(forFound); + if (cmp == std::partial_ordering::less) { + if (greater) { + return {}; + } else { + return posFinish; + } + } else if (cmp == std::partial_ordering::equivalent) { + return checkEqualBorder(posFinish); + } + } + while (posFinish > posStart + 1) { + Y_VERIFY(position.InitPosition(0.5 * (posStart + posFinish))); + const auto comparision = position.Compare(forFound); + if (comparision == std::partial_ordering::less) { + posStart = position.Position; + } else if (comparision == std::partial_ordering::greater) { + posFinish = position.Position; + } else { + return checkEqualBorder(position.Position); + } + } + Y_VERIFY(posFinish != posStart); + if (greater) { + return posFinish; + } else { + return posStart; + } +} + +TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) { + for (auto&& i : columns) { + auto c = batch->GetColumnByName(i); + AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); + Columns.emplace_back(c); + auto f = batch->schema()->GetFieldByName(i); + Fields.emplace_back(f); + } +} + +} diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h new file mode 100644 index 00000000000..78967acdb84 --- /dev/null +++ b/ydb/core/formats/arrow/reader/read_filter_merger.h @@ -0,0 +1,163 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/formats/arrow/arrow_filter.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/switch/switch_type.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <util/generic/hash.h> +#include <util/string/join.h> +#include <set> + +namespace NKikimr::NOlap::NIndexedReader { + +class TRecordBatchBuilder; + +class TSortableScanData { +private: + YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Array>>, Columns); + YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields); +public: + TSortableScanData() = default; + TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns); + + bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const { + if (Fields.size() != (size_t)schema->num_fields()) { + return false; + } + for (ui32 i = 0; i < Fields.size(); ++i) { + if (Fields[i]->type() != schema->field(i)->type()) { + return false; + } + if (Fields[i]->name() != schema->field(i)->name()) { + return false; + } + } + return true; + } + + NJson::TJsonValue DebugJson(const i32 position) const { + NJson::TJsonValue result = NJson::JSON_MAP; + for (ui32 i = 0; i < Columns.size(); ++i) { + auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP); + jsonColumn["name"] = Fields[i]->name(); + if (position >= 0 && position < Columns[i]->length()) { + jsonColumn["value"] = NArrow::DebugString(Columns[i], position); + } + } + return result; + } + + std::vector<std::string> GetFieldNames() const { + std::vector<std::string> result; + for (auto&& i : Fields) { + result.emplace_back(i->name()); + } + return result; + } +}; + +class TSortableBatchPosition { +protected: + + YDB_READONLY(i64, Position, 0); + i64 RecordsCount = 0; + bool ReverseSort = false; + std::shared_ptr<TSortableScanData> Sorting; + std::shared_ptr<TSortableScanData> Data; + std::shared_ptr<arrow::RecordBatch> Batch; + static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include); + +public: + TSortableBatchPosition() = default; + + const TSortableScanData& GetData() const { + return *Data; + } + + bool IsReverseSort() const { + return ReverseSort; + } + NJson::TJsonValue DebugJson() const; + + TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const { + return TSortableBatchPosition(batch, position, Sorting->GetFieldNames(), Data->GetFieldNames(), ReverseSort); + } + + bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) { + return Sorting->IsSameSchema(schema); + } + + static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) { + if (!batch) { + return nullptr; + } + Y_VERIFY(from.Compare(to) != std::partial_ordering::greater); + const std::optional<ui32> idxFrom = FindPosition(batch, from, true, includeFrom); + const std::optional<ui32> idxTo = FindPosition(batch, to, false, includeTo); + if (!idxFrom || !idxTo || *idxTo < *idxFrom) { + return nullptr; + } + return batch->Slice(*idxFrom, *idxTo - *idxFrom + 1); + } + + TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort) + : Position(position) + , RecordsCount(batch->num_rows()) + , ReverseSort(reverseSort) + , Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns)) + , Batch(batch) + { + if (dataColumns.size()) { + Data = std::make_shared<TSortableScanData>(batch, dataColumns); + } + Y_VERIFY(batch->num_rows()); + Y_VERIFY_DEBUG(batch->ValidateFull().ok()); + Y_VERIFY(Sorting->GetColumns().size()); + } + + std::partial_ordering Compare(const TSortableBatchPosition& item) const { + Y_VERIFY(item.ReverseSort == ReverseSort); + Y_VERIFY(item.Sorting->GetColumns().size() == Sorting->GetColumns().size()); + const auto directResult = NArrow::ColumnsCompare(Sorting->GetColumns(), Position, item.Sorting->GetColumns(), item.Position); + if (ReverseSort) { + if (directResult == std::partial_ordering::less) { + return std::partial_ordering::greater; + } else if (directResult == std::partial_ordering::greater) { + return std::partial_ordering::less; + } else { + return std::partial_ordering::equivalent; + } + } else { + return directResult; + } + } + + bool operator<(const TSortableBatchPosition& item) const { + return Compare(item) == std::partial_ordering::less; + } + + bool operator==(const TSortableBatchPosition& item) const { + return Compare(item) == std::partial_ordering::equivalent; + } + + bool operator!=(const TSortableBatchPosition& item) const { + return Compare(item) != std::partial_ordering::equivalent; + } + + bool NextPosition(const i64 delta) { + return InitPosition(Position + delta); + } + + bool InitPosition(const i64 position) { + if (position < RecordsCount && position >= 0) { + Position = position; + return true; + } else { + return false; + } + + } + +}; + +} diff --git a/ydb/core/formats/arrow/reader/ya.make b/ydb/core/formats/arrow/reader/ya.make new file mode 100644 index 00000000000..540b895abf3 --- /dev/null +++ b/ydb/core/formats/arrow/reader/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/formats/arrow/simple_builder + ydb/core/formats/arrow/switch + library/cpp/actors/core +) + +SRCS( + read_filter_merger.cpp +) + +END() diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp index 31f21b52954..7f42862045d 100644 --- a/ydb/core/formats/arrow/special_keys.cpp +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -1,5 +1,6 @@ #include "special_keys.h" #include "permutations.h" +#include "reader/read_filter_merger.h" #include <ydb/core/formats/arrow/serializer/full.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/formats/arrow/arrow_filter.h> @@ -27,6 +28,10 @@ TString TSpecialKeys::SerializeToString() const { return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(Data); } +TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const { + return NArrow::SerializeBatchNoCompression(Data); +} + TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames /*= {}*/) { Y_VERIFY(batch); Y_VERIFY(batch->num_rows()); @@ -40,13 +45,69 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> } Data = NArrow::CopyRecords(keyBatch, indexes); + Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); +} + +TMinMaxSpecialKeys::TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& schema) { + Y_VERIFY(batch); + Y_VERIFY(batch->num_rows()); + Y_VERIFY(schema); + + NOlap::NIndexedReader::TSortableBatchPosition record(batch, 0, schema->field_names(), {}, false); + std::optional<NOlap::NIndexedReader::TSortableBatchPosition> minValue; + std::optional<NOlap::NIndexedReader::TSortableBatchPosition> maxValue; + while (true) { + if (!minValue || minValue->Compare(record) == std::partial_ordering::greater) { + minValue = record; + } + if (!maxValue || maxValue->Compare(record) == std::partial_ordering::less) { + maxValue = record; + } + if (!record.NextPosition(1)) { + break; + } + } + Y_VERIFY(minValue && maxValue); + std::vector<ui64> indexes; + indexes.emplace_back(minValue->GetPosition()); + if (maxValue->GetPosition() != minValue->GetPosition()) { + indexes.emplace_back(maxValue->GetPosition()); + } + + std::vector<TString> columnNamesString; + for (auto&& i : schema->field_names()) { + columnNamesString.emplace_back(i); + } + + auto dataBatch = NArrow::ExtractColumns(batch, columnNamesString); + Data = NArrow::CopyRecords(dataBatch, indexes); + Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); } TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data) + : TBase(data) { + Y_VERIFY_DEBUG(Data->ValidateFull().ok()); + Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); +} + +std::shared_ptr<NKikimr::NArrow::TFirstLastSpecialKeys> TFirstLastSpecialKeys::BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const { + auto newData = NArrow::ExtractColumns(Data, schema); + AFL_VERIFY(newData); + return std::make_shared<TFirstLastSpecialKeys>(newData); +} + +TMinMaxSpecialKeys::TMinMaxSpecialKeys(const TString& data) : TBase(data) { Y_VERIFY_DEBUG(Data->ValidateFull().ok()); Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); } +std::shared_ptr<NKikimr::NArrow::TMinMaxSpecialKeys> TMinMaxSpecialKeys::BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const { + auto newData = NArrow::ExtractColumns(Data, schema); + AFL_VERIFY(newData); + std::shared_ptr<TMinMaxSpecialKeys> result(new TMinMaxSpecialKeys(newData)); + return result; +} + } diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h index 135e3aff11f..98ad72012fb 100644 --- a/ydb/core/formats/arrow/special_keys.h +++ b/ydb/core/formats/arrow/special_keys.h @@ -20,6 +20,13 @@ protected: } public: + TString SerializeToStringDataOnlyNoCompression() const; + + TSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema) { + Data = NArrow::DeserializeBatch(data, schema); + Y_VERIFY(Data); + Y_VERIFY_DEBUG(Data->ValidateFull().ok()); + } TSpecialKeys(const TString& data) { Y_VERIFY(DeserializeFromString(data)); @@ -36,16 +43,52 @@ public: return Data; } - std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const { + std::shared_ptr<TFirstLastSpecialKeys> BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const; + + std::optional<TReplaceKey> GetFirst(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { return GetKeyByIndex(0, schema); } - std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const { + std::optional<TReplaceKey> GetLast(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { return GetKeyByIndex(Data->num_rows() - 1, schema); } explicit TFirstLastSpecialKeys(const TString& data); - + explicit TFirstLastSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema) + : TBase(data, schema) + { + Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); + } explicit TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {}); }; +class TMinMaxSpecialKeys: public TSpecialKeys { +private: + using TBase = TSpecialKeys; +protected: + TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> data) + : TBase(data) { + } +public: + std::shared_ptr<TMinMaxSpecialKeys> BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const; + + const std::shared_ptr<arrow::RecordBatch>& GetBatch() const { + return Data; + } + + std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { + return GetKeyByIndex(0, schema); + } + std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { + return GetKeyByIndex(Data->num_rows() - 1, schema); + } + + explicit TMinMaxSpecialKeys(const TString& data); + explicit TMinMaxSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema) + : TBase(data, schema) { + Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); + } + + explicit TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& schema); +}; + } diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make index 416cefc1406..c26eb6bb844 100644 --- a/ydb/core/formats/arrow/ya.make +++ b/ydb/core/formats/arrow/ya.make @@ -11,6 +11,7 @@ PEERDIR( ydb/core/formats/arrow/simple_builder ydb/core/formats/arrow/dictionary ydb/core/formats/arrow/transformer + ydb/core/formats/arrow/reader library/cpp/actors/core ydb/library/arrow_kernels ydb/library/binary_json diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index f9677baaf11..f53fbcc9d73 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -259,6 +259,11 @@ message TIndexGranuleMeta { optional uint32 MarkSize = 1; // Composite key mark (granule border) size: count of first PK elements in mark } +message TSnapshot { + optional uint64 PlanStep = 1; + optional uint64 TxId = 2; +} + message TIndexPortionMeta { oneof Produced { bool IsInserted = 1; @@ -268,6 +273,8 @@ message TIndexPortionMeta { } optional string TierName = 5; optional bytes PrimaryKeyBorders = 6; // arrow::RecordBatch with first and last ReplaceKey rows + optional TSnapshot RecordSnapshotMin = 7; + optional TSnapshot RecordSnapshotMax = 8; } message TIndexColumnMeta { diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h index 056c6d94cf8..6fba6a4cd5a 100644 --- a/ydb/core/tx/columnshard/background_controller.h +++ b/ydb/core/tx/columnshard/background_controller.h @@ -81,7 +81,7 @@ public: bool IsIndexingActive() const { return ActiveIndexing; } - bool GetIndexingActiveCount() const { + i64 GetIndexingActiveCount() const { return ActiveIndexing; } diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt index 9f20f37bd4d..a059465374e 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + core-formats-arrow tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt index 56fe56b09ee..1ef95311730 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + core-formats-arrow tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt index 56fe56b09ee..1ef95311730 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + core-formats-arrow tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt index 9f20f37bd4d..a059465374e 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(tx-columnshard-common PUBLIC yutil ydb-core-protos libs-apache-arrow + core-formats-arrow tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-common PRIVATE diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 50e899dab00..ac748e9ed9c 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -10,6 +10,7 @@ SRCS( PEERDIR( ydb/core/protos contrib/libs/apache/arrow + ydb/core/formats/arrow ) GENERATE_ENUM_SERIALIZATION(portion.h) diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index fb9a45be109..6a137f7748f 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -42,6 +42,9 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc arrow::FieldVector indexFields; indexFields.emplace_back(portionIdField); indexFields.emplace_back(portionRecordIndexField); + for (auto&& i : TIndexInfo::ArrowSchemaSnapshot()->fields()) { + indexFields.emplace_back(i); + } auto dataSchema = std::make_shared<arrow::Schema>(indexFields); NIndexedReader::TMergePartialStream mergeStream(resultSchema->GetIndexInfo().GetReplaceKey(), dataSchema, false); ui32 idx = 0; @@ -67,9 +70,13 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); - Y_VERIFY(columnPortionIdx && columnPortionRecordIdx); + auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); + auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); + Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx); Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); + Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); + Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx); const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx); @@ -89,8 +96,8 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext); TMergedColumn mColumn(context); { - auto c = batchResult->GetColumnByName(f->name()); - AFL_VERIFY(!c); +// auto c = batchResult->GetColumnByName(f->name()); +// AFL_VERIFY(!c); AFL_VERIFY(batchResult->num_rows() == pIdxArray.length()); std::vector<TPortionColumnCursor> cursors; auto loader = resultSchema->GetColumnLoader(f->name()); @@ -144,12 +151,16 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc TSimilarSlicer slicer(4 * 1024 * 1024); auto packs = slicer.Split(batchSlices); + ui32 recordIdx = 0; for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i)); + auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs(); AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator())); - NArrow::TFirstLastSpecialKeys specialKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); - AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, specialKeys, SaverContext.GetTierName()); + NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); + NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); + AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName()); + recordIdx += slice.GetRecordsCount(); } if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { TStringBuilder sbSwitched; diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 3f7dcce7caf..e60288c7fdc 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -93,9 +93,6 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion); - auto& granuleStart = self.Granules[granule]->Record.Mark; - - Y_VERIFY(granuleStart <= portionInfo.IndexKeyStart()); self.UpsertPortion(portionInfo, &oldInfo); for (auto& record : portionInfo.Records) { diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 8bcbfb38df9..4d1f69800ad 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -119,14 +119,14 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna } } const auto pred = [pkSchema](const TInsertedData* l, const TInsertedData* r) { - return l->GetMeta().GetMin(pkSchema) < r->GetMeta().GetMin(pkSchema); + return l->GetMeta().GetFirstPK(pkSchema) < r->GetMeta().GetFirstPK(pkSchema); }; std::sort(ret.begin(), ret.end(), pred); std::vector<TCommittedBlob> result; result.reserve(ret.size()); for (auto&& i : ret) { - result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaVersion(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema))); + result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaVersion(), i->GetMeta().GetFirstPK(pkSchema), i->GetMeta().GetLastPK(pkSchema))); } return result; diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.h b/ydb/core/tx/columnshard/engines/insert_table/meta.h index 5621e6359f8..d884c3a910a 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/meta.h +++ b/ydb/core/tx/columnshard/engines/insert_table/meta.h @@ -30,16 +30,16 @@ public: RawBytes = proto.GetRawBytes(); } - std::optional<NArrow::TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const { + std::optional<NArrow::TReplaceKey> GetFirstPK(const std::shared_ptr<arrow::Schema>& schema) const { if (GetSpecialKeys()) { - return GetSpecialKeys()->GetMin(schema); + return GetSpecialKeys()->GetFirst(schema); } else { return {}; } } - std::optional<NArrow::TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const { + std::optional<NArrow::TReplaceKey> GetLastPK(const std::shared_ptr<arrow::Schema>& schema) const { if (GetSpecialKeys()) { - return GetSpecialKeys()->GetMax(schema); + return GetSpecialKeys()->GetLast(schema); } else { return {}; } diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp index 77d59ed74fe..c158dcf3a22 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.cpp +++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp @@ -15,9 +15,11 @@ TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo& RawBytes = context.GetMetaProto().GetRawBytes(); } if (context.GetMetaProto().HasMinValue()) { + AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId())); Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type()); } if (context.GetMetaProto().HasMaxValue()) { + AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId())); Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type()); } } diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index 1d560b61075..58303ab6a27 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -5,31 +5,27 @@ namespace NKikimr::NOlap { -void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo) { - auto& batch = specials.GetBatch(); - AFL_VERIFY(batch->num_rows()); +void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo) { { - auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); - AFL_VERIFY(keyBatch); - std::vector<bool> bits(batch->num_rows(), false); - bits[0] = true; - bits[batch->num_rows() - 1] = true; // it could be 0 if batch has one row - - auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows()); - auto res = arrow::compute::Filter(keyBatch, filter); - Y_VERIFY(res.ok()); - - ReplaceKeyEdges = res->record_batch(); - Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2); + ReplaceKeyEdges = primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey()); + IndexKeyStart = ReplaceKeyEdges->GetFirst(); + IndexKeyEnd = ReplaceKeyEdges->GetLast(); } - auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey()); - IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); - IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); + { + auto cPlanStep = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); + auto cTxId = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); + Y_VERIFY(cPlanStep && cTxId); + Y_VERIFY(cPlanStep->type_id() == arrow::UInt64Type::type_id); + Y_VERIFY(cTxId->type_id() == arrow::UInt64Type::type_id); + const arrow::UInt64Array& cPlanStepArray = static_cast<const arrow::UInt64Array&>(*cPlanStep); + const arrow::UInt64Array& cTxIdArray = static_cast<const arrow::UInt64Array&>(*cTxId); + RecordSnapshotMin = TSnapshot(cPlanStepArray.GetView(0), cTxIdArray.GetView(0)); + RecordSnapshotMax = TSnapshot(cPlanStepArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1), cTxIdArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1)); + } } bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) { - const bool compositeIndexKey = indexInfo.IsCompositeIndexKey(); if (Produced != TPortionMeta::EProduced::UNSPECIFIED) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication"); return true; @@ -49,17 +45,16 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio } if (portionMeta.HasPrimaryKeyBorders()) { - ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); - Y_VERIFY(ReplaceKeyEdges); - Y_VERIFY_DEBUG(ReplaceKeyEdges->ValidateFull().ok()); - Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2); + ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); + IndexKeyStart = ReplaceKeyEdges->GetFirst(); + IndexKeyEnd = ReplaceKeyEdges->GetLast(); + } - if (compositeIndexKey) { - auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey()); - Y_VERIFY(edgesBatch); - IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); - IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); - } + if (portionMeta.HasRecordSnapshotMin()) { + RecordSnapshotMin = TSnapshot(portionMeta.GetRecordSnapshotMin().GetPlanStep(), portionMeta.GetRecordSnapshotMin().GetTxId()); + } + if (portionMeta.HasRecordSnapshotMax()) { + RecordSnapshotMax = TSnapshot(portionMeta.GetRecordSnapshotMax().GetPlanStep(), portionMeta.GetRecordSnapshotMax().GetTxId()); } return true; } @@ -93,11 +88,17 @@ std::optional<NKikimrTxColumnShard::TIndexPortionMeta> TPortionMeta::SerializeTo break; } - if (const auto& keyEdgesBatch = ReplaceKeyEdges) { - Y_VERIFY(keyEdgesBatch); - Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok()); - Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2); - portionMeta.SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch)); + if (ReplaceKeyEdges) { + portionMeta.SetPrimaryKeyBorders(ReplaceKeyEdges->SerializeToStringDataOnlyNoCompression()); + } + + if (RecordSnapshotMin) { + portionMeta.MutableRecordSnapshotMin()->SetPlanStep(RecordSnapshotMin->GetPlanStep()); + portionMeta.MutableRecordSnapshotMin()->SetTxId(RecordSnapshotMin->GetTxId()); + } + if (RecordSnapshotMax) { + portionMeta.MutableRecordSnapshotMax()->SetPlanStep(RecordSnapshotMax->GetPlanStep()); + portionMeta.MutableRecordSnapshotMax()->SetTxId(RecordSnapshotMax->GetTxId()); } return portionMeta; } diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index ae4305e0675..6207944db82 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/core/tx/columnshard/common/portion.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> #include <ydb/core/formats/arrow/replace_key.h> #include <ydb/core/formats/arrow/special_keys.h> #include <ydb/core/protos/tx_columnshard.pb.h> @@ -12,14 +13,16 @@ struct TIndexInfo; struct TPortionMeta { private: - void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); - std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows + std::shared_ptr<NArrow::TFirstLastSpecialKeys> ReplaceKeyEdges; // first and last PK rows YDB_ACCESSOR_DEF(TString, TierName); public: using EProduced = NPortion::EProduced; std::optional<NArrow::TReplaceKey> IndexKeyStart; std::optional<NArrow::TReplaceKey> IndexKeyEnd; + + std::optional<TSnapshot> RecordSnapshotMin; + std::optional<TSnapshot> RecordSnapshotMax; EProduced Produced{EProduced::UNSPECIFIED}; ui32 FirstPkColumn = 0; @@ -27,7 +30,7 @@ public: std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const; - void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo); + void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo); EProduced GetProduced() const { return Produced; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 4cb9ac33a16..ae943b88ead 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -26,18 +26,16 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) return Records.back(); } -void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, - const TString& tierName) { +void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) { Y_VERIFY(batch->num_rows() == NumRows()); - AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(batch), tierName); + AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(batch), NArrow::TMinMaxSpecialKeys(batch, TIndexInfo::ArrowSchemaSnapshot()), tierName); } -void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials, - const TString& tierName) { +void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TString& tierName) { const auto& indexInfo = snapshotSchema.GetIndexInfo(); Meta = {}; Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); - Meta.FillBatchInfo(specials, indexInfo); + Meta.FillBatchInfo(primaryKeys, snapshotKeys, indexInfo); Meta.SetTierName(tierName); } @@ -143,20 +141,6 @@ void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& r Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); Y_VERIFY(Meta.DeserializeFromProto(*portionMeta, indexInfo)); } - if (!indexInfo.IsCompositeIndexKey() && indexInfo.GetPKFirstColumnId() == rec.ColumnId) { - if (rec.GetMeta().GetMin()) { - auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMin()); - if (!Meta.IndexKeyStart || candidate < *Meta.IndexKeyStart) { - Meta.IndexKeyStart = candidate; - } - } - if (rec.GetMeta().GetMax()) { - auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMax()); - if (!Meta.IndexKeyEnd || *Meta.IndexKeyEnd < candidate) { - Meta.IndexKeyEnd = candidate; - } - } - } } bool TPortionInfo::HasPkMinMax() const { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 5ac673408f8..9cb88c7a5d6 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -222,7 +222,7 @@ public: void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName); - void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials, + void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TString& tierName); std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; @@ -238,6 +238,16 @@ public: return *Meta.IndexKeyEnd; } + const TSnapshot& RecordSnapshotMin() const { + Y_VERIFY(Meta.RecordSnapshotMin); + return *Meta.RecordSnapshotMin; + } + + const TSnapshot& RecordSnapshotMax() const { + Y_VERIFY(Meta.RecordSnapshotMax); + return *Meta.RecordSnapshotMax; + } + ui32 NumRows() const { ui32 result = 0; std::optional<ui32> columnIdFirst; diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index 832d00a0c0c..ef71c3366ab 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -3,89 +3,6 @@ namespace NKikimr::NOlap::NIndexedReader { -NJson::TJsonValue TSortableBatchPosition::DebugJson() const { - NJson::TJsonValue result; - result["reverse"] = ReverseSort; - result["records_count"] = RecordsCount; - result["position"] = Position; - result["sorting"] = Sorting->DebugJson(Position); - if (Data) { - result["data"] = Data->DebugJson(Position); - } - return result; -} - -std::optional<ui64> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater, const bool include) { - if (!batch || !batch->num_rows()) { - return {}; - } - - const auto checkEqualBorder = [batch, greater, include](const i64 position) ->std::optional<i64> { - if (include) { - return position; - } else if (greater) { - if (batch->num_rows() > position + 1) { - return position + 1; - } else { - return {}; - } - } else { - if (position) { - return position - 1; - } else { - return {}; - } - } - }; - - i64 posStart = 0; - i64 posFinish = batch->num_rows() - 1; - TSortableBatchPosition position = forFound.BuildSame(batch, posStart); - { - position.InitPosition(posStart); - auto cmp = position.Compare(forFound); - if (cmp == std::partial_ordering::greater) { - if (greater) { - return posStart; - } else { - return {}; - } - } else if (cmp == std::partial_ordering::equivalent) { - return checkEqualBorder(posStart); - } - } - { - position.InitPosition(posFinish); - auto cmp = position.Compare(forFound); - if (cmp == std::partial_ordering::less) { - if (greater) { - return {}; - } else { - return posFinish; - } - } else if (cmp == std::partial_ordering::equivalent) { - return checkEqualBorder(posFinish); - } - } - while (posFinish > posStart + 1) { - Y_VERIFY(position.InitPosition(0.5 * (posStart + posFinish))); - const auto comparision = position.Compare(forFound); - if (comparision == std::partial_ordering::less) { - posStart = position.Position; - } else if (comparision == std::partial_ordering::greater) { - posFinish = position.Position; - } else { - return checkEqualBorder(position.Position); - } - } - Y_VERIFY(posFinish != posStart); - if (greater) { - return posFinish; - } else { - return posStart; - } -} - void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) { Y_VERIFY(point); Y_VERIFY(point->IsSameSortingSchema(SortSchema)); @@ -217,16 +134,6 @@ NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const { return result; } -TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) { - for (auto&& i : columns) { - auto c = batch->GetColumnByName(i); - AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); - Columns.emplace_back(c); - auto f = batch->schema()->GetFieldByName(i); - Fields.emplace_back(f); - } -} - void TRecordBatchBuilder::AddRecord(const TSortableBatchPosition& position) { Y_VERIFY_DEBUG(position.GetData().GetColumns().size() == Builders.size()); Y_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields)); diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index f4cded77878..ce2bebf5b0c 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -4,6 +4,7 @@ #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/tx/columnshard/engines/index_info.h> #include <ydb/core/formats/arrow/switch/switch_type.h> +#include <ydb/core/formats/arrow/reader/read_filter_merger.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> #include <util/generic/hash.h> #include <util/string/join.h> @@ -13,154 +14,6 @@ namespace NKikimr::NOlap::NIndexedReader { class TRecordBatchBuilder; -class TSortableScanData { -private: - YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Array>>, Columns); - YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields); -public: - TSortableScanData() = default; - TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns); - - bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const { - if (Fields.size() != (size_t)schema->num_fields()) { - return false; - } - for (ui32 i = 0; i < Fields.size(); ++i) { - if (Fields[i]->type() != schema->field(i)->type()) { - return false; - } - if (Fields[i]->name() != schema->field(i)->name()) { - return false; - } - } - return true; - } - - NJson::TJsonValue DebugJson(const i32 position) const { - NJson::TJsonValue result = NJson::JSON_MAP; - for (ui32 i = 0; i < Columns.size(); ++i) { - auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP); - jsonColumn["name"] = Fields[i]->name(); - if (position >= 0 && position < Columns[i]->length()) { - jsonColumn["value"] = NArrow::DebugString(Columns[i], position); - } - } - return result; - } - - std::vector<std::string> GetFieldNames() const { - std::vector<std::string> result; - for (auto&& i : Fields) { - result.emplace_back(i->name()); - } - return result; - } -}; - -class TSortableBatchPosition { -protected: - - YDB_READONLY(i64, Position, 0); - i64 RecordsCount = 0; - bool ReverseSort = false; - std::shared_ptr<TSortableScanData> Sorting; - std::shared_ptr<TSortableScanData> Data; - std::shared_ptr<arrow::RecordBatch> Batch; - static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include); - -public: - TSortableBatchPosition() = default; - - const TSortableScanData& GetData() const { - return *Data; - } - - bool IsReverseSort() const { - return ReverseSort; - } - NJson::TJsonValue DebugJson() const; - - TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const { - return TSortableBatchPosition(batch, position, Sorting->GetFieldNames(), Data->GetFieldNames(), ReverseSort); - } - - bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) { - return Sorting->IsSameSchema(schema); - } - - static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) { - if (!batch) { - return nullptr; - } - Y_VERIFY(from.Compare(to) != std::partial_ordering::greater); - const std::optional<ui32> idxFrom = FindPosition(batch, from, true, includeFrom); - const std::optional<ui32> idxTo = FindPosition(batch, to, false, includeTo); - if (!idxFrom || !idxTo || *idxTo < *idxFrom) { - return nullptr; - } - return batch->Slice(*idxFrom, *idxTo - *idxFrom + 1); - } - - TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort) - : Position(position) - , RecordsCount(batch->num_rows()) - , ReverseSort(reverseSort) - , Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns)) - , Batch(batch) - { - if (dataColumns.size()) { - Data = std::make_shared<TSortableScanData>(batch, dataColumns); - } - Y_VERIFY(batch->num_rows()); - Y_VERIFY_DEBUG(batch->ValidateFull().ok()); - Y_VERIFY(Sorting->GetColumns().size()); - } - - std::partial_ordering Compare(const TSortableBatchPosition& item) const { - Y_VERIFY(item.ReverseSort == ReverseSort); - Y_VERIFY(item.Sorting->GetColumns().size() == Sorting->GetColumns().size()); - const auto directResult = NArrow::ColumnsCompare(Sorting->GetColumns(), Position, item.Sorting->GetColumns(), item.Position); - if (ReverseSort) { - if (directResult == std::partial_ordering::less) { - return std::partial_ordering::greater; - } else if (directResult == std::partial_ordering::greater) { - return std::partial_ordering::less; - } else { - return std::partial_ordering::equivalent; - } - } else { - return directResult; - } - } - - bool operator<(const TSortableBatchPosition& item) const { - return Compare(item) == std::partial_ordering::less; - } - - bool operator==(const TSortableBatchPosition& item) const { - return Compare(item) == std::partial_ordering::equivalent; - } - - bool operator!=(const TSortableBatchPosition& item) const { - return Compare(item) != std::partial_ordering::equivalent; - } - - bool NextPosition(const i64 delta) { - return InitPosition(Position + delta); - } - - bool InitPosition(const i64 position) { - if (position < RecordsCount && position >= 0) { - Position = position; - return true; - } else { - return false; - } - - } - -}; - class TMergePartialStream { private: #ifndef NDEBUG diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 835c311f2b1..a4249a9007c 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -438,4 +438,8 @@ std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TCol return result; } +std::shared_ptr<arrow::Field> TIndexInfo::SpecialColumnField(const ui32 columnId) const { + return ArrowSchemaSnapshot()->GetFieldByName(GetColumnName(columnId, true)); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 2dd955dd6f3..bc53deaa748 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -98,6 +98,11 @@ public: return Id; } + static const std::vector<std::string>& SnapshotColumnNames() { + static std::vector<std::string> result = {SPEC_COL_PLAN_STEP, SPEC_COL_TX_ID}; + return result; + } + std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const; std::shared_ptr<arrow::Schema> GetColumnsSchema(const std::set<ui32>& columnIds) const; TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const; @@ -154,6 +159,7 @@ public: std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const; std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const; std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const; + std::shared_ptr<arrow::Field> SpecialColumnField(const ui32 columnId) const; const THashSet<TString>& GetRequiredColumns() const { return RequiredColumns; diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 1a13559ac9e..84bd52f6622 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -25,7 +25,6 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { AFL_VERIFY(record.Valid())("event", "incorrect_record")("record", record.DebugString())("portion", info.DebugString()); } - Y_VERIFY(Record.Mark <= info.IndexKeyStart()); if (it == Portions.end()) { OnBeforeChangePortion(nullptr); auto portionNew = std::make_shared<TPortionInfo>(info); |