summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-10-04 16:11:44 +0300
committerivanmorozov <[email protected]>2023-10-04 16:44:34 +0300
commitf09b8bf859a6a17ef94825f878606458b2488e71 (patch)
tree5678269c09f571a9ef825ac7c85d272ce9ab915d
parente65261efc674e38a075188ba2278d042a33e169c (diff)
KIKIMR-19211: min/max snapshot
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt21
-rw-r--r--ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt22
-rw-r--r--ydb/core/formats/arrow/reader/CMakeLists.txt17
-rw-r--r--ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt21
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.cpp99
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.h163
-rw-r--r--ydb/core/formats/arrow/reader/ya.make14
-rw-r--r--ydb/core/formats/arrow/special_keys.cpp61
-rw-r--r--ydb/core/formats/arrow/special_keys.h49
-rw-r--r--ydb/core/formats/arrow/ya.make1
-rw-r--r--ydb/core/protos/tx_columnshard.proto7
-rw-r--r--ydb/core/tx/columnshard/background_controller.h2
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp21
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/meta.h8
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.cpp69
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.h9
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp24
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h12
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp93
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h149
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp1
37 files changed, 604 insertions, 318 deletions
diff --git a/.mapping.json b/.mapping.json
index bcd78fb5638..8c7d862863b 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -3976,6 +3976,11 @@
"ydb/core/formats/arrow/dictionary/CMakeLists.linux-x86_64.txt":"",
"ydb/core/formats/arrow/dictionary/CMakeLists.txt":"",
"ydb/core/formats/arrow/dictionary/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/formats/arrow/reader/CMakeLists.txt":"",
+ "ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt":"",
"ydb/core/formats/arrow/serializer/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/formats/arrow/serializer/CMakeLists.linux-aarch64.txt":"",
"ydb/core/formats/arrow/serializer/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
index 9e497a26d76..59de4d93dfc 100644
--- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
@@ -9,6 +9,7 @@
add_subdirectory(common)
add_subdirectory(compression)
add_subdirectory(dictionary)
+add_subdirectory(reader)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
@@ -31,6 +32,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ formats-arrow-reader
cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
index 6dffeca31e0..abcf2a17694 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
@@ -9,6 +9,7 @@
add_subdirectory(common)
add_subdirectory(compression)
add_subdirectory(dictionary)
+add_subdirectory(reader)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
@@ -32,6 +33,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ formats-arrow-reader
cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
index 6dffeca31e0..abcf2a17694 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
@@ -9,6 +9,7 @@
add_subdirectory(common)
add_subdirectory(compression)
add_subdirectory(dictionary)
+add_subdirectory(reader)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
@@ -32,6 +33,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ formats-arrow-reader
cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
index 618a9d0bd73..44d27d39280 100644
--- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
@@ -9,6 +9,7 @@
add_subdirectory(common)
add_subdirectory(compression)
add_subdirectory(dictionary)
+add_subdirectory(reader)
add_subdirectory(serializer)
add_subdirectory(simple_builder)
add_subdirectory(switch)
@@ -32,6 +33,7 @@ target_link_libraries(core-formats-arrow PUBLIC
formats-arrow-simple_builder
formats-arrow-dictionary
formats-arrow-transformer
+ formats-arrow-reader
cpp-actors-core
ydb-library-arrow_kernels
ydb-library-binary_json
diff --git a/ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..161c7752003
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-reader)
+target_link_libraries(formats-arrow-reader PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+ cpp-actors-core
+)
+target_sources(formats-arrow-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp
+)
diff --git a/ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..6f2cffc30c5
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-reader)
+target_link_libraries(formats-arrow-reader PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+ cpp-actors-core
+)
+target_sources(formats-arrow-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp
+)
diff --git a/ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..6f2cffc30c5
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-reader)
+target_link_libraries(formats-arrow-reader PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+ cpp-actors-core
+)
+target_sources(formats-arrow-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp
+)
diff --git a/ydb/core/formats/arrow/reader/CMakeLists.txt b/ydb/core/formats/arrow/reader/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..161c7752003
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(formats-arrow-reader)
+target_link_libraries(formats-arrow-reader PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ formats-arrow-simple_builder
+ formats-arrow-switch
+ cpp-actors-core
+)
+target_sources(formats-arrow-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/reader/read_filter_merger.cpp
+)
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.cpp b/ydb/core/formats/arrow/reader/read_filter_merger.cpp
new file mode 100644
index 00000000000..29a35d5aa56
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.cpp
@@ -0,0 +1,99 @@
+#include "read_filter_merger.h"
+#include <library/cpp/actors/core/log.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+NJson::TJsonValue TSortableBatchPosition::DebugJson() const { // Renders the cursor state (direction, row count, current row and its column values) as JSON.
+ NJson::TJsonValue result;
+ result["reverse"] = ReverseSort;
+ result["records_count"] = RecordsCount;
+ result["position"] = Position;
+ result["sorting"] = Sorting->DebugJson(Position); // values of the sorting columns at the current row
+ if (Data) { // Data is only created when data columns were passed to the constructor
+ result["data"] = Data->DebugJson(Position);
+ }
+ return result;
+}
+
+std::optional<ui64> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater, const bool include) { // Finds the row of `batch` bounding `forFound` from above (greater) or from below (!greater); empty result when there is no such row. Assumes `batch` is ordered by the sorting columns — TODO confirm with callers.
+ if (!batch || !batch->num_rows()) {
+ return {};
+ }
+
+ const auto checkEqualBorder = [batch, greater, include](const i64 position) ->std::optional<i64> { // exact match: the row itself if `include`, else the neighbour above (greater) / below (!greater) when it exists
+ if (include) {
+ return position;
+ } else if (greater) {
+ if (batch->num_rows() > position + 1) {
+ return position + 1;
+ } else {
+ return {};
+ }
+ } else {
+ if (position) {
+ return position - 1;
+ } else {
+ return {};
+ }
+ }
+ };
+
+ i64 posStart = 0;
+ i64 posFinish = batch->num_rows() - 1;
+ TSortableBatchPosition position = forFound.BuildSame(batch, posStart);
+ { // first row: already above the key, or equal to it
+ position.InitPosition(posStart);
+ auto cmp = position.Compare(forFound);
+ if (cmp == std::partial_ordering::greater) {
+ if (greater) {
+ return posStart;
+ } else {
+ return {};
+ }
+ } else if (cmp == std::partial_ordering::equivalent) {
+ return checkEqualBorder(posStart);
+ }
+ }
+ { // last row: still below the key, or equal to it
+ position.InitPosition(posFinish);
+ auto cmp = position.Compare(forFound);
+ if (cmp == std::partial_ordering::less) {
+ if (greater) {
+ return {};
+ } else {
+ return posFinish;
+ }
+ } else if (cmp == std::partial_ordering::equivalent) {
+ return checkEqualBorder(posFinish);
+ }
+ }
+ while (posFinish > posStart + 1) { // bisection: row[posStart] compares less than the key, row[posFinish] greater
+ Y_VERIFY(position.InitPosition(posStart + (posFinish - posStart) / 2)); // integer midpoint: no round-trip through double, no overflow of posStart + posFinish
+ const auto comparision = position.Compare(forFound);
+ if (comparision == std::partial_ordering::less) {
+ posStart = position.Position;
+ } else if (comparision == std::partial_ordering::greater) {
+ posFinish = position.Position;
+ } else {
+ return checkEqualBorder(position.Position);
+ }
+ }
+ Y_VERIFY(posFinish != posStart);
+ if (greater) { // no exact match: posStart/posFinish are the adjacent rows just below/above the key
+ return posFinish;
+ } else {
+ return posStart;
+ }
+}
+
+TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) { // Collects the arrays and fields named in `columns` from `batch`, in the order given.
+ for (auto&& i : columns) {
+ auto c = batch->GetColumnByName(i);
+ AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); // every requested column must be present in the batch
+ Columns.emplace_back(c);
+ auto f = batch->schema()->GetFieldByName(i);
+ Fields.emplace_back(f); // NOTE(review): f is not checked for null — presumably implied by the column check above; confirm
+ }
+}
+
+}
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h
new file mode 100644
index 00000000000..78967acdb84
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.h
@@ -0,0 +1,163 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/switch/switch_type.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <util/generic/hash.h>
+#include <util/string/join.h>
+#include <set>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TRecordBatchBuilder;
+
+class TSortableScanData { // Named subset of a record batch's columns, kept as parallel vectors of arrays and fields.
+private:
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Array>>, Columns);
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);
+public:
+ TSortableScanData() = default;
+ TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns);
+
+ bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const { // true when `schema` has the same number of fields with pairwise equal types and names
+ if (Fields.size() != (size_t)schema->num_fields()) {
+ return false;
+ }
+ for (ui32 i = 0; i < Fields.size(); ++i) {
+ if (!Fields[i]->type()->Equals(schema->field(i)->type())) { // structural type equality; `!=` on the shared_ptr values would only test pointer identity
+ return false;
+ }
+ if (Fields[i]->name() != schema->field(i)->name()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ // Builds {"sorting_columns": [{"name", "value"}, ...]}; "value" is written only when `position` is a valid row of the column.
+ NJson::TJsonValue DebugJson(const i32 position) const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ for (ui32 i = 0; i < Columns.size(); ++i) {
+ auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP);
+ jsonColumn["name"] = Fields[i]->name();
+ if (position >= 0 && position < Columns[i]->length()) {
+ jsonColumn["value"] = NArrow::DebugString(Columns[i], position);
+ }
+ }
+ return result;
+ }
+ // Names of the stored fields, in storage order.
+ std::vector<std::string> GetFieldNames() const {
+ std::vector<std::string> result;
+ for (auto&& i : Fields) {
+ result.emplace_back(i->name());
+ }
+ return result;
+ }
+};
+
+class TSortableBatchPosition { // Cursor pointing at one row of a record batch, comparable by a set of sorting columns.
+protected:
+ // Position is the current row index; RecordsCount is batch->num_rows() captured by the constructor.
+ YDB_READONLY(i64, Position, 0);
+ i64 RecordsCount = 0;
+ bool ReverseSort = false;
+ std::shared_ptr<TSortableScanData> Sorting;
+ std::shared_ptr<TSortableScanData> Data; // stays null when the constructor receives no data columns
+ std::shared_ptr<arrow::RecordBatch> Batch;
+ static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include);
+
+public:
+ TSortableBatchPosition() = default;
+ // Precondition: data columns were supplied at construction; otherwise Data is null and must not be dereferenced here.
+ const TSortableScanData& GetData() const {
+ return *Data;
+ }
+
+ bool IsReverseSort() const {
+ return ReverseSort;
+ }
+ NJson::TJsonValue DebugJson() const;
+ // Creates a cursor over `batch` at row `position` with this cursor's sorting/data column names and sort direction.
+ TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const {
+ return TSortableBatchPosition(batch, position, Sorting->GetFieldNames(), Data ? Data->GetFieldNames() : std::vector<std::string>(), ReverseSort); // Data is optional: a cursor built without data columns must not dereference it
+ }
+ // Checks the sorting columns against `schema`; does not modify the cursor.
+ bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) const {
+ return Sorting->IsSameSchema(schema);
+ }
+ // Slice of `batch` between the rows located for `from` and `to` by FindPosition; nullptr when `batch` is null or no row qualifies.
+ static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) {
+ if (!batch) {
+ return nullptr;
+ }
+ Y_VERIFY(from.Compare(to) != std::partial_ordering::greater);
+ const std::optional<ui64> idxFrom = FindPosition(batch, from, true, includeFrom); // keep the ui64 width FindPosition returns instead of narrowing to ui32
+ const std::optional<ui64> idxTo = FindPosition(batch, to, false, includeTo);
+ if (!idxFrom || !idxTo || *idxTo < *idxFrom) {
+ return nullptr;
+ }
+ return batch->Slice(*idxFrom, *idxTo - *idxFrom + 1);
+ }
+ // Binds the cursor to row `position` of a non-empty batch; `dataColumns` may be empty, in which case Data stays null.
+ TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort)
+ : Position(position)
+ , RecordsCount(batch->num_rows())
+ , ReverseSort(reverseSort)
+ , Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns))
+ , Batch(batch)
+ {
+ if (dataColumns.size()) {
+ Data = std::make_shared<TSortableScanData>(batch, dataColumns);
+ }
+ Y_VERIFY(batch->num_rows());
+ Y_VERIFY_DEBUG(batch->ValidateFull().ok());
+ Y_VERIFY(Sorting->GetColumns().size());
+ }
+ // Compares the current rows of both cursors by the sorting columns; less/greater are swapped when ReverseSort is set.
+ std::partial_ordering Compare(const TSortableBatchPosition& item) const {
+ Y_VERIFY(item.ReverseSort == ReverseSort);
+ Y_VERIFY(item.Sorting->GetColumns().size() == Sorting->GetColumns().size());
+ const auto directResult = NArrow::ColumnsCompare(Sorting->GetColumns(), Position, item.Sorting->GetColumns(), item.Position);
+ if (ReverseSort) {
+ if (directResult == std::partial_ordering::less) {
+ return std::partial_ordering::greater;
+ } else if (directResult == std::partial_ordering::greater) {
+ return std::partial_ordering::less;
+ } else {
+ return std::partial_ordering::equivalent;
+ }
+ } else {
+ return directResult;
+ }
+ }
+
+ bool operator<(const TSortableBatchPosition& item) const {
+ return Compare(item) == std::partial_ordering::less;
+ }
+
+ bool operator==(const TSortableBatchPosition& item) const {
+ return Compare(item) == std::partial_ordering::equivalent;
+ }
+
+ bool operator!=(const TSortableBatchPosition& item) const {
+ return Compare(item) != std::partial_ordering::equivalent;
+ }
+ // Moves by `delta` rows; returns false and leaves Position unchanged when the target is outside [0, RecordsCount).
+ bool NextPosition(const i64 delta) {
+ return InitPosition(Position + delta);
+ }
+ // Sets the absolute row index; returns false and leaves Position unchanged when it is out of range.
+ bool InitPosition(const i64 position) {
+ if (position < RecordsCount && position >= 0) {
+ Position = position;
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+};
+
+}
diff --git a/ydb/core/formats/arrow/reader/ya.make b/ydb/core/formats/arrow/reader/ya.make
new file mode 100644
index 00000000000..540b895abf3
--- /dev/null
+++ b/ydb/core/formats/arrow/reader/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/formats/arrow/simple_builder
+ ydb/core/formats/arrow/switch
+ library/cpp/actors/core
+)
+
+SRCS(
+ read_filter_merger.cpp
+)
+
+END()
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
index 31f21b52954..7f42862045d 100644
--- a/ydb/core/formats/arrow/special_keys.cpp
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -1,5 +1,6 @@
#include "special_keys.h"
#include "permutations.h"
+#include "reader/read_filter_merger.h"
#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
@@ -27,6 +28,10 @@ TString TSpecialKeys::SerializeToString() const {
return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(Data);
}
+TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const {
+ return NArrow::SerializeBatchNoCompression(Data);
+}
+
TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames /*= {}*/) {
Y_VERIFY(batch);
Y_VERIFY(batch->num_rows());
@@ -40,13 +45,69 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch>
}
Data = NArrow::CopyRecords(keyBatch, indexes);
+ Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
+}
+
+TMinMaxSpecialKeys::TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& schema) {
+ Y_VERIFY(batch);
+ Y_VERIFY(batch->num_rows());
+ Y_VERIFY(schema);
+
+ NOlap::NIndexedReader::TSortableBatchPosition record(batch, 0, schema->field_names(), {}, false);
+ std::optional<NOlap::NIndexedReader::TSortableBatchPosition> minValue;
+ std::optional<NOlap::NIndexedReader::TSortableBatchPosition> maxValue;
+ while (true) {
+ if (!minValue || minValue->Compare(record) == std::partial_ordering::greater) {
+ minValue = record;
+ }
+ if (!maxValue || maxValue->Compare(record) == std::partial_ordering::less) {
+ maxValue = record;
+ }
+ if (!record.NextPosition(1)) {
+ break;
+ }
+ }
+ Y_VERIFY(minValue && maxValue);
+ std::vector<ui64> indexes;
+ indexes.emplace_back(minValue->GetPosition());
+ if (maxValue->GetPosition() != minValue->GetPosition()) {
+ indexes.emplace_back(maxValue->GetPosition());
+ }
+
+ std::vector<TString> columnNamesString;
+ for (auto&& i : schema->field_names()) {
+ columnNamesString.emplace_back(i);
+ }
+
+ auto dataBatch = NArrow::ExtractColumns(batch, columnNamesString);
+ Data = NArrow::CopyRecords(dataBatch, indexes);
+ Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
}
TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data)
+ : TBase(data) {
+ Y_VERIFY_DEBUG(Data->ValidateFull().ok());
+ Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
+}
+
+std::shared_ptr<NKikimr::NArrow::TFirstLastSpecialKeys> TFirstLastSpecialKeys::BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const {
+ auto newData = NArrow::ExtractColumns(Data, schema);
+ AFL_VERIFY(newData);
+ return std::make_shared<TFirstLastSpecialKeys>(newData);
+}
+
+TMinMaxSpecialKeys::TMinMaxSpecialKeys(const TString& data)
: TBase(data)
{
Y_VERIFY_DEBUG(Data->ValidateFull().ok());
Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
}
+std::shared_ptr<NKikimr::NArrow::TMinMaxSpecialKeys> TMinMaxSpecialKeys::BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const {
+ auto newData = NArrow::ExtractColumns(Data, schema);
+ AFL_VERIFY(newData);
+ std::shared_ptr<TMinMaxSpecialKeys> result(new TMinMaxSpecialKeys(newData));
+ return result;
+}
+
}
diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h
index 135e3aff11f..98ad72012fb 100644
--- a/ydb/core/formats/arrow/special_keys.h
+++ b/ydb/core/formats/arrow/special_keys.h
@@ -20,6 +20,13 @@ protected:
}
public:
+ TString SerializeToStringDataOnlyNoCompression() const;
+
+ TSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema) {
+ Data = NArrow::DeserializeBatch(data, schema);
+ Y_VERIFY(Data);
+ Y_VERIFY_DEBUG(Data->ValidateFull().ok());
+ }
TSpecialKeys(const TString& data) {
Y_VERIFY(DeserializeFromString(data));
@@ -36,16 +43,52 @@ public:
return Data;
}
- std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const {
+ std::shared_ptr<TFirstLastSpecialKeys> BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const;
+
+ std::optional<TReplaceKey> GetFirst(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
return GetKeyByIndex(0, schema);
}
- std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const {
+ std::optional<TReplaceKey> GetLast(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
return GetKeyByIndex(Data->num_rows() - 1, schema);
}
explicit TFirstLastSpecialKeys(const TString& data);
-
+ explicit TFirstLastSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema)
+ : TBase(data, schema)
+ {
+ Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
+ }
explicit TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {});
};
+class TMinMaxSpecialKeys: public TSpecialKeys { // Special-keys batch holding the minimal and maximal rows of a batch (a single row when both are the same row).
+private:
+ using TBase = TSpecialKeys;
+protected:
+ TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> data)
+ : TBase(data) {
+ }
+public:
+ std::shared_ptr<TMinMaxSpecialKeys> BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const;
+
+ const std::shared_ptr<arrow::RecordBatch>& GetBatch() const {
+ return Data;
+ }
+ // The minimum is read from row 0 and the maximum from the last row of Data (the same row when Data has one row).
+ std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
+ return GetKeyByIndex(0, schema);
+ }
+ std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
+ return GetKeyByIndex(Data->num_rows() - 1, schema);
+ }
+
+ explicit TMinMaxSpecialKeys(const TString& data);
+ explicit TMinMaxSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema)
+ : TBase(data, schema) {
+ Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); // the stored batch must hold the min row and, if distinct, the max row
+ }
+
+ explicit TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& schema);
+};
+
}
diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make
index 416cefc1406..c26eb6bb844 100644
--- a/ydb/core/formats/arrow/ya.make
+++ b/ydb/core/formats/arrow/ya.make
@@ -11,6 +11,7 @@ PEERDIR(
ydb/core/formats/arrow/simple_builder
ydb/core/formats/arrow/dictionary
ydb/core/formats/arrow/transformer
+ ydb/core/formats/arrow/reader
library/cpp/actors/core
ydb/library/arrow_kernels
ydb/library/binary_json
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index f9677baaf11..f53fbcc9d73 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -259,6 +259,11 @@ message TIndexGranuleMeta {
optional uint32 MarkSize = 1; // Composite key mark (granule border) size: count of first PK elements in mark
}
+message TSnapshot {
+ optional uint64 PlanStep = 1;
+ optional uint64 TxId = 2;
+}
+
message TIndexPortionMeta {
oneof Produced {
bool IsInserted = 1;
@@ -268,6 +273,8 @@ message TIndexPortionMeta {
}
optional string TierName = 5;
optional bytes PrimaryKeyBorders = 6; // arrow::RecordBatch with first and last ReplaceKey rows
+ optional TSnapshot RecordSnapshotMin = 7;
+ optional TSnapshot RecordSnapshotMax = 8;
}
message TIndexColumnMeta {
diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h
index 056c6d94cf8..6fba6a4cd5a 100644
--- a/ydb/core/tx/columnshard/background_controller.h
+++ b/ydb/core/tx/columnshard/background_controller.h
@@ -81,7 +81,7 @@ public:
bool IsIndexingActive() const {
return ActiveIndexing;
}
- bool GetIndexingActiveCount() const {
+ i64 GetIndexingActiveCount() const {
return ActiveIndexing;
}
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
index 9f20f37bd4d..a059465374e 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
@@ -19,6 +19,7 @@ target_link_libraries(tx-columnshard-common PUBLIC
yutil
ydb-core-protos
libs-apache-arrow
+ core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-common PRIVATE
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
index 56fe56b09ee..1ef95311730 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
@@ -20,6 +20,7 @@ target_link_libraries(tx-columnshard-common PUBLIC
yutil
ydb-core-protos
libs-apache-arrow
+ core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-common PRIVATE
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
index 56fe56b09ee..1ef95311730 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
@@ -20,6 +20,7 @@ target_link_libraries(tx-columnshard-common PUBLIC
yutil
ydb-core-protos
libs-apache-arrow
+ core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-common PRIVATE
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
index 9f20f37bd4d..a059465374e 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
@@ -19,6 +19,7 @@ target_link_libraries(tx-columnshard-common PUBLIC
yutil
ydb-core-protos
libs-apache-arrow
+ core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-common PRIVATE
diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make
index 50e899dab00..ac748e9ed9c 100644
--- a/ydb/core/tx/columnshard/common/ya.make
+++ b/ydb/core/tx/columnshard/common/ya.make
@@ -10,6 +10,7 @@ SRCS(
PEERDIR(
ydb/core/protos
contrib/libs/apache/arrow
+ ydb/core/formats/arrow
)
GENERATE_ENUM_SERIALIZATION(portion.h)
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index fb9a45be109..6a137f7748f 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -42,6 +42,9 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
arrow::FieldVector indexFields;
indexFields.emplace_back(portionIdField);
indexFields.emplace_back(portionRecordIndexField);
+ for (auto&& i : TIndexInfo::ArrowSchemaSnapshot()->fields()) {
+ indexFields.emplace_back(i);
+ }
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
NIndexedReader::TMergePartialStream mergeStream(resultSchema->GetIndexInfo().GetReplaceKey(), dataSchema, false);
ui32 idx = 0;
@@ -67,9 +70,13 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
- Y_VERIFY(columnPortionIdx && columnPortionRecordIdx);
+ auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+ auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+ Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
+ Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
+ Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
@@ -89,8 +96,8 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext);
TMergedColumn mColumn(context);
{
- auto c = batchResult->GetColumnByName(f->name());
- AFL_VERIFY(!c);
+// auto c = batchResult->GetColumnByName(f->name());
+// AFL_VERIFY(!c);
AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
std::vector<TPortionColumnCursor> cursors;
auto loader = resultSchema->GetColumnLoader(f->name());
@@ -144,12 +151,16 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
TSimilarSlicer slicer(4 * 1024 * 1024);
auto packs = slicer.Split(batchSlices);
+ ui32 recordIdx = 0;
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
+ auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator()));
- NArrow::TFirstLastSpecialKeys specialKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
- AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, specialKeys, SaverContext.GetTierName());
+ NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
+ NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
+ AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
+ recordIdx += slice.GetRecordsCount();
}
if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
TStringBuilder sbSwitched;
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 3f7dcce7caf..e60288c7fdc 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -93,9 +93,6 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion);
- auto& granuleStart = self.Granules[granule]->Record.Mark;
-
- Y_VERIFY(granuleStart <= portionInfo.IndexKeyStart());
self.UpsertPortion(portionInfo, &oldInfo);
for (auto& record : portionInfo.Records) {
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index 8bcbfb38df9..4d1f69800ad 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -119,14 +119,14 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna
}
}
const auto pred = [pkSchema](const TInsertedData* l, const TInsertedData* r) {
- return l->GetMeta().GetMin(pkSchema) < r->GetMeta().GetMin(pkSchema);
+ return l->GetMeta().GetFirstPK(pkSchema) < r->GetMeta().GetFirstPK(pkSchema);
};
std::sort(ret.begin(), ret.end(), pred);
std::vector<TCommittedBlob> result;
result.reserve(ret.size());
for (auto&& i : ret) {
- result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaVersion(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema)));
+ result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaVersion(), i->GetMeta().GetFirstPK(pkSchema), i->GetMeta().GetLastPK(pkSchema)));
}
return result;
diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.h b/ydb/core/tx/columnshard/engines/insert_table/meta.h
index 5621e6359f8..d884c3a910a 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/meta.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/meta.h
@@ -30,16 +30,16 @@ public:
RawBytes = proto.GetRawBytes();
}
- std::optional<NArrow::TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const {
+ std::optional<NArrow::TReplaceKey> GetFirstPK(const std::shared_ptr<arrow::Schema>& schema) const {
if (GetSpecialKeys()) {
- return GetSpecialKeys()->GetMin(schema);
+ return GetSpecialKeys()->GetFirst(schema);
} else {
return {};
}
}
- std::optional<NArrow::TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const {
+ std::optional<NArrow::TReplaceKey> GetLastPK(const std::shared_ptr<arrow::Schema>& schema) const {
if (GetSpecialKeys()) {
- return GetSpecialKeys()->GetMax(schema);
+ return GetSpecialKeys()->GetLast(schema);
} else {
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
index 77d59ed74fe..c158dcf3a22 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
@@ -15,9 +15,11 @@ TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo&
RawBytes = context.GetMetaProto().GetRawBytes();
}
if (context.GetMetaProto().HasMinValue()) {
+ AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type());
}
if (context.GetMetaProto().HasMaxValue()) {
+ AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type());
}
}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index 1d560b61075..58303ab6a27 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -5,31 +5,27 @@
namespace NKikimr::NOlap {
-void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo) {
- auto& batch = specials.GetBatch();
- AFL_VERIFY(batch->num_rows());
+void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo) {
{
- auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
- AFL_VERIFY(keyBatch);
- std::vector<bool> bits(batch->num_rows(), false);
- bits[0] = true;
- bits[batch->num_rows() - 1] = true; // it could be 0 if batch has one row
-
- auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
- auto res = arrow::compute::Filter(keyBatch, filter);
- Y_VERIFY(res.ok());
-
- ReplaceKeyEdges = res->record_batch();
- Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2);
+ ReplaceKeyEdges = primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey());
+ IndexKeyStart = ReplaceKeyEdges->GetFirst();
+ IndexKeyEnd = ReplaceKeyEdges->GetLast();
}
- auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey());
- IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
- IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+ {
+ auto cPlanStep = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+ auto cTxId = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+ Y_VERIFY(cPlanStep && cTxId);
+ Y_VERIFY(cPlanStep->type_id() == arrow::UInt64Type::type_id);
+ Y_VERIFY(cTxId->type_id() == arrow::UInt64Type::type_id);
+ const arrow::UInt64Array& cPlanStepArray = static_cast<const arrow::UInt64Array&>(*cPlanStep);
+ const arrow::UInt64Array& cTxIdArray = static_cast<const arrow::UInt64Array&>(*cTxId);
+ RecordSnapshotMin = TSnapshot(cPlanStepArray.GetView(0), cTxIdArray.GetView(0));
+ RecordSnapshotMax = TSnapshot(cPlanStepArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1), cTxIdArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1));
+ }
}
bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) {
- const bool compositeIndexKey = indexInfo.IsCompositeIndexKey();
if (Produced != TPortionMeta::EProduced::UNSPECIFIED) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
return true;
@@ -49,17 +45,16 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio
}
if (portionMeta.HasPrimaryKeyBorders()) {
- ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
- Y_VERIFY(ReplaceKeyEdges);
- Y_VERIFY_DEBUG(ReplaceKeyEdges->ValidateFull().ok());
- Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2);
+ ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
+ IndexKeyStart = ReplaceKeyEdges->GetFirst();
+ IndexKeyEnd = ReplaceKeyEdges->GetLast();
+ }
- if (compositeIndexKey) {
- auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey());
- Y_VERIFY(edgesBatch);
- IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
- IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
- }
+ if (portionMeta.HasRecordSnapshotMin()) {
+ RecordSnapshotMin = TSnapshot(portionMeta.GetRecordSnapshotMin().GetPlanStep(), portionMeta.GetRecordSnapshotMin().GetTxId());
+ }
+ if (portionMeta.HasRecordSnapshotMax()) {
+ RecordSnapshotMax = TSnapshot(portionMeta.GetRecordSnapshotMax().GetPlanStep(), portionMeta.GetRecordSnapshotMax().GetTxId());
}
return true;
}
@@ -93,11 +88,17 @@ std::optional<NKikimrTxColumnShard::TIndexPortionMeta> TPortionMeta::SerializeTo
break;
}
- if (const auto& keyEdgesBatch = ReplaceKeyEdges) {
- Y_VERIFY(keyEdgesBatch);
- Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok());
- Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2);
- portionMeta.SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch));
+ if (ReplaceKeyEdges) {
+ portionMeta.SetPrimaryKeyBorders(ReplaceKeyEdges->SerializeToStringDataOnlyNoCompression());
+ }
+
+ if (RecordSnapshotMin) {
+ portionMeta.MutableRecordSnapshotMin()->SetPlanStep(RecordSnapshotMin->GetPlanStep());
+ portionMeta.MutableRecordSnapshotMin()->SetTxId(RecordSnapshotMin->GetTxId());
+ }
+ if (RecordSnapshotMax) {
+ portionMeta.MutableRecordSnapshotMax()->SetPlanStep(RecordSnapshotMax->GetPlanStep());
+ portionMeta.MutableRecordSnapshotMax()->SetTxId(RecordSnapshotMax->GetTxId());
}
return portionMeta;
}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index ae4305e0675..6207944db82 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/common/portion.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/formats/arrow/replace_key.h>
#include <ydb/core/formats/arrow/special_keys.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
@@ -12,14 +13,16 @@ struct TIndexInfo;
struct TPortionMeta {
private:
- void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
- std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows
+ std::shared_ptr<NArrow::TFirstLastSpecialKeys> ReplaceKeyEdges; // first and last PK rows
YDB_ACCESSOR_DEF(TString, TierName);
public:
using EProduced = NPortion::EProduced;
std::optional<NArrow::TReplaceKey> IndexKeyStart;
std::optional<NArrow::TReplaceKey> IndexKeyEnd;
+
+ std::optional<TSnapshot> RecordSnapshotMin;
+ std::optional<TSnapshot> RecordSnapshotMax;
EProduced Produced{EProduced::UNSPECIFIED};
ui32 FirstPkColumn = 0;
@@ -27,7 +30,7 @@ public:
std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const;
- void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& specials, const TIndexInfo& indexInfo);
+ void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo);
EProduced GetProduced() const {
return Produced;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 4cb9ac33a16..ae943b88ead 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -26,18 +26,16 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record)
return Records.back();
}
-void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
- const TString& tierName) {
+void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) {
Y_VERIFY(batch->num_rows() == NumRows());
- AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(batch), tierName);
+ AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(batch), NArrow::TMinMaxSpecialKeys(batch, TIndexInfo::ArrowSchemaSnapshot()), tierName);
}
-void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials,
- const TString& tierName) {
+void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TString& tierName) {
const auto& indexInfo = snapshotSchema.GetIndexInfo();
Meta = {};
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
- Meta.FillBatchInfo(specials, indexInfo);
+ Meta.FillBatchInfo(primaryKeys, snapshotKeys, indexInfo);
Meta.SetTierName(tierName);
}
@@ -143,20 +141,6 @@ void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& r
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
Y_VERIFY(Meta.DeserializeFromProto(*portionMeta, indexInfo));
}
- if (!indexInfo.IsCompositeIndexKey() && indexInfo.GetPKFirstColumnId() == rec.ColumnId) {
- if (rec.GetMeta().GetMin()) {
- auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMin());
- if (!Meta.IndexKeyStart || candidate < *Meta.IndexKeyStart) {
- Meta.IndexKeyStart = candidate;
- }
- }
- if (rec.GetMeta().GetMax()) {
- auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMax());
- if (!Meta.IndexKeyEnd || *Meta.IndexKeyEnd < candidate) {
- Meta.IndexKeyEnd = candidate;
- }
- }
- }
}
bool TPortionInfo::HasPkMinMax() const {
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 5ac673408f8..9cb88c7a5d6 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -222,7 +222,7 @@ public:
void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName);
- void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& specials,
+ void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys,
const TString& tierName);
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
@@ -238,6 +238,16 @@ public:
return *Meta.IndexKeyEnd;
}
+ const TSnapshot& RecordSnapshotMin() const {
+ Y_VERIFY(Meta.RecordSnapshotMin);
+ return *Meta.RecordSnapshotMin;
+ }
+
+ const TSnapshot& RecordSnapshotMax() const {
+ Y_VERIFY(Meta.RecordSnapshotMax);
+ return *Meta.RecordSnapshotMax;
+ }
+
ui32 NumRows() const {
ui32 result = 0;
std::optional<ui32> columnIdFirst;
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index 832d00a0c0c..ef71c3366ab 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -3,89 +3,6 @@
namespace NKikimr::NOlap::NIndexedReader {
-NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
- NJson::TJsonValue result;
- result["reverse"] = ReverseSort;
- result["records_count"] = RecordsCount;
- result["position"] = Position;
- result["sorting"] = Sorting->DebugJson(Position);
- if (Data) {
- result["data"] = Data->DebugJson(Position);
- }
- return result;
-}
-
-std::optional<ui64> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater, const bool include) {
- if (!batch || !batch->num_rows()) {
- return {};
- }
-
- const auto checkEqualBorder = [batch, greater, include](const i64 position) ->std::optional<i64> {
- if (include) {
- return position;
- } else if (greater) {
- if (batch->num_rows() > position + 1) {
- return position + 1;
- } else {
- return {};
- }
- } else {
- if (position) {
- return position - 1;
- } else {
- return {};
- }
- }
- };
-
- i64 posStart = 0;
- i64 posFinish = batch->num_rows() - 1;
- TSortableBatchPosition position = forFound.BuildSame(batch, posStart);
- {
- position.InitPosition(posStart);
- auto cmp = position.Compare(forFound);
- if (cmp == std::partial_ordering::greater) {
- if (greater) {
- return posStart;
- } else {
- return {};
- }
- } else if (cmp == std::partial_ordering::equivalent) {
- return checkEqualBorder(posStart);
- }
- }
- {
- position.InitPosition(posFinish);
- auto cmp = position.Compare(forFound);
- if (cmp == std::partial_ordering::less) {
- if (greater) {
- return {};
- } else {
- return posFinish;
- }
- } else if (cmp == std::partial_ordering::equivalent) {
- return checkEqualBorder(posFinish);
- }
- }
- while (posFinish > posStart + 1) {
- Y_VERIFY(position.InitPosition(0.5 * (posStart + posFinish)));
- const auto comparision = position.Compare(forFound);
- if (comparision == std::partial_ordering::less) {
- posStart = position.Position;
- } else if (comparision == std::partial_ordering::greater) {
- posFinish = position.Position;
- } else {
- return checkEqualBorder(position.Position);
- }
- }
- Y_VERIFY(posFinish != posStart);
- if (greater) {
- return posFinish;
- } else {
- return posStart;
- }
-}
-
void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
Y_VERIFY(point);
Y_VERIFY(point->IsSameSortingSchema(SortSchema));
@@ -217,16 +134,6 @@ NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
return result;
}
-TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) {
- for (auto&& i : columns) {
- auto c = batch->GetColumnByName(i);
- AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns));
- Columns.emplace_back(c);
- auto f = batch->schema()->GetFieldByName(i);
- Fields.emplace_back(f);
- }
-}
-
void TRecordBatchBuilder::AddRecord(const TSortableBatchPosition& position) {
Y_VERIFY_DEBUG(position.GetData().GetColumns().size() == Builders.size());
Y_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields));
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index f4cded77878..ce2bebf5b0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -4,6 +4,7 @@
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/tx/columnshard/engines/index_info.h>
#include <ydb/core/formats/arrow/switch/switch_type.h>
+#include <ydb/core/formats/arrow/reader/read_filter_merger.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <util/generic/hash.h>
#include <util/string/join.h>
@@ -13,154 +14,6 @@ namespace NKikimr::NOlap::NIndexedReader {
class TRecordBatchBuilder;
-class TSortableScanData {
-private:
- YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Array>>, Columns);
- YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);
-public:
- TSortableScanData() = default;
- TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns);
-
- bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const {
- if (Fields.size() != (size_t)schema->num_fields()) {
- return false;
- }
- for (ui32 i = 0; i < Fields.size(); ++i) {
- if (Fields[i]->type() != schema->field(i)->type()) {
- return false;
- }
- if (Fields[i]->name() != schema->field(i)->name()) {
- return false;
- }
- }
- return true;
- }
-
- NJson::TJsonValue DebugJson(const i32 position) const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- for (ui32 i = 0; i < Columns.size(); ++i) {
- auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP);
- jsonColumn["name"] = Fields[i]->name();
- if (position >= 0 && position < Columns[i]->length()) {
- jsonColumn["value"] = NArrow::DebugString(Columns[i], position);
- }
- }
- return result;
- }
-
- std::vector<std::string> GetFieldNames() const {
- std::vector<std::string> result;
- for (auto&& i : Fields) {
- result.emplace_back(i->name());
- }
- return result;
- }
-};
-
-class TSortableBatchPosition {
-protected:
-
- YDB_READONLY(i64, Position, 0);
- i64 RecordsCount = 0;
- bool ReverseSort = false;
- std::shared_ptr<TSortableScanData> Sorting;
- std::shared_ptr<TSortableScanData> Data;
- std::shared_ptr<arrow::RecordBatch> Batch;
- static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include);
-
-public:
- TSortableBatchPosition() = default;
-
- const TSortableScanData& GetData() const {
- return *Data;
- }
-
- bool IsReverseSort() const {
- return ReverseSort;
- }
- NJson::TJsonValue DebugJson() const;
-
- TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const {
- return TSortableBatchPosition(batch, position, Sorting->GetFieldNames(), Data->GetFieldNames(), ReverseSort);
- }
-
- bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) {
- return Sorting->IsSameSchema(schema);
- }
-
- static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) {
- if (!batch) {
- return nullptr;
- }
- Y_VERIFY(from.Compare(to) != std::partial_ordering::greater);
- const std::optional<ui32> idxFrom = FindPosition(batch, from, true, includeFrom);
- const std::optional<ui32> idxTo = FindPosition(batch, to, false, includeTo);
- if (!idxFrom || !idxTo || *idxTo < *idxFrom) {
- return nullptr;
- }
- return batch->Slice(*idxFrom, *idxTo - *idxFrom + 1);
- }
-
- TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort)
- : Position(position)
- , RecordsCount(batch->num_rows())
- , ReverseSort(reverseSort)
- , Sorting(std::make_shared<TSortableScanData>(batch, sortingColumns))
- , Batch(batch)
- {
- if (dataColumns.size()) {
- Data = std::make_shared<TSortableScanData>(batch, dataColumns);
- }
- Y_VERIFY(batch->num_rows());
- Y_VERIFY_DEBUG(batch->ValidateFull().ok());
- Y_VERIFY(Sorting->GetColumns().size());
- }
-
- std::partial_ordering Compare(const TSortableBatchPosition& item) const {
- Y_VERIFY(item.ReverseSort == ReverseSort);
- Y_VERIFY(item.Sorting->GetColumns().size() == Sorting->GetColumns().size());
- const auto directResult = NArrow::ColumnsCompare(Sorting->GetColumns(), Position, item.Sorting->GetColumns(), item.Position);
- if (ReverseSort) {
- if (directResult == std::partial_ordering::less) {
- return std::partial_ordering::greater;
- } else if (directResult == std::partial_ordering::greater) {
- return std::partial_ordering::less;
- } else {
- return std::partial_ordering::equivalent;
- }
- } else {
- return directResult;
- }
- }
-
- bool operator<(const TSortableBatchPosition& item) const {
- return Compare(item) == std::partial_ordering::less;
- }
-
- bool operator==(const TSortableBatchPosition& item) const {
- return Compare(item) == std::partial_ordering::equivalent;
- }
-
- bool operator!=(const TSortableBatchPosition& item) const {
- return Compare(item) != std::partial_ordering::equivalent;
- }
-
- bool NextPosition(const i64 delta) {
- return InitPosition(Position + delta);
- }
-
- bool InitPosition(const i64 position) {
- if (position < RecordsCount && position >= 0) {
- Position = position;
- return true;
- } else {
- return false;
- }
-
- }
-
-};
-
class TMergePartialStream {
private:
#ifndef NDEBUG
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 835c311f2b1..a4249a9007c 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -438,4 +438,8 @@ std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TCol
return result;
}
+std::shared_ptr<arrow::Field> TIndexInfo::SpecialColumnField(const ui32 columnId) const {
+ return ArrowSchemaSnapshot()->GetFieldByName(GetColumnName(columnId, true));
+}
+
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 2dd955dd6f3..bc53deaa748 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -98,6 +98,11 @@ public:
return Id;
}
+ static const std::vector<std::string>& SnapshotColumnNames() {
+ static std::vector<std::string> result = {SPEC_COL_PLAN_STEP, SPEC_COL_TX_ID};
+ return result;
+ }
+
std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const;
std::shared_ptr<arrow::Schema> GetColumnsSchema(const std::set<ui32>& columnIds) const;
TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const;
@@ -154,6 +159,7 @@ public:
std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const;
std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const;
std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const;
+ std::shared_ptr<arrow::Field> SpecialColumnField(const ui32 columnId) const;
const THashSet<TString>& GetRequiredColumns() const {
return RequiredColumns;
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 1a13559ac9e..84bd52f6622 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -25,7 +25,6 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
AFL_VERIFY(record.Valid())("event", "incorrect_record")("record", record.DebugString())("portion", info.DebugString());
}
- Y_VERIFY(Record.Mark <= info.IndexKeyStart());
if (it == Portions.end()) {
OnBeforeChangePortion(nullptr);
auto portionNew = std::make_shared<TPortionInfo>(info);