aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-07-19 16:57:24 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-07-19 16:57:24 +0300
commitd34037f1ebac7a9b3d7669010224c828ed89c5aa (patch)
tree044c353106aa8f09b9808b4b16eeea5467dbbd70
parent610e03a93f88446e713af1c9e30c483394fe26e6 (diff)
downloadydb-d34037f1ebac7a9b3d7669010224c828ed89c5aa.tar.gz
KIKIMR-18796:move libraries for usage in future
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/common/scalars.cpp (renamed from ydb/core/tx/columnshard/engines/scalars.cpp)1
-rw-r--r--ydb/core/tx/columnshard/common/scalars.h17
-rw-r--r--ydb/core/tx/columnshard/common/snapshot.cpp5
-rw-r--r--ydb/core/tx/columnshard/common/snapshot.h51
-rw-r--r--ydb/core/tx/columnshard/common/ya.make4
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt15
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt15
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt15
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt15
-rw-r--r--ydb/core/tx/columnshard/engines/column_features.cpp78
-rw-r--r--ydb/core/tx/columnshard/engines/column_features.h120
-rw-r--r--ydb/core/tx/columnshard/engines/columns_table.h85
-rw-r--r--ydb/core/tx/columnshard/engines/defs.h45
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp429
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h201
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp257
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h515
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt31
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt32
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt32
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt31
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.h91
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp262
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h519
-rw-r--r--ydb/core/tx/columnshard/engines/portions/ya.make14
-rw-r--r--ydb/core/tx/columnshard/engines/scalars.h17
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/column_features.cpp79
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/column_features.h120
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp429
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h203
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/tier_info.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/tier_info.h234
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/ya.make3
-rw-r--r--ydb/core/tx/columnshard/engines/tier_info.h235
-rw-r--r--ydb/core/tx/columnshard/engines/ya.make3
45 files changed, 2229 insertions, 2029 deletions
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
index 228521a024..f9f50dfaa1 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
@@ -11,7 +11,11 @@ add_library(tx-columnshard-common)
target_link_libraries(tx-columnshard-common PUBLIC
contrib-libs-cxxsupp
yutil
+ ydb-core-protos
+ libs-apache-arrow
)
target_sources(tx-columnshard-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp
)
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
index 9f38a9ba79..6128aa8f0e 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
@@ -12,7 +12,11 @@ target_link_libraries(tx-columnshard-common PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ ydb-core-protos
+ libs-apache-arrow
)
target_sources(tx-columnshard-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp
)
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
index 9f38a9ba79..6128aa8f0e 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
@@ -12,7 +12,11 @@ target_link_libraries(tx-columnshard-common PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ ydb-core-protos
+ libs-apache-arrow
)
target_sources(tx-columnshard-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp
)
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
index 228521a024..f9f50dfaa1 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
@@ -11,7 +11,11 @@ add_library(tx-columnshard-common)
target_link_libraries(tx-columnshard-common PUBLIC
contrib-libs-cxxsupp
yutil
+ ydb-core-protos
+ libs-apache-arrow
)
target_sources(tx-columnshard-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/scalars.cpp b/ydb/core/tx/columnshard/common/scalars.cpp
index 9cc6993808..bbf24e3cc1 100644
--- a/ydb/core/tx/columnshard/engines/scalars.cpp
+++ b/ydb/core/tx/columnshard/common/scalars.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/formats/arrow/switch_type.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
+#include <util/system/unaligned_mem.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/common/scalars.h b/ydb/core/tx/columnshard/common/scalars.h
new file mode 100644
index 0000000000..c2bfed65c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/scalars.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/core/protos/tx_columnshard.pb.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+#include <memory>
+
+namespace NKikimr::NOlap {
+
+void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value);
+std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value,
+ const std::shared_ptr<arrow::DataType>& type);
+
+TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key);
+std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type);
+
+}
diff --git a/ydb/core/tx/columnshard/common/snapshot.cpp b/ydb/core/tx/columnshard/common/snapshot.cpp
new file mode 100644
index 0000000000..9f90580806
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/snapshot.cpp
@@ -0,0 +1,5 @@
+#include "snapshot.h"
+
+namespace NKikimr::NOlap {
+
+};
diff --git a/ydb/core/tx/columnshard/common/snapshot.h b/ydb/core/tx/columnshard/common/snapshot.h
new file mode 100644
index 0000000000..3da8294491
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/snapshot.h
@@ -0,0 +1,51 @@
+#pragma once
+#include <util/stream/output.h>
+#include <util/string/cast.h>
+
+namespace NKikimr::NOlap {
+
+class TSnapshot {
+private:
+ ui64 PlanStep = 0;
+ ui64 TxId = 0;
+
+public:
+ constexpr TSnapshot(const ui64 planStep, const ui64 txId) noexcept
+ : PlanStep(planStep)
+ , TxId(txId) {
+ }
+
+ constexpr ui64 GetPlanStep() const noexcept {
+ return PlanStep;
+ }
+
+ constexpr ui64 GetTxId() const noexcept {
+ return TxId;
+ }
+
+ constexpr bool IsZero() const noexcept {
+ return PlanStep == 0 && TxId == 0;
+ }
+
+ constexpr bool Valid() const noexcept {
+ return PlanStep && TxId;
+ }
+
+ static constexpr TSnapshot Zero() noexcept {
+ return TSnapshot(0, 0);
+ }
+
+ static constexpr TSnapshot Max() noexcept {
+ return TSnapshot(-1ll, -1ll);
+ }
+
+ constexpr bool operator==(const TSnapshot&) const noexcept = default;
+
+ constexpr auto operator<=>(const TSnapshot&) const noexcept = default;
+
+ friend IOutputStream& operator<<(IOutputStream& out, const TSnapshot& s) {
+ return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ::ToString(s.TxId)) << "}";
+ }
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make
index 91a0d3a545..36c5c82b5d 100644
--- a/ydb/core/tx/columnshard/common/ya.make
+++ b/ydb/core/tx/columnshard/common/ya.make
@@ -2,9 +2,13 @@ LIBRARY()
SRCS(
reverse_accessor.cpp
+ scalars.cpp
+ snapshot.cpp
)
PEERDIR(
+ ydb/core/protos
+ contrib/libs/apache/arrow
)
END()
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index 6d9335d6a8..204ac83fc1 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -7,18 +7,13 @@
add_subdirectory(insert_table)
+add_subdirectory(portions)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(scheme)
add_subdirectory(storage)
add_subdirectory(ut)
add_subdirectory(writer)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(tx-columnshard-engines)
target_compile_options(tx-columnshard-engines PRIVATE
@@ -38,10 +33,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
columnshard-engines-insert_table
+ columnshard-engines-portions
formats-arrow-compression
core-tx-program
udf-service-exception_policy
- tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -54,11 +49,5 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
)
-generate_enum_serilization(tx-columnshard-engines
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h
- INCLUDE_HEADERS
- ydb/core/tx/columnshard/engines/portion_info.h
-)
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index df8b47a1b8..a7bd0121aa 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -7,18 +7,13 @@
add_subdirectory(insert_table)
+add_subdirectory(portions)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(scheme)
add_subdirectory(storage)
add_subdirectory(ut)
add_subdirectory(writer)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(tx-columnshard-engines)
target_compile_options(tx-columnshard-engines PRIVATE
@@ -39,10 +34,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
columnshard-engines-insert_table
+ columnshard-engines-portions
formats-arrow-compression
core-tx-program
udf-service-exception_policy
- tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -55,11 +50,5 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
)
-generate_enum_serilization(tx-columnshard-engines
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h
- INCLUDE_HEADERS
- ydb/core/tx/columnshard/engines/portion_info.h
-)
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index df8b47a1b8..a7bd0121aa 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -7,18 +7,13 @@
add_subdirectory(insert_table)
+add_subdirectory(portions)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(scheme)
add_subdirectory(storage)
add_subdirectory(ut)
add_subdirectory(writer)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(tx-columnshard-engines)
target_compile_options(tx-columnshard-engines PRIVATE
@@ -39,10 +34,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
columnshard-engines-insert_table
+ columnshard-engines-portions
formats-arrow-compression
core-tx-program
udf-service-exception_policy
- tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -55,11 +50,5 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
)
-generate_enum_serilization(tx-columnshard-engines
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h
- INCLUDE_HEADERS
- ydb/core/tx/columnshard/engines/portion_info.h
-)
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index 6d9335d6a8..204ac83fc1 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -7,18 +7,13 @@
add_subdirectory(insert_table)
+add_subdirectory(portions)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(scheme)
add_subdirectory(storage)
add_subdirectory(ut)
add_subdirectory(writer)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(tx-columnshard-engines)
target_compile_options(tx-columnshard-engines PRIVATE
@@ -38,10 +33,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
columnshard-engines-insert_table
+ columnshard-engines-portions
formats-arrow-compression
core-tx-program
udf-service-exception_policy
- tools-enum_parser-enum_serialization_runtime
)
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -54,11 +49,5 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
)
-generate_enum_serilization(tx-columnshard-engines
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h
- INCLUDE_HEADERS
- ydb/core/tx/columnshard/engines/portion_info.h
-)
diff --git a/ydb/core/tx/columnshard/engines/column_features.cpp b/ydb/core/tx/columnshard/engines/column_features.cpp
index dd6341ec7a..44faec99a7 100644
--- a/ydb/core/tx/columnshard/engines/column_features.cpp
+++ b/ydb/core/tx/columnshard/engines/column_features.cpp
@@ -1,79 +1 @@
#include "column_features.h"
-#include "index_info.h"
-#include <ydb/core/formats/arrow/serializer/full.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
-#include <util/string/builder.h>
-
-namespace NKikimr::NOlap {
-
-NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetSaveTransformer() const {
- NArrow::NTransformation::ITransformer::TPtr transformer;
- if (DictionaryEncoding) {
- transformer = DictionaryEncoding->BuildEncoder();
- }
- return transformer;
-}
-
-NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer() const {
- NArrow::NTransformation::ITransformer::TPtr transformer;
- if (DictionaryEncoding) {
- transformer = DictionaryEncoding->BuildDecoder();
- }
- return transformer;
-}
-
-std::shared_ptr<NKikimr::NOlap::TColumnLoader> TColumnFeatures::GetLoader(const TIndexInfo& info) const {
- if (!LoaderCache) {
- NArrow::NTransformation::ITransformer::TPtr transformer = GetLoadTransformer();
- auto schema = info.GetColumnSchema(ColumnId);
- if (!transformer) {
- LoaderCache = std::make_shared<TColumnLoader>(transformer,
- std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(schema),
- schema, ColumnId);
- } else {
- LoaderCache = std::make_shared<TColumnLoader>(transformer,
- std::make_shared<NArrow::NSerialization::TFullDataDeserializer>(),
- schema, ColumnId);
- }
- }
- return LoaderCache;
-}
-
-std::optional<NKikimr::NOlap::TColumnFeatures> TColumnFeatures::BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId) {
- TColumnFeatures result(columnId);
- if (columnInfo.HasCompression()) {
- auto settings = NArrow::TCompression::BuildFromProto(columnInfo.GetCompression());
- Y_VERIFY(settings.IsSuccess());
- result.Compression = *settings;
- }
- if (columnInfo.HasDictionaryEncoding()) {
- auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding());
- Y_VERIFY(settings.IsSuccess());
- result.DictionaryEncoding = *settings;
- }
- return result;
-}
-
-std::unique_ptr<arrow::util::Codec> TColumnFeatures::GetCompressionCodec() const {
- if (Compression) {
- return Compression->BuildArrowCodec();
- } else {
- return nullptr;
- }
-}
-
-TString TColumnLoader::DebugString() const {
- TStringBuilder result;
- if (ExpectedSchema) {
- result << "schema:" << ExpectedSchema->ToString() << ";";
- }
- if (Transformer) {
- result << "transformer:" << Transformer->DebugString() << ";";
- }
- if (Deserializer) {
- result << "deserializer:" << Deserializer->DebugString() << ";";
- }
- return result;
-}
-
-} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/column_features.h b/ydb/core/tx/columnshard/engines/column_features.h
index bb5e79428a..053578affb 100644
--- a/ydb/core/tx/columnshard/engines/column_features.h
+++ b/ydb/core/tx/columnshard/engines/column_features.h
@@ -1,120 +1,2 @@
#pragma once
-#include <ydb/core/formats/arrow/compression/object.h>
-#include <ydb/core/formats/arrow/dictionary/object.h>
-#include <ydb/core/formats/arrow/serializer/abstract.h>
-#include <ydb/core/formats/arrow/transformer/abstract.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
-
-namespace NKikimr::NOlap {
-
-class TSaverContext {
-private:
- TString TierName;
- std::optional<NArrow::TCompression> ExternalCompression;
-public:
- const std::optional<NArrow::TCompression>& GetExternalCompression() const {
- return ExternalCompression;
- }
- TSaverContext& SetExternalCompression(const std::optional<NArrow::TCompression>& value) {
- ExternalCompression = value;
- return *this;
- }
- const TString& GetTierName() const {
- return TierName;
- }
- TSaverContext& SetTierName(const TString& value) {
- TierName = value;
- return *this;
- }
-};
-
-class TColumnSaver {
-private:
- NArrow::NTransformation::ITransformer::TPtr Transformer;
- NArrow::NSerialization::ISerializer::TPtr Serializer;
-public:
- TColumnSaver() = default;
- TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer)
- : Transformer(transformer)
- , Serializer(serializer) {
- Y_VERIFY(Serializer);
- }
-
- TString Apply(const std::shared_ptr<arrow::RecordBatch>& data) const {
- Y_VERIFY(Serializer);
- if (Transformer) {
- return Serializer->Serialize(Transformer->Transform(data));
- } else {
- return Serializer->Serialize(data);
- }
- }
-};
-
-class TColumnLoader {
-private:
- NArrow::NTransformation::ITransformer::TPtr Transformer;
- NArrow::NSerialization::IDeserializer::TPtr Deserializer;
- std::shared_ptr<arrow::Schema> ExpectedSchema;
- const ui32 ColumnId;
-public:
- TString DebugString() const;
-
- TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer,
- const std::shared_ptr<arrow::Schema>& expectedSchema, const ui32 columnId)
- : Transformer(transformer)
- , Deserializer(deserializer)
- , ExpectedSchema(expectedSchema)
- , ColumnId(columnId)
- {
- Y_VERIFY(ExpectedSchema);
- Y_VERIFY(Deserializer);
- }
-
- ui32 GetColumnId() const {
- return ColumnId;
- }
-
- std::shared_ptr<arrow::Schema> GetExpectedSchema() const {
- return ExpectedSchema;
- }
-
- arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const {
- Y_VERIFY(Deserializer);
- arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = Deserializer->Deserialize(data);
- if (!columnArray.ok()) {
- return columnArray;
- }
- if (Transformer) {
- return Transformer->Transform(*columnArray);
- } else {
- return columnArray;
- }
- }
-};
-
-struct TIndexInfo;
-
-class TColumnFeatures {
-private:
- const ui32 ColumnId;
- std::optional<NArrow::TCompression> Compression;
- std::optional<NArrow::NDictionary::TEncodingSettings> DictionaryEncoding;
- mutable std::shared_ptr<TColumnLoader> LoaderCache;
-public:
- TColumnFeatures(const ui32 columnId)
- : ColumnId(columnId)
- {
-
- }
- static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId);
-
- NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const;
- NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const;
-
- std::unique_ptr<arrow::util::Codec> GetCompressionCodec() const;
-
- std::shared_ptr<TColumnLoader> GetLoader(const TIndexInfo& info) const;
-
-};
-
-} // namespace NKikimr::NOlap
+#include "scheme/column_features.h"
diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h
index 4be4a052b2..37f07d3472 100644
--- a/ydb/core/tx/columnshard/engines/columns_table.h
+++ b/ydb/core/tx/columnshard/engines/columns_table.h
@@ -1,95 +1,12 @@
#pragma once
-#include "defs.h"
#include "db_wrapper.h"
+#include "portions/column_record.h"
#include <ydb/core/tx/columnshard/blob.h>
namespace NKikimr::NOlap {
-struct TColumnRecord {
- ui64 Granule;
- ui64 PlanStep; // {PlanStep, TxId} is min snapshot for {Granule, Portion}
- ui64 TxId;
- ui64 Portion; // Id of independent (overlayed by PK) portion of data in granule
- ui64 XPlanStep{0}; // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)
- ui64 XTxId{0};
- ui32 ColumnId{0};
- ui16 Chunk; // Number of blob for column ColumnName in Portion
- TBlobRange BlobRange;
- TString Metadata;
-
- std::optional<ui32> GetChunkRowsCount() const {
- return {};
- }
-
- bool operator == (const TColumnRecord& rec) const {
- return (Granule == rec.Granule) && (ColumnId == rec.ColumnId) &&
- (PlanStep == rec.PlanStep) && (TxId == rec.TxId) && (Portion == rec.Portion) && (Chunk == rec.Chunk);
- }
-
- TString SerializedBlobId() const {
- return BlobRange.BlobId.SerializeBinary();
- }
-
- bool Valid() const {
- return ValidExceptSnapshot() && ValidSnapshot();
- }
-
- bool ValidSnapshot() const {
- return PlanStep && TxId;
- }
-
- bool ValidExceptSnapshot() const {
- return Granule && ColumnId && Portion && ValidBlob();
- }
-
- bool ValidBlob() const {
- return BlobRange.BlobId.IsValid() && BlobRange.Size;
- }
-
- void SetSnapshot(const TSnapshot& snap) {
- Y_VERIFY(snap.Valid());
- PlanStep = snap.GetPlanStep();
- TxId = snap.GetTxId();
- }
-
- void SetXSnapshot(const TSnapshot& snap) {
- Y_VERIFY(snap.Valid());
- XPlanStep = snap.GetPlanStep();
- XTxId = snap.GetTxId();
- }
-
- static TColumnRecord Make(ui64 granule, ui32 columnId, const TSnapshot& minSnapshot, ui64 portion, ui16 chunk = 0) {
- TColumnRecord row;
- row.Granule = granule;
- row.PlanStep = minSnapshot.GetPlanStep();
- row.TxId = minSnapshot.GetTxId();
- row.Portion = portion;
- row.ColumnId = columnId;
- row.Chunk = chunk;
- //row.BlobId
- //row.Metadata
- return row;
- }
-
- friend IOutputStream& operator << (IOutputStream& out, const TColumnRecord& rec) {
- out << '{';
- out << 'g' << rec.Granule << 'p' << rec.Portion;
- if (rec.Chunk) {
- out << 'n' << rec.Chunk;
- }
- out << ',' << (i32)rec.ColumnId;
- out << ',' << rec.PlanStep << ':' << (rec.TxId == Max<ui64>() ? "max" : ToString(rec.TxId));
- if (rec.XPlanStep) {
- out << '-' << rec.XPlanStep << ':' << (rec.XTxId == Max<ui64>() ? "max" : ToString(rec.XTxId));
- }
- out << ',' << rec.BlobRange.ToString();
- out << '}';
- return out;
- }
-};
-
class TColumnsTable {
public:
TColumnsTable(ui32 indexId)
diff --git a/ydb/core/tx/columnshard/engines/defs.h b/ydb/core/tx/columnshard/engines/defs.h
index beb1880f7a..6c4e0371d7 100644
--- a/ydb/core/tx/columnshard/engines/defs.h
+++ b/ydb/core/tx/columnshard/engines/defs.h
@@ -3,6 +3,7 @@
#include <ydb/core/base/defs.h>
#include <ydb/core/base/logoblob.h>
#include <ydb/core/tx/ctor_logger.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
namespace NKikimr::NOlap {
@@ -15,50 +16,6 @@ inline TWriteId operator++(TWriteId& w) noexcept {
return w;
}
-class TSnapshot {
-private:
- ui64 PlanStep = 0;
- ui64 TxId = 0;
-
-public:
- constexpr TSnapshot(const ui64 planStep, const ui64 txId) noexcept
- : PlanStep(planStep)
- , TxId(txId) {
- }
-
- constexpr ui64 GetPlanStep() const noexcept {
- return PlanStep;
- }
-
- constexpr ui64 GetTxId() const noexcept {
- return TxId;
- }
-
- constexpr bool IsZero() const noexcept {
- return PlanStep == 0 && TxId == 0;
- }
-
- constexpr bool Valid() const noexcept {
- return PlanStep && TxId;
- }
-
- static constexpr TSnapshot Zero() noexcept {
- return TSnapshot(0, 0);
- }
-
- static constexpr TSnapshot Max() noexcept {
- return TSnapshot(-1ll, -1ll);
- }
-
- constexpr bool operator==(const TSnapshot&) const noexcept = default;
-
- constexpr auto operator<=>(const TSnapshot&) const noexcept = default;
-
- friend IOutputStream& operator<<(IOutputStream& out, const TSnapshot& s) {
- return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ToString(s.TxId)) << "}";
- }
-};
-
class IBlobGroupSelector {
protected:
virtual ~IBlobGroupSelector() = default;
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 2ca6b57cfa..80fbea3cee 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -1,430 +1 @@
#include "index_info.h"
-#include "column_engine.h"
-
-#include <ydb/core/formats/arrow/arrow_batch_builder.h>
-#include <ydb/core/formats/arrow/sort_cursor.h>
-#include <ydb/core/sys_view/common/schema.h>
-#include <ydb/core/formats/arrow/serializer/batch_only.h>
-#include <ydb/core/formats/arrow/transformer/dictionary.h>
-#include <ydb/core/formats/arrow/serializer/full.h>
-#include <ydb/core/base/appdata.h>
-
-namespace NKikimr::NOlap {
-
-const TString TIndexInfo::STORE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::StorePrimaryIndexStatsName;
-const TString TIndexInfo::TABLE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::TablePrimaryIndexStatsName;
-
-static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns) {
- std::vector<TString> out;
- out.reserve(columns.size());
- for (const auto& [name, _] : columns) {
- out.push_back(name);
- }
- return out;
-}
-
-TIndexInfo::TIndexInfo(const TString& name, ui32 id)
- : NTable::TScheme::TTableSchema()
- , Id(id)
- , Name(name)
-{}
-
-std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) {
- Y_VERIFY(batch);
- i64 numColumns = batch->num_columns();
- i64 numRows = batch->num_rows();
-
- auto res = batch->AddColumn(numColumns, arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()),
- NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows));
- Y_VERIFY(res.ok());
- res = (*res)->AddColumn(numColumns + 1, arrow::field(SPEC_COL_TX_ID, arrow::uint64()),
- NArrow::MakeUI64Array(snapshot.GetTxId(), numRows));
- Y_VERIFY(res.ok());
- Y_VERIFY((*res)->num_columns() == numColumns + 2);
- return *res;
-}
-
-std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() {
- static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{
- arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()),
- arrow::field(SPEC_COL_TX_ID, arrow::uint64())
- });
- return result;
-}
-
-bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) {
- return IsSpecialColumn(field.name());
-}
-
-bool TIndexInfo::IsSpecialColumn(const std::string& fieldName) {
- return fieldName == SPEC_COL_PLAN_STEP
- || fieldName == SPEC_COL_TX_ID;
-}
-
-bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) {
- return fieldId == (ui32)ESpecialColumn::PLAN_STEP
- || fieldId == (ui32)ESpecialColumn::TX_ID;
-}
-
-ui32 TIndexInfo::GetColumnId(const std::string& name) const {
- auto id = GetColumnIdOptional(name);
- Y_VERIFY(!!id, "undefined column %s", name.data());
- return *id;
-}
-
-std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const {
- const auto ni = ColumnNames.find(name);
-
- if (ni != ColumnNames.end()) {
- return ni->second;
- }
- if (name == SPEC_COL_PLAN_STEP) {
- return ui32(ESpecialColumn::PLAN_STEP);
- } else if (name == SPEC_COL_TX_ID) {
- return ui32(ESpecialColumn::TX_ID);
- }
- return {};
-}
-
-TString TIndexInfo::GetColumnName(ui32 id, bool required) const {
- if (ESpecialColumn(id) == ESpecialColumn::PLAN_STEP) {
- return SPEC_COL_PLAN_STEP;
- } else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) {
- return SPEC_COL_TX_ID;
- } else {
- const auto ci = Columns.find(id);
-
- if (!required && ci == Columns.end()) {
- return {};
- }
-
- Y_VERIFY(ci != Columns.end());
- return ci->second.Name;
- }
-}
-
-std::vector<ui32> TIndexInfo::GetColumnIds() const {
- std::vector<ui32> result;
- for (auto&& i : Columns) {
- result.emplace_back(i.first);
- }
- result.emplace_back((ui32)ESpecialColumn::PLAN_STEP);
- result.emplace_back((ui32)ESpecialColumn::TX_ID);
- return result;
-}
-
-std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) const {
- std::vector<TString> out;
- out.reserve(ids.size());
- for (ui32 id : ids) {
- const auto ci = Columns.find(id);
- Y_VERIFY(ci != Columns.end());
- out.push_back(ci->second.Name);
- }
- return out;
-}
-
-std::vector<TNameTypeInfo> TIndexInfo::GetColumns(const std::vector<ui32>& ids) const {
- return NOlap::GetColumns(*this, ids);
-}
-
-std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const {
- if (!Schema) {
- std::vector<ui32> ids;
- ids.reserve(Columns.size());
- for (const auto& [id, _] : Columns) {
- ids.push_back(id);
- }
-
- // The ids had a set type before so we keep them sorted.
- std::sort(ids.begin(), ids.end());
- Schema = MakeArrowSchema(Columns, ids);
- }
-
- return Schema;
-}
-
-std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaWithSpecials() const {
- if (SchemaWithSpecials) {
- return SchemaWithSpecials;
- }
-
- const auto& schema = ArrowSchema();
-
- std::vector<std::shared_ptr<arrow::Field>> extended;
- extended.reserve(schema->num_fields() + 3);
-
- // Place special fields at the beginning of the schema.
- extended.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()));
- extended.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64()));
- // Append fields from the regular schema afterward.
- extended.insert(extended.end(), schema->fields().begin(), schema->fields().end());
-
- SchemaWithSpecials = std::make_shared<arrow::Schema>(std::move(extended));
- return SchemaWithSpecials;
-}
-
-std::shared_ptr<arrow::Schema> TIndexInfo::AddColumns(
- const std::shared_ptr<arrow::Schema>& src,
- const std::vector<TString>& columns) const
-{
- std::shared_ptr<arrow::Schema> all = ArrowSchemaWithSpecials();
- auto fields = src->fields();
-
- for (const auto& col : columns) {
- const std::string name(col.data(), col.size());
- if (!src->GetFieldByName(name)) {
- auto field = all->GetFieldByName(name);
- if (!field) {
- return {};
- }
- fields.push_back(field);
- }
- }
- return std::make_shared<arrow::Schema>(std::move(fields));
-}
-
-std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials) const {
- return MakeArrowSchema(Columns, columnIds, withSpecials);
-}
-
-std::vector<ui32> TIndexInfo::GetColumnIds(const std::vector<TString>& columnNames) const {
- std::vector<ui32> ids;
- ids.reserve(columnNames.size());
- for (auto& name : columnNames) {
- auto columnId = GetColumnIdOptional(name);
- if (!columnId) {
- return {};
- }
- ids.emplace_back(*columnId);
- }
- return ids;
-}
-
-std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<TString>& names) const {
- auto columnIds = GetColumnIds(names);
- if (columnIds.empty()) {
- return {};
- }
- return MakeArrowSchema(Columns, columnIds);
-}
-
-std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const {
- auto it = ArrowColumnByColumnIdCache.find(columnId);
- if (it == ArrowColumnByColumnIdCache.end()) {
- it = ArrowColumnByColumnIdCache.emplace(columnId, ArrowSchema()->GetFieldByName(GetColumnName(columnId, true))).first;
- }
- return it->second;
-}
-
-void TIndexInfo::SetAllKeys() {
- /// @note Setting replace and sorting key to PK we are able to:
- /// * apply REPLACE by MergeSort
- /// * apply PK predicate before REPLACE
- const auto& primaryKeyNames = NamesOnly(GetPrimaryKey());
- // Update set of required columns with names from primary key.
- for (const auto& name: primaryKeyNames) {
- RequiredColumns.insert(name);
- }
-
- std::vector<std::shared_ptr<arrow::Field>> fields;
- if (primaryKeyNames.size()) {
- SortingKey = ArrowSchema(primaryKeyNames);
- ReplaceKey = SortingKey;
- fields = ReplaceKey->fields();
- if (CompositeIndexKey) {
- IndexKey = ReplaceKey;
- } else {
- IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] }));
- }
- }
-
- fields.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()));
- fields.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64()));
- ExtendedKey = std::make_shared<arrow::Schema>(std::move(fields));
-
- for (const auto& [colId, column] : Columns) {
- if (NArrow::IsPrimitiveYqlType(column.PType)) {
- MinMaxIdxColumnsIds.insert(colId);
- }
- }
- MinMaxIdxColumnsIds.insert(GetPKFirstColumnId());
-}
-
-std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortDescription() const {
- if (GetSortingKey()) {
- auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first
- Y_VERIFY(key && key->num_fields() > 2);
- auto description = std::make_shared<NArrow::TSortDescription>(key);
- description->Directions[key->num_fields() - 1] = -1;
- description->Directions[key->num_fields() - 2] = -1;
- description->NotNull = true; // TODO
- return description;
- }
- return {};
-}
-
-std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() const {
- if (GetSortingKey()) {
- auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first
- Y_VERIFY(key && key->num_fields() > 2);
- auto description = std::make_shared<NArrow::TSortDescription>(key, GetReplaceKey());
- description->Directions[key->num_fields() - 1] = -1;
- description->Directions[key->num_fields() - 2] = -1;
- description->NotNull = true; // TODO
- return description;
- }
- return {};
-}
-
-bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
- auto it = ColumnNames.find(name);
- if (it == ColumnNames.end()) {
- return false;
- }
- return MinMaxIdxColumnsIds.contains(it->second);
-}
-
-TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const {
- arrow::ipc::IpcWriteOptions options;
- options.use_threads = false;
-
- NArrow::NTransformation::ITransformer::TPtr transformer;
- std::unique_ptr<arrow::util::Codec> columnCodec;
- {
- auto it = ColumnFeatures.find(columnId);
- if (it != ColumnFeatures.end()) {
- transformer = it->second.GetSaveTransformer();
- columnCodec = it->second.GetCompressionCodec();
- }
- }
-
- if (context.GetExternalCompression()) {
- options.codec = context.GetExternalCompression()->BuildArrowCodec();
- } else if (columnCodec) {
- options.codec = std::move(columnCodec);
- } else if (DefaultCompression) {
- options.codec = DefaultCompression->BuildArrowCodec();
- } else {
- options.codec = NArrow::TCompression::BuildDefaultCodec();
- }
-
- if (!transformer) {
- return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options));
- } else {
- return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TFullDataSerializer>(options));
- }
-}
-
-TColumnFeatures& TIndexInfo::GetOrCreateColumnFeatures(const ui32 columnId) const {
- auto it = ColumnFeatures.find(columnId);
- if (it == ColumnFeatures.end()) {
- it = ColumnFeatures.emplace(columnId, TColumnFeatures(columnId)).first;
- }
- return it->second;
-}
-
-std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const {
- TColumnFeatures& features = GetOrCreateColumnFeatures(columnId);
- return features.GetLoader(*this);
-}
-
-std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const {
- std::shared_ptr<arrow::Schema> schema = Schema;
- if (IsSpecialColumn(columnId)) {
- schema = ArrowSchemaSnapshot();
- }
- auto field = schema->GetFieldByName(GetColumnName(columnId));
- Y_VERIFY(field);
- std::vector<std::shared_ptr<arrow::Field>> fields = { field };
- return std::make_shared<arrow::Schema>(fields);
-}
-
-bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
- if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema");
- return false;
- }
-
- for (const auto& col : schema.GetColumns()) {
- const ui32 id = col.GetId();
- const TString& name = col.GetName();
- auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
- col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
- Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
- ColumnNames[name] = id;
- std::optional<TColumnFeatures> cFeatures = TColumnFeatures::BuildFromProto(col, id);
- if (!cFeatures) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature");
- return false;
- }
- ColumnFeatures.emplace(id, *cFeatures);
- }
-
- for (const auto& keyName : schema.GetKeyColumnNames()) {
- Y_VERIFY(ColumnNames.contains(keyName));
- KeyColumns.push_back(ColumnNames[keyName]);
- }
-
- if (schema.HasDefaultCompression()) {
- auto result = NArrow::TCompression::BuildFromProto(schema.GetDefaultCompression());
- if (!result) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", result.GetErrorMessage());
- return false;
- }
- DefaultCompression = *result;
- }
-
- CompositeMarks = schema.GetCompositeMarks();
- CompositeIndexKey = AppData()->FeatureFlags.GetForceColumnTablesCompositeMarks() ? true : CompositeMarks;
- return true;
-}
-
-bool TIndexInfo::CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const {
- return CompositeMarks == scheme.GetCompositeMarks();
-}
-
-std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) {
- std::vector<std::shared_ptr<arrow::Field>> fields;
- fields.reserve(withSpecials ? ids.size() + 2 : ids.size());
-
- if (withSpecials) {
- // Place special fields at the beginning of the schema.
- fields.push_back(arrow::field(TIndexInfo::SPEC_COL_PLAN_STEP, arrow::uint64()));
- fields.push_back(arrow::field(TIndexInfo::SPEC_COL_TX_ID, arrow::uint64()));
- }
-
- for (const ui32 id: ids) {
- auto it = columns.find(id);
- if (it == columns.end()) {
- continue;
- }
-
- const auto& column = it->second;
- std::string colName(column.Name.data(), column.Name.size());
- fields.emplace_back(std::make_shared<arrow::Field>(colName, NArrow::GetArrowType(column.PType)));
- }
-
- return std::make_shared<arrow::Schema>(std::move(fields));
-}
-
-std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids) {
- std::vector<std::pair<TString, NScheme::TTypeInfo>> out;
- out.reserve(ids.size());
- for (const ui32 id : ids) {
- const auto ci = tableSchema.Columns.find(id);
- Y_VERIFY(ci != tableSchema.Columns.end());
- out.emplace_back(ci->second.Name, ci->second.PType);
- }
- return out;
-}
-
-std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
- TIndexInfo result("", 0);
- if (!result.DeserializeFromProto(schema)) {
- return std::nullopt;
- }
- return result;
-}
-
-} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 33f98892d1..382c410d92 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -1,202 +1,3 @@
#pragma once
-#include "column_features.h"
-#include "defs.h"
-#include "scalars.h"
-#include "tier_info.h"
-
-#include <ydb/core/sys_view/common/schema.h>
-#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
-#include <ydb/core/formats/arrow/dictionary/object.h>
-#include <ydb/core/formats/arrow/serializer/abstract.h>
-#include <ydb/core/formats/arrow/transformer/abstract.h>
-#include <ydb/core/scheme/scheme_types_proto.h>
-
-namespace arrow {
- class Array;
- class Field;
- class Schema;
-}
-
-namespace NKikimr::NArrow {
- struct TSortDescription;
-}
-
-namespace NKikimr::NOlap {
-
-struct TInsertedData;
-class TSnapshotColumnInfo;
-using TNameTypeInfo = std::pair<TString, NScheme::TTypeInfo>;
-
-/// Column engine index description in terms of tablet's local table.
-/// We have to use YDB types for keys here.
-struct TIndexInfo : public NTable::TScheme::TTableSchema {
-private:
- mutable THashMap<ui32, TColumnFeatures> ColumnFeatures;
- mutable THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache;
- TIndexInfo(const TString& name, ui32 id);
- bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
- TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const;
-public:
- static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
- static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
- static const TString STORE_INDEX_STATS_TABLE;
- static const TString TABLE_INDEX_STATS_TABLE;
-
- enum class ESpecialColumn : ui32 {
- PLAN_STEP = 0xffffff00,
- TX_ID,
- };
-
- /// Appends the special columns to the batch.
- static std::shared_ptr<arrow::RecordBatch> AddSpecialColumns(
- const std::shared_ptr<arrow::RecordBatch>& batch,
- const TSnapshot& snapshot);
-
- /// Makes schema as set of the special columns.
- static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot();
-
- /// Matches name of the filed with names of the special columns.
- static bool IsSpecialColumn(const arrow::Field& field);
- static bool IsSpecialColumn(const ui32 field);
- static ui32 GetSpecialColumnByteWidth(const ui32 field) {
- Y_VERIFY(IsSpecialColumn(field));
- return 8;
- }
- static bool IsSpecialColumn(const std::string& fieldName);
- template <class TContainer>
- static bool IsSpecialColumns(const TContainer& c) {
- for (auto&& i : c) {
- if (!IsSpecialColumn(i)) {
- return false;
- }
- }
- return true;
- }
-
- bool CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const;
-public:
-
- static TIndexInfo BuildDefault() {
- TIndexInfo result("dummy", 0);
- return result;
- }
-
- static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
-
- /// Returns id of the index.
- ui32 GetId() const noexcept {
- return Id;
- }
-
- std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const;
- TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const;
- std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const;
-
- /// Returns an id of the column located by name. The name should exists in the schema.
- ui32 GetColumnId(const std::string& name) const;
- std::optional<ui32> GetColumnIdOptional(const std::string& name) const;
-
- /// Returns a name of the column located by id.
- TString GetColumnName(ui32 id, bool required = true) const;
-
- /// Returns names of columns defined by the specific ids.
- std::vector<TString> GetColumnNames(const std::vector<ui32>& ids) const;
- std::vector<ui32> GetColumnIds() const;
-
- /// Returns info of columns defined by specific ids.
- std::vector<TNameTypeInfo> GetColumns(const std::vector<ui32>& ids) const;
-
- /// Traditional Primary Key (includes uniqueness, search and sorting logic)
- std::vector<TNameTypeInfo> GetPrimaryKey() const {
- return GetColumns(KeyColumns);
- }
-
- /// Returns id of the first column of the primary key.
- ui32 GetPKFirstColumnId() const {
- Y_VERIFY(KeyColumns.size());
- return KeyColumns[0];
- }
-
- // Sorting key: could be less or greater then traditional PK
- // It could be empty for append-only tables. It could be greater then PK for better columns compression.
- // If sorting key includes uniqueness key as a prefix we are able to use MergeSort for REPLACE.
- const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return SortingKey; }
- const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return ReplaceKey; }
- const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; }
- const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; }
-
- /// Initializes sorting, replace, index and extended keys.
- void SetAllKeys();
-
- void CheckTtlColumn(const TString& ttlColumn) const {
- Y_VERIFY(!ttlColumn.empty());
- Y_VERIFY(MinMaxIdxColumnsIds.contains(GetColumnId(ttlColumn)));
- }
-
- std::vector<ui32> GetColumnIds(const std::vector<TString>& columnNames) const;
-
- std::shared_ptr<arrow::Schema> ArrowSchema() const;
- std::shared_ptr<arrow::Schema> ArrowSchemaWithSpecials() const;
- std::shared_ptr<arrow::Schema> AddColumns(const std::shared_ptr<arrow::Schema>& schema,
- const std::vector<TString>& columns) const;
-
- std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const;
- std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const;
- std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const;
-
- const THashSet<TString>& GetRequiredColumns() const {
- return RequiredColumns;
- }
-
- const THashSet<ui32>& GetMinMaxIdxColumns() const {
- return MinMaxIdxColumnsIds;
- }
-
- bool AllowTtlOverColumn(const TString& name) const;
-
- /// Returns whether the sorting keys defined.
- bool IsSorted() const { return SortingKey.get(); }
-
- /// Returns whether the replace keys defined.
- bool IsReplacing() const { return ReplaceKey.get(); }
-
- bool IsCompositeIndexKey() const {
- return CompositeIndexKey;
- }
-
- std::shared_ptr<NArrow::TSortDescription> SortDescription() const;
- std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const;
-
- static const std::vector<std::string>& GetSpecialColumnNames() {
- static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) };
- return result;
- }
-
- static const std::vector<ui32>& GetSpecialColumnIds() {
- static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
- return result;
- }
-
-private:
- ui32 Id;
- TString Name;
- bool CompositeIndexKey = false;
- mutable std::shared_ptr<arrow::Schema> Schema;
- mutable std::shared_ptr<arrow::Schema> SchemaWithSpecials;
- std::shared_ptr<arrow::Schema> SortingKey;
- std::shared_ptr<arrow::Schema> ReplaceKey;
- std::shared_ptr<arrow::Schema> ExtendedKey; // Extend PK with snapshot columns to allow old shapshot reads
- std::shared_ptr<arrow::Schema> IndexKey;
- THashSet<TString> RequiredColumns;
- THashSet<ui32> MinMaxIdxColumnsIds;
- std::optional<NArrow::TCompression> DefaultCompression;
- bool CompositeMarks = false;
-};
-
-std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false);
-
-/// Extracts columns with the specific ids from the schema.
-std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids);
-
-} // namespace NKikimr::NOlap
+#include "scheme/index_info.h"
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index b20a716f9e..9b11963e99 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -1,262 +1,5 @@
#include "portion_info.h"
-#include <ydb/core/protos/tx_columnshard.pb.h>
-#include <ydb/core/formats/arrow/arrow_filter.h>
namespace NKikimr::NOlap {
-TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array,
- const std::shared_ptr<arrow::Field>& field,
- const TColumnSaver saver) {
- auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field });
- auto batch = arrow::RecordBatch::Make(schema, array->length(), { array });
- Y_VERIFY(batch);
-
- return saver.Apply(batch);
-}
-
-void TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) {
- record.Chunk = 0;
- Records.emplace_back(std::move(record));
-}
-
-void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
- Y_VERIFY(column->length());
-
- std::pair<int, int> minMaxPos = {0, (column->length() - 1)};
- if (!sorted) {
- minMaxPos = NArrow::FindMinMaxPosition(column);
- }
-
- Y_VERIFY(minMaxPos.first >= 0);
- Y_VERIFY(minMaxPos.second >= 0);
-
- Meta.ColumnMeta[columnId].Min = NArrow::GetScalar(column, minMaxPos.first);
- Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second);
-}
-
-void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
- const TString& tierName) {
- const auto& indexInfo = snapshotSchema.GetIndexInfo();
- const auto& minMaxColumns = indexInfo.GetMinMaxIdxColumns();
-
- TierName = tierName;
- Meta = {};
- Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
-
- // Copy first and last key rows into new batch to free source batch's memory
- {
- auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
- std::vector<bool> bits(batch->num_rows(), false);
- bits[0] = true;
- bits[batch->num_rows() - 1] = true; // it colud be 0 if batch has one row
-
- auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
- auto res = arrow::compute::Filter(keyBatch, filter);
- Y_VERIFY(res.ok());
-
- Meta.ReplaceKeyEdges = res->record_batch();
- Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
- }
-
- auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
- Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
- Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
-
- /// @note It does not add RawBytes info for snapshot columns, only for user ones.
- for (auto& [columnId, col] : indexInfo.Columns) {
- const auto& columnName = col.Name;
- auto column = batch->GetColumnByName(col.Name);
- Y_VERIFY(column);
- Meta.ColumnMeta[columnId].NumRows = column->length();
- Meta.ColumnMeta[columnId].RawBytes = NArrow::GetArrayDataSize(column);
-
- if (minMaxColumns.contains(columnId)) {
- auto column = batch->GetColumnByName(columnName);
- Y_VERIFY(column);
-
- bool isSorted = (columnId == Meta.FirstPkColumn);
- AddMinMax(columnId, column, isSorted);
- Y_VERIFY(Meta.HasMinMax(columnId));
- }
- }
-}
-
-TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const {
- NKikimrTxColumnShard::TIndexColumnMeta meta; // TODO: move proto serialization out of engines folder
- if (Meta.ColumnMeta.contains(rec.ColumnId)) {
- const auto& columnMeta = Meta.ColumnMeta.find(rec.ColumnId)->second;
- if (auto numRows = columnMeta.NumRows) {
- meta.SetNumRows(numRows);
- }
- if (auto rawBytes = columnMeta.RawBytes) {
- meta.SetRawBytes(rawBytes);
- }
- if (columnMeta.HasMinMax()) {
- ScalarToConstant(*columnMeta.Min, *meta.MutableMinValue());
- ScalarToConstant(*columnMeta.Max, *meta.MutableMaxValue());
- }
- }
-
- if (rec.ColumnId == Meta.FirstPkColumn) {
- auto* portionMeta = meta.MutablePortionMeta();
-
- switch (Meta.Produced) {
- case TPortionMeta::UNSPECIFIED:
- Y_VERIFY(false);
- case TPortionMeta::INSERTED:
- portionMeta->SetIsInserted(true);
- break;
- case TPortionMeta::COMPACTED:
- portionMeta->SetIsCompacted(true);
- break;
- case TPortionMeta::SPLIT_COMPACTED:
- portionMeta->SetIsSplitCompacted(true);
- break;
- case TPortionMeta::EVICTED:
- portionMeta->SetIsEvicted(true);
- break;
- case TPortionMeta::INACTIVE:
- Y_FAIL("Unexpected inactive case");
- //portionMeta->SetInactive(true);
- break;
- }
-
- if (!TierName.empty()) {
- portionMeta->SetTierName(TierName);
- }
-
- if (const auto& keyEdgesBatch = Meta.ReplaceKeyEdges) {
- Y_VERIFY(keyEdgesBatch);
- Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok());
- Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2);
- portionMeta->SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch));
- }
- }
-
- TString out;
- Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out);
- return out;
-}
-
-void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
- if (rec.Metadata.empty()) {
- return;
- }
-
- NKikimrTxColumnShard::TIndexColumnMeta meta;
- bool ok = meta.ParseFromString(rec.Metadata);
- Y_VERIFY(ok);
-
- Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
- auto field = indexInfo.ArrowColumnField(rec.ColumnId);
- const bool compositeIndexKey = indexInfo.IsCompositeIndexKey();
-
- if (meta.HasPortionMeta()) {
- Y_VERIFY_DEBUG(rec.ColumnId == Meta.FirstPkColumn);
-
- auto& portionMeta = meta.GetPortionMeta();
- TierName = portionMeta.GetTierName();
-
- if (portionMeta.GetIsInserted()) {
- Meta.Produced = TPortionMeta::INSERTED;
- } else if (portionMeta.GetIsCompacted()) {
- Meta.Produced = TPortionMeta::COMPACTED;
- } else if (portionMeta.GetIsSplitCompacted()) {
- Meta.Produced = TPortionMeta::SPLIT_COMPACTED;
- } else if (portionMeta.GetIsEvicted()) {
- Meta.Produced = TPortionMeta::EVICTED;
- }
-
- if (portionMeta.HasPrimaryKeyBorders()) {
- Meta.ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
- Y_VERIFY(Meta.ReplaceKeyEdges);
- Y_VERIFY_DEBUG(Meta.ReplaceKeyEdges->ValidateFull().ok());
- Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
-
- if (compositeIndexKey) {
- auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
- Y_VERIFY(edgesBatch);
- Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
- Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
- }
- }
- }
- if (meta.HasNumRows()) {
- Meta.ColumnMeta[rec.ColumnId].NumRows = meta.GetNumRows();
- }
- if (meta.HasRawBytes()) {
- Meta.ColumnMeta[rec.ColumnId].RawBytes = meta.GetRawBytes();
- }
- if (meta.HasMinValue()) {
- auto scalar = ConstantToScalar(meta.GetMinValue(), field->type());
- Meta.ColumnMeta[rec.ColumnId].Min = scalar;
-
- // Restore Meta.IndexKeyStart for one column IndexKey
- if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) {
- Meta.IndexKeyStart = NArrow::TReplaceKey::FromScalar(scalar);
- }
- }
- if (meta.HasMaxValue()) {
- auto scalar = ConstantToScalar(meta.GetMaxValue(), field->type());
- Meta.ColumnMeta[rec.ColumnId].Max = scalar;
-
- // Restore Meta.IndexKeyEnd for one column IndexKey
- if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) {
- Meta.IndexKeyEnd = NArrow::TReplaceKey::FromScalar(scalar);
- }
- }
-
- // Portion genarated without PrimaryKeyBorders and loaded with indexInfo.IsCompositeIndexKey()
- // We should have no such portions for ForceColumnTablesCompositeMarks feature
- if (rec.ColumnId == Meta.FirstPkColumn) {
- Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd);
- }
-}
-
-std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
- if (!Meta.ColumnMeta.contains(columnId)) {
- return {};
- }
- return Meta.ColumnMeta.find(columnId)->second.Min;
-}
-
-std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
- if (!Meta.ColumnMeta.contains(columnId)) {
- return {};
- }
- return Meta.ColumnMeta.find(columnId)->second.Max;
-}
-
-std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
- Y_VERIFY(!Blobs.empty());
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(Blobs.size());
- for (auto& blob : Blobs) {
- batches.push_back(blob.BuildRecordBatch(*Loader));
- Y_VERIFY(batches.back());
- }
-
- auto res = arrow::Table::FromRecordBatches(batches);
- Y_VERIFY_S(res.ok(), res.status().message());
- return (*res)->column(0);
-}
-
-std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
- std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
- std::vector< std::shared_ptr<arrow::Field>> fields;
- for (auto&& i : Columns) {
- if (!options.IsAcceptedColumn(i.GetColumnId())) {
- continue;
- }
- columns.emplace_back(i.Assemble());
- fields.emplace_back(i.GetField());
- }
-
- auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
- auto res = table->CombineChunks();
- Y_VERIFY(res.ok());
- return NArrow::ToBatch(*res);
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 12abed9b25..673e4f6c0b 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -1,520 +1,7 @@
#pragma once
-#include "defs.h"
-#include "columns_table.h"
-#include "index_info.h"
-
-#include <ydb/core/formats/arrow/replace_key.h>
-#include <ydb/core/formats/arrow/serializer/abstract.h>
-#include <ydb/core/formats/arrow/dictionary/conversion.h>
-#include <ydb/core/tx/columnshard/counters/indexation.h>
-#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include "portions/portion_info.h"
namespace NKikimr::NOlap {
-struct TPortionMeta {
- // NOTE: These values are persisted in LocalDB so they must be stable
- enum EProduced : ui32 {
- UNSPECIFIED = 0,
- INSERTED = 1,
- COMPACTED = 2,
- SPLIT_COMPACTED = 3,
- INACTIVE = 4,
- EVICTED = 5,
- };
-
- struct TColumnMeta {
- ui32 NumRows{0};
- ui32 RawBytes{0};
- std::shared_ptr<arrow::Scalar> Min;
- std::shared_ptr<arrow::Scalar> Max;
-
- bool HasMinMax() const noexcept {
- return Min.get() && Max.get();
- }
- };
-
- EProduced Produced{UNSPECIFIED};
- THashMap<ui32, TColumnMeta> ColumnMeta;
- ui32 FirstPkColumn = 0;
- std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows
- std::optional<NArrow::TReplaceKey> IndexKeyStart;
- std::optional<NArrow::TReplaceKey> IndexKeyEnd;
-
- bool HasMinMax(ui32 columnId) const {
- if (!ColumnMeta.contains(columnId)) {
- return false;
- }
- return ColumnMeta.find(columnId)->second.HasMinMax();
- }
-
- bool HasPkMinMax() const {
- return HasMinMax(FirstPkColumn);
- }
-
- ui32 NumRows() const {
- if (FirstPkColumn) {
- Y_VERIFY(ColumnMeta.contains(FirstPkColumn));
- return ColumnMeta.find(FirstPkColumn)->second.NumRows;
- }
- return 0;
- }
-
- friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) {
- out << "reason" << (ui32)info.Produced;
- for (const auto& [_, meta] : info.ColumnMeta) {
- if (meta.NumRows) {
- out << " " << meta.NumRows << " rows";
- break;
- }
- }
- return out;
- }
-};
-
-class TPortionAddress {
-private:
- YDB_READONLY(ui64, GranuleId, 0);
- YDB_READONLY(ui64, PortionId, 0);
-public:
- TPortionAddress(const ui64 granuleId, const ui64 portionId)
- : GranuleId(granuleId)
- , PortionId(portionId)
- {
-
- }
-
- bool operator<(const TPortionAddress& item) const {
- return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId);
- }
-
- bool operator==(const TPortionAddress& item) const {
- return std::tie(GranuleId, PortionId) == std::tie(item.GranuleId, item.PortionId);
- }
-};
-
-struct TPortionInfo {
- static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024;
-
- std::vector<TColumnRecord> Records;
- TPortionMeta Meta;
- TString TierName;
-
- bool Empty() const { return Records.empty(); }
- bool Produced() const { return Meta.Produced != TPortionMeta::UNSPECIFIED; }
- bool Valid() const { return !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
- bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; }
- bool IsEvicted() const { return Meta.Produced == TPortionMeta::EVICTED; }
- bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ }
- bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); }
- size_t NumRecords() const { return Records.size(); }
-
- bool CheckForCleanup(const TSnapshot& snapshot) const {
- if (!CheckForCleanup()) {
- return false;
- }
-
- return GetXSnapshot() < snapshot;
- }
-
- bool CheckForCleanup() const {
- return !IsActive();
- }
-
- bool AllowEarlyFilter() const {
- return Meta.Produced == TPortionMeta::COMPACTED
- || Meta.Produced == TPortionMeta::SPLIT_COMPACTED;
- }
-
- bool EvictReady(size_t hotSize) const {
- return Meta.Produced == TPortionMeta::COMPACTED
- || Meta.Produced == TPortionMeta::SPLIT_COMPACTED
- || Meta.Produced == TPortionMeta::EVICTED
- || (Meta.Produced == TPortionMeta::INSERTED && BlobsSizes().first >= hotSize);
- }
-
- ui64 Portion() const {
- Y_VERIFY(!Empty());
- auto& rec = Records[0];
- return rec.Portion;
- }
-
- ui64 Granule() const {
- Y_VERIFY(!Empty());
- auto& rec = Records[0];
- return rec.Granule;
- }
-
- TPortionAddress GetAddress() const {
- Y_VERIFY(!Empty());
- auto& rec = Records[0];
- return TPortionAddress(rec.Granule, rec.Portion);
- }
-
- void SetGranule(ui64 granule) {
- for (auto& rec : Records) {
- rec.Granule = granule;
- }
- }
-
- TSnapshot GetSnapshot() const {
- Y_VERIFY(!Empty());
- auto& rec = Records[0];
- return TSnapshot(rec.PlanStep, rec.TxId);
- }
-
- TSnapshot GetXSnapshot() const {
- Y_VERIFY(!Empty());
- auto& rec = Records[0];
- return TSnapshot(rec.XPlanStep, rec.XTxId);
- }
-
- bool IsActive() const {
- return GetXSnapshot().IsZero();
- }
-
- std::pair<ui32, ui32> BlobsSizes() const {
- ui32 sum = 0;
- ui32 max = 0;
- for (const auto& rec : Records) {
- sum += rec.BlobRange.Size;
- max = Max(max, rec.BlobRange.Size);
- }
- return {sum, max};
- }
-
- ui64 BlobsBytes() const noexcept {
- ui64 sum = 0;
- for (const auto& rec : Records) {
- sum += rec.BlobRange.Size;
- }
- return sum;
- }
-
- void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap) {
- for (auto& rec : Records) {
- rec.Portion = portion;
- }
- if (!granuleRemap.empty()) {
- for (auto& rec : Records) {
- Y_VERIFY(granuleRemap.contains(rec.Granule));
- rec.Granule = granuleRemap.find(rec.Granule)->second;
- }
- }
- }
-
- void UpdateRecordsMeta(TPortionMeta::EProduced produced) {
- Meta.Produced = produced;
- for (auto& record : Records) {
- record.Metadata = GetMetadata(record);
- }
- }
-
- void SetStale(const TSnapshot& snapshot) {
- for (auto& rec : Records) {
- rec.SetXSnapshot(snapshot);
- }
- }
-
- void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
- Records.push_back(rec);
- LoadMetadata(indexInfo, rec);
- }
-
- TString GetMetadata(const TColumnRecord& rec) const;
- void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec);
- void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
- const TString& tierName);
- void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
-
- std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
- std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
-
- const NArrow::TReplaceKey& IndexKeyStart() const {
- Y_VERIFY(Meta.IndexKeyStart);
- return *Meta.IndexKeyStart;
- }
-
- const NArrow::TReplaceKey& IndexKeyEnd() const {
- Y_VERIFY(Meta.IndexKeyEnd);
- return *Meta.IndexKeyEnd;
- }
-
- ui32 NumRows() const {
- return Meta.NumRows();
- }
-
- ui64 GetRawBytes(const std::vector<ui32>& columnIds) const {
- ui64 sum = 0;
- const ui32 numRows = NumRows();
- for (auto&& i : columnIds) {
- if (TIndexInfo::IsSpecialColumn(i)) {
- sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i);
- } else {
- auto it = Meta.ColumnMeta.find(i);
- if (it != Meta.ColumnMeta.end()) {
- sum += it->second.RawBytes;
- }
- }
- }
- return sum;
- }
-
- ui64 RawBytesSum() const {
- ui64 sum = 0;
- for (auto& [columnId, colMeta] : Meta.ColumnMeta) {
- sum += colMeta.RawBytes;
- }
- return sum;
- }
-
-private:
- class TMinGetter {
- public:
- static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
- return portionInfo.MinValue(columnId);
- }
- };
-
- class TMaxGetter {
- public:
- static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
- return portionInfo.MaxValue(columnId);
- }
- };
-
- template <class TSelfGetter, class TItemGetter = TSelfGetter>
- int CompareByColumnIdsImpl(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
- for (auto&& i : columnIds) {
- std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i);
- std::shared_ptr<arrow::Scalar> valueItem = TItemGetter::Get(item, i);
- if (!!valueSelf && !!valueItem) {
- const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem);
- if (cmpResult) {
- return cmpResult;
- }
- } else if (!!valueSelf) {
- return 1;
- } else if (!!valueItem) {
- return -1;
- }
- }
- return 0;
- }
-public:
- int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
- return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns);
- }
-
- int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
- return CompareMinByColumnIds(item, info.KeyColumns);
- }
-
- int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
- return CompareByColumnIdsImpl<TMinGetter>(item, columnIds);
- }
-
- class TAssembleBlobInfo {
- private:
- ui32 NullRowsCount = 0;
- TString Data;
- public:
- TAssembleBlobInfo(const ui32 rowsCount)
- : NullRowsCount(rowsCount) {
-
- }
-
- TAssembleBlobInfo(const TString& data)
- : Data(data) {
-
- }
-
- ui32 GetNullRowsCount() const noexcept {
- return NullRowsCount;
- }
-
- const TString& GetData() const noexcept {
- return Data;
- }
-
- std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const {
- if (NullRowsCount) {
- Y_VERIFY(!Data);
- return NArrow::MakeEmptyBatch(loader.GetExpectedSchema(), NullRowsCount);
- } else {
- auto result = loader.Apply(Data);
- if (!result.ok()) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot unpack batch")("error", result.status().ToString())("loader", loader.DebugString());
- return nullptr;
- }
- return *result;
- }
- }
- };
-
- class TPreparedColumn {
- private:
- std::shared_ptr<TColumnLoader> Loader;
- std::vector<TAssembleBlobInfo> Blobs;
- public:
- ui32 GetColumnId() const {
- return Loader->GetColumnId();
- }
-
- const std::string& GetName() const {
- return Loader->GetExpectedSchema()->field(0)->name();
- }
-
- std::shared_ptr<arrow::Field> GetField() const {
- return Loader->GetExpectedSchema()->field(0);
- }
-
- TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader)
- : Loader(loader)
- , Blobs(std::move(blobs))
- {
- Y_VERIFY(Loader);
- Y_VERIFY(Loader->GetExpectedSchema()->num_fields() == 1);
- }
-
- std::shared_ptr<arrow::ChunkedArray> Assemble() const;
- };
-
- class TPreparedBatchData {
- private:
- std::vector<TPreparedColumn> Columns;
- std::shared_ptr<arrow::Schema> Schema;
- size_t RowsCount = 0;
-
- public:
- struct TAssembleOptions {
- std::optional<std::set<ui32>> IncludedColumnIds;
- std::optional<std::set<ui32>> ExcludedColumnIds;
-
- bool IsAcceptedColumn(const ui32 columnId) const {
- if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) {
- return false;
- }
- if (ExcludedColumnIds && ExcludedColumnIds->contains(columnId)) {
- return false;
- }
- return true;
- }
- };
-
- std::vector<std::string> GetSchemaColumnNames() const {
- return Schema->field_names();
- }
-
- size_t GetColumnsCount() const {
- return Columns.size();
- }
-
- size_t GetRowsCount() const {
- return RowsCount;
- }
-
- TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount)
- : Columns(std::move(columns))
- , Schema(schema)
- , RowsCount(rowsCount)
- {
- }
-
- std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const;
- };
-
- template <class TExternalBlobInfo>
- TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema,
- const THashMap<TBlobRange, TExternalBlobInfo>& blobsData) const {
- std::vector<TPreparedColumn> columns;
- columns.reserve(resultSchema.GetSchema()->num_fields());
-
- Y_VERIFY(!Meta.ColumnMeta.empty());
- const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows;
- for (auto&& field : resultSchema.GetSchema()->fields()) {
- columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name())));
- }
-
- TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
- TMap<size_t, size_t> positionsMap;
-
- for (auto& rec : Records) {
- auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId);
- if (resulPos < 0) {
- continue;
- }
- auto pos = dataSchema.GetFieldIndex(rec.ColumnId);
- Y_ASSERT(pos >= 0);
- positionsMap[resulPos] = pos;
- columnChunks[resulPos][rec.Chunk] = rec.BlobRange;
- auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId);
- if (columnMeta) {
- Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows);
- }
- }
-
- // Make chunked arrays for columns
- for (auto& [pos, orderedChunks] : columnChunks) {
- Y_VERIFY(positionsMap.contains(pos));
- size_t dataPos = positionsMap[pos];
- auto portionField = dataSchema.GetFieldByIndex(dataPos);
- auto resultField = resultSchema.GetFieldByIndex(pos);
-
- Y_VERIFY(portionField->IsCompatibleWith(*resultField));
-
- std::vector<TAssembleBlobInfo> blobs;
- blobs.reserve(orderedChunks.size());
- ui32 expected = 0;
- for (auto& [chunk, blobRange] : orderedChunks) {
- Y_VERIFY(chunk == expected);
- ++expected;
-
- auto it = blobsData.find(blobRange);
- Y_VERIFY(it != blobsData.end());
- blobs.emplace_back(it->second);
- }
-
- Y_VERIFY(pos < columns.size());
- columns[pos] = TPreparedColumn(std::move(blobs), dataSchema.GetColumnLoader(resultField->name()));
- }
-
- return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount);
- }
-
- std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema,
- const ISnapshotSchema& resultSchema,
- const THashMap<TBlobRange, TString>& data) const {
- auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble();
- Y_VERIFY(batch->Validate().ok());
- return batch;
- }
-
- static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
- const std::shared_ptr<arrow::Field>& field,
- const TColumnSaver saver);
-
- void AppendOneChunkColumn(TColumnRecord&& record);
-
- friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) {
- for (auto& rec : info.Records) {
- out << " " << rec;
- out << " (1 of " << info.Records.size() << " blobs shown)";
- break;
- }
- out << ";activity=" << info.IsActive() << ";";
- if (!info.TierName.empty()) {
- out << " tier: " << info.TierName;
- }
- out << " " << info.Meta;
- return out;
- }
-};
-
-/// Ensure that TPortionInfo can be effectively assigned by moving the value.
-static_assert(std::is_nothrow_move_assignable<TPortionInfo>::value);
-
-/// Ensure that TPortionInfo can be effectively constructed by moving the value.
-static_assert(std::is_nothrow_move_constructible<TPortionInfo>::value);
-
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..914aa5278b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(columnshard-engines-portions)
+target_link_libraries(columnshard-engines-portions PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-engines-scheme
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(columnshard-engines-portions PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
+)
+generate_enum_serilization(columnshard-engines-portions
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/engines/portions/portion_info.h
+)
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..86efcb228b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,32 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(columnshard-engines-portions)
+target_link_libraries(columnshard-engines-portions PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-engines-scheme
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(columnshard-engines-portions PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
+)
+generate_enum_serilization(columnshard-engines-portions
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/engines/portions/portion_info.h
+)
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..86efcb228b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,32 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(columnshard-engines-portions)
+target_link_libraries(columnshard-engines-portions PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-engines-scheme
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(columnshard-engines-portions PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
+)
+generate_enum_serilization(columnshard-engines-portions
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/engines/portions/portion_info.h
+)
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..914aa5278b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(columnshard-engines-portions)
+target_link_libraries(columnshard-engines-portions PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-engines-scheme
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(columnshard-engines-portions PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
+)
+generate_enum_serilization(columnshard-engines-portions
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/engines/portions/portion_info.h
+)
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
new file mode 100644
index 0000000000..621928056a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
@@ -0,0 +1,5 @@
+#include "column_record.h"
+
+namespace NKikimr::NOlap {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
new file mode 100644
index 0000000000..cc66e266f0
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+
+namespace NKikimr::NOlap {
+
+struct TColumnRecord {
+ ui64 Granule;
+ ui64 PlanStep; // {PlanStep, TxId} is min snapshot for {Granule, Portion}
+ ui64 TxId;
+ ui64 Portion; // Id of independent (overlayed by PK) portion of data in granule
+ ui64 XPlanStep{0}; // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)
+ ui64 XTxId{0};
+ ui32 ColumnId{0};
+ ui16 Chunk; // Number of blob for column ColumnName in Portion
+ TBlobRange BlobRange;
+ TString Metadata;
+
+ std::optional<ui32> GetChunkRowsCount() const {
+ return {};
+ }
+
+ bool operator == (const TColumnRecord& rec) const {
+ return (Granule == rec.Granule) && (ColumnId == rec.ColumnId) &&
+ (PlanStep == rec.PlanStep) && (TxId == rec.TxId) && (Portion == rec.Portion) && (Chunk == rec.Chunk);
+ }
+
+ TString SerializedBlobId() const {
+ return BlobRange.BlobId.SerializeBinary();
+ }
+
+ bool Valid() const {
+ return ValidExceptSnapshot() && ValidSnapshot();
+ }
+
+ bool ValidSnapshot() const {
+ return PlanStep && TxId;
+ }
+
+ bool ValidExceptSnapshot() const {
+ return Granule && ColumnId && Portion && ValidBlob();
+ }
+
+ bool ValidBlob() const {
+ return BlobRange.BlobId.IsValid() && BlobRange.Size;
+ }
+
+ void SetSnapshot(const TSnapshot& snap) {
+ Y_VERIFY(snap.Valid());
+ PlanStep = snap.GetPlanStep();
+ TxId = snap.GetTxId();
+ }
+
+ void SetXSnapshot(const TSnapshot& snap) {
+ Y_VERIFY(snap.Valid());
+ XPlanStep = snap.GetPlanStep();
+ XTxId = snap.GetTxId();
+ }
+
+ static TColumnRecord Make(ui64 granule, ui32 columnId, const TSnapshot& minSnapshot, ui64 portion, ui16 chunk = 0) {
+ TColumnRecord row;
+ row.Granule = granule;
+ row.PlanStep = minSnapshot.GetPlanStep();
+ row.TxId = minSnapshot.GetTxId();
+ row.Portion = portion;
+ row.ColumnId = columnId;
+ row.Chunk = chunk;
+ //row.BlobId
+ //row.Metadata
+ return row;
+ }
+
+ friend IOutputStream& operator << (IOutputStream& out, const TColumnRecord& rec) {
+ out << '{';
+ out << 'g' << rec.Granule << 'p' << rec.Portion;
+ if (rec.Chunk) {
+ out << 'n' << rec.Chunk;
+ }
+ out << ',' << (i32)rec.ColumnId;
+ out << ',' << rec.PlanStep << ':' << (rec.TxId == Max<ui64>() ? "max" : ToString(rec.TxId));
+ if (rec.XPlanStep) {
+ out << '-' << rec.XPlanStep << ':' << (rec.XTxId == Max<ui64>() ? "max" : ToString(rec.XTxId));
+ }
+ out << ',' << rec.BlobRange.ToString();
+ out << '}';
+ return out;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
new file mode 100644
index 0000000000..b20a716f9e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -0,0 +1,262 @@
+#include "portion_info.h"
+#include <ydb/core/protos/tx_columnshard.pb.h>
+#include <ydb/core/formats/arrow/arrow_filter.h>
+
+namespace NKikimr::NOlap {
+
+TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array,
+ const std::shared_ptr<arrow::Field>& field,
+ const TColumnSaver saver) {
+ auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field });
+ auto batch = arrow::RecordBatch::Make(schema, array->length(), { array });
+ Y_VERIFY(batch);
+
+ return saver.Apply(batch);
+}
+
+void TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) {
+ record.Chunk = 0;
+ Records.emplace_back(std::move(record));
+}
+
+void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
+ Y_VERIFY(column->length());
+
+ std::pair<int, int> minMaxPos = {0, (column->length() - 1)};
+ if (!sorted) {
+ minMaxPos = NArrow::FindMinMaxPosition(column);
+ }
+
+ Y_VERIFY(minMaxPos.first >= 0);
+ Y_VERIFY(minMaxPos.second >= 0);
+
+ Meta.ColumnMeta[columnId].Min = NArrow::GetScalar(column, minMaxPos.first);
+ Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second);
+}
+
+void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TString& tierName) {
+ const auto& indexInfo = snapshotSchema.GetIndexInfo();
+ const auto& minMaxColumns = indexInfo.GetMinMaxIdxColumns();
+
+ TierName = tierName;
+ Meta = {};
+ Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
+
+ // Copy first and last key rows into new batch to free source batch's memory
+ {
+ auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
+ std::vector<bool> bits(batch->num_rows(), false);
+ bits[0] = true;
+ bits[batch->num_rows() - 1] = true; // it colud be 0 if batch has one row
+
+ auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
+ auto res = arrow::compute::Filter(keyBatch, filter);
+ Y_VERIFY(res.ok());
+
+ Meta.ReplaceKeyEdges = res->record_batch();
+ Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
+ }
+
+ auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
+ Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
+ Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+
+ /// @note It does not add RawBytes info for snapshot columns, only for user ones.
+ for (auto& [columnId, col] : indexInfo.Columns) {
+ const auto& columnName = col.Name;
+ auto column = batch->GetColumnByName(col.Name);
+ Y_VERIFY(column);
+ Meta.ColumnMeta[columnId].NumRows = column->length();
+ Meta.ColumnMeta[columnId].RawBytes = NArrow::GetArrayDataSize(column);
+
+ if (minMaxColumns.contains(columnId)) {
+ auto column = batch->GetColumnByName(columnName);
+ Y_VERIFY(column);
+
+ bool isSorted = (columnId == Meta.FirstPkColumn);
+ AddMinMax(columnId, column, isSorted);
+ Y_VERIFY(Meta.HasMinMax(columnId));
+ }
+ }
+}
+
+TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const {
+ NKikimrTxColumnShard::TIndexColumnMeta meta; // TODO: move proto serialization out of engines folder
+ if (Meta.ColumnMeta.contains(rec.ColumnId)) {
+ const auto& columnMeta = Meta.ColumnMeta.find(rec.ColumnId)->second;
+ if (auto numRows = columnMeta.NumRows) {
+ meta.SetNumRows(numRows);
+ }
+ if (auto rawBytes = columnMeta.RawBytes) {
+ meta.SetRawBytes(rawBytes);
+ }
+ if (columnMeta.HasMinMax()) {
+ ScalarToConstant(*columnMeta.Min, *meta.MutableMinValue());
+ ScalarToConstant(*columnMeta.Max, *meta.MutableMaxValue());
+ }
+ }
+
+ if (rec.ColumnId == Meta.FirstPkColumn) {
+ auto* portionMeta = meta.MutablePortionMeta();
+
+ switch (Meta.Produced) {
+ case TPortionMeta::UNSPECIFIED:
+ Y_VERIFY(false);
+ case TPortionMeta::INSERTED:
+ portionMeta->SetIsInserted(true);
+ break;
+ case TPortionMeta::COMPACTED:
+ portionMeta->SetIsCompacted(true);
+ break;
+ case TPortionMeta::SPLIT_COMPACTED:
+ portionMeta->SetIsSplitCompacted(true);
+ break;
+ case TPortionMeta::EVICTED:
+ portionMeta->SetIsEvicted(true);
+ break;
+ case TPortionMeta::INACTIVE:
+ Y_FAIL("Unexpected inactive case");
+ //portionMeta->SetInactive(true);
+ break;
+ }
+
+ if (!TierName.empty()) {
+ portionMeta->SetTierName(TierName);
+ }
+
+ if (const auto& keyEdgesBatch = Meta.ReplaceKeyEdges) {
+ Y_VERIFY(keyEdgesBatch);
+ Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok());
+ Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2);
+ portionMeta->SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch));
+ }
+ }
+
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out);
+ return out;
+}
+
+void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
+ if (rec.Metadata.empty()) {
+ return;
+ }
+
+ NKikimrTxColumnShard::TIndexColumnMeta meta;
+ bool ok = meta.ParseFromString(rec.Metadata);
+ Y_VERIFY(ok);
+
+ Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
+ auto field = indexInfo.ArrowColumnField(rec.ColumnId);
+ const bool compositeIndexKey = indexInfo.IsCompositeIndexKey();
+
+ if (meta.HasPortionMeta()) {
+ Y_VERIFY_DEBUG(rec.ColumnId == Meta.FirstPkColumn);
+
+ auto& portionMeta = meta.GetPortionMeta();
+ TierName = portionMeta.GetTierName();
+
+ if (portionMeta.GetIsInserted()) {
+ Meta.Produced = TPortionMeta::INSERTED;
+ } else if (portionMeta.GetIsCompacted()) {
+ Meta.Produced = TPortionMeta::COMPACTED;
+ } else if (portionMeta.GetIsSplitCompacted()) {
+ Meta.Produced = TPortionMeta::SPLIT_COMPACTED;
+ } else if (portionMeta.GetIsEvicted()) {
+ Meta.Produced = TPortionMeta::EVICTED;
+ }
+
+ if (portionMeta.HasPrimaryKeyBorders()) {
+ Meta.ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
+ Y_VERIFY(Meta.ReplaceKeyEdges);
+ Y_VERIFY_DEBUG(Meta.ReplaceKeyEdges->ValidateFull().ok());
+ Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
+
+ if (compositeIndexKey) {
+ auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
+ Y_VERIFY(edgesBatch);
+ Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
+ Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+ }
+ }
+ }
+ if (meta.HasNumRows()) {
+ Meta.ColumnMeta[rec.ColumnId].NumRows = meta.GetNumRows();
+ }
+ if (meta.HasRawBytes()) {
+ Meta.ColumnMeta[rec.ColumnId].RawBytes = meta.GetRawBytes();
+ }
+ if (meta.HasMinValue()) {
+ auto scalar = ConstantToScalar(meta.GetMinValue(), field->type());
+ Meta.ColumnMeta[rec.ColumnId].Min = scalar;
+
+ // Restore Meta.IndexKeyStart for one column IndexKey
+ if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) {
+ Meta.IndexKeyStart = NArrow::TReplaceKey::FromScalar(scalar);
+ }
+ }
+ if (meta.HasMaxValue()) {
+ auto scalar = ConstantToScalar(meta.GetMaxValue(), field->type());
+ Meta.ColumnMeta[rec.ColumnId].Max = scalar;
+
+ // Restore Meta.IndexKeyEnd for one column IndexKey
+ if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) {
+ Meta.IndexKeyEnd = NArrow::TReplaceKey::FromScalar(scalar);
+ }
+ }
+
+ // Portion genarated without PrimaryKeyBorders and loaded with indexInfo.IsCompositeIndexKey()
+ // We should have no such portions for ForceColumnTablesCompositeMarks feature
+ if (rec.ColumnId == Meta.FirstPkColumn) {
+ Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd);
+ }
+}
+
+std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
+ if (!Meta.ColumnMeta.contains(columnId)) {
+ return {};
+ }
+ return Meta.ColumnMeta.find(columnId)->second.Min;
+}
+
+std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
+ if (!Meta.ColumnMeta.contains(columnId)) {
+ return {};
+ }
+ return Meta.ColumnMeta.find(columnId)->second.Max;
+}
+
+std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
+ Y_VERIFY(!Blobs.empty());
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.reserve(Blobs.size());
+ for (auto& blob : Blobs) {
+ batches.push_back(blob.BuildRecordBatch(*Loader));
+ Y_VERIFY(batches.back());
+ }
+
+ auto res = arrow::Table::FromRecordBatches(batches);
+ Y_VERIFY_S(res.ok(), res.status().message());
+ return (*res)->column(0);
+}
+
+std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
+ std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
+ std::vector< std::shared_ptr<arrow::Field>> fields;
+ for (auto&& i : Columns) {
+ if (!options.IsAcceptedColumn(i.GetColumnId())) {
+ continue;
+ }
+ columns.emplace_back(i.Assemble());
+ fields.emplace_back(i.GetField());
+ }
+
+ auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
+ auto res = table->CombineChunks();
+ Y_VERIFY(res.ok());
+ return NArrow::ToBatch(*res);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
new file mode 100644
index 0000000000..64f9cb643b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -0,0 +1,519 @@
+#pragma once
+#include "column_record.h"
+
+#include <ydb/core/formats/arrow/replace_key.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/formats/arrow/dictionary/conversion.h>
+#include <ydb/core/tx/columnshard/counters/indexation.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/library/yverify_stream/yverify_stream.h>
+
+namespace NKikimr::NOlap {
+
+struct TPortionMeta {
+ // NOTE: These values are persisted in LocalDB so they must be stable
+ enum EProduced : ui32 {
+ UNSPECIFIED = 0,
+ INSERTED = 1,
+ COMPACTED = 2,
+ SPLIT_COMPACTED = 3,
+ INACTIVE = 4,
+ EVICTED = 5,
+ };
+
+ struct TColumnMeta {
+ ui32 NumRows{0};
+ ui32 RawBytes{0};
+ std::shared_ptr<arrow::Scalar> Min;
+ std::shared_ptr<arrow::Scalar> Max;
+
+ bool HasMinMax() const noexcept {
+ return Min.get() && Max.get();
+ }
+ };
+
+ EProduced Produced{UNSPECIFIED};
+ THashMap<ui32, TColumnMeta> ColumnMeta;
+ ui32 FirstPkColumn = 0;
+ std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows
+ std::optional<NArrow::TReplaceKey> IndexKeyStart;
+ std::optional<NArrow::TReplaceKey> IndexKeyEnd;
+
+ bool HasMinMax(ui32 columnId) const {
+ if (!ColumnMeta.contains(columnId)) {
+ return false;
+ }
+ return ColumnMeta.find(columnId)->second.HasMinMax();
+ }
+
+ bool HasPkMinMax() const {
+ return HasMinMax(FirstPkColumn);
+ }
+
+ ui32 NumRows() const {
+ if (FirstPkColumn) {
+ Y_VERIFY(ColumnMeta.contains(FirstPkColumn));
+ return ColumnMeta.find(FirstPkColumn)->second.NumRows;
+ }
+ return 0;
+ }
+
+ friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) {
+ out << "reason" << (ui32)info.Produced;
+ for (const auto& [_, meta] : info.ColumnMeta) {
+ if (meta.NumRows) {
+ out << " " << meta.NumRows << " rows";
+ break;
+ }
+ }
+ return out;
+ }
+};
+
+class TPortionAddress {
+private:
+ YDB_READONLY(ui64, GranuleId, 0);
+ YDB_READONLY(ui64, PortionId, 0);
+public:
+ TPortionAddress(const ui64 granuleId, const ui64 portionId)
+ : GranuleId(granuleId)
+ , PortionId(portionId)
+ {
+
+ }
+
+ bool operator<(const TPortionAddress& item) const {
+ return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId);
+ }
+
+ bool operator==(const TPortionAddress& item) const {
+ return std::tie(GranuleId, PortionId) == std::tie(item.GranuleId, item.PortionId);
+ }
+};
+
+struct TPortionInfo {
+ static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024;
+
+ std::vector<TColumnRecord> Records;
+ TPortionMeta Meta;
+ TString TierName;
+
+ bool Empty() const { return Records.empty(); }
+ bool Produced() const { return Meta.Produced != TPortionMeta::UNSPECIFIED; }
+ bool Valid() const { return !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
+ bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; }
+ bool IsEvicted() const { return Meta.Produced == TPortionMeta::EVICTED; }
+ bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ }
+ bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); }
+ size_t NumRecords() const { return Records.size(); }
+
+ bool CheckForCleanup(const TSnapshot& snapshot) const {
+ if (!CheckForCleanup()) {
+ return false;
+ }
+
+ return GetXSnapshot() < snapshot;
+ }
+
+ bool CheckForCleanup() const {
+ return !IsActive();
+ }
+
+ bool AllowEarlyFilter() const {
+ return Meta.Produced == TPortionMeta::COMPACTED
+ || Meta.Produced == TPortionMeta::SPLIT_COMPACTED;
+ }
+
+ bool EvictReady(size_t hotSize) const {
+ return Meta.Produced == TPortionMeta::COMPACTED
+ || Meta.Produced == TPortionMeta::SPLIT_COMPACTED
+ || Meta.Produced == TPortionMeta::EVICTED
+ || (Meta.Produced == TPortionMeta::INSERTED && BlobsSizes().first >= hotSize);
+ }
+
+ ui64 Portion() const {
+ Y_VERIFY(!Empty());
+ auto& rec = Records[0];
+ return rec.Portion;
+ }
+
+ ui64 Granule() const {
+ Y_VERIFY(!Empty());
+ auto& rec = Records[0];
+ return rec.Granule;
+ }
+
+ TPortionAddress GetAddress() const {
+ Y_VERIFY(!Empty());
+ auto& rec = Records[0];
+ return TPortionAddress(rec.Granule, rec.Portion);
+ }
+
+ void SetGranule(ui64 granule) {
+ for (auto& rec : Records) {
+ rec.Granule = granule;
+ }
+ }
+
+ TSnapshot GetSnapshot() const {
+ Y_VERIFY(!Empty());
+ auto& rec = Records[0];
+ return TSnapshot(rec.PlanStep, rec.TxId);
+ }
+
+ TSnapshot GetXSnapshot() const {
+ Y_VERIFY(!Empty());
+ auto& rec = Records[0];
+ return TSnapshot(rec.XPlanStep, rec.XTxId);
+ }
+
+ bool IsActive() const {
+ return GetXSnapshot().IsZero();
+ }
+
+ std::pair<ui32, ui32> BlobsSizes() const {
+ ui32 sum = 0;
+ ui32 max = 0;
+ for (const auto& rec : Records) {
+ sum += rec.BlobRange.Size;
+ max = Max(max, rec.BlobRange.Size);
+ }
+ return {sum, max};
+ }
+
+ ui64 BlobsBytes() const noexcept {
+ ui64 sum = 0;
+ for (const auto& rec : Records) {
+ sum += rec.BlobRange.Size;
+ }
+ return sum;
+ }
+
+ void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap) {
+ for (auto& rec : Records) {
+ rec.Portion = portion;
+ }
+ if (!granuleRemap.empty()) {
+ for (auto& rec : Records) {
+ Y_VERIFY(granuleRemap.contains(rec.Granule));
+ rec.Granule = granuleRemap.find(rec.Granule)->second;
+ }
+ }
+ }
+
+ void UpdateRecordsMeta(TPortionMeta::EProduced produced) {
+ Meta.Produced = produced;
+ for (auto& record : Records) {
+ record.Metadata = GetMetadata(record);
+ }
+ }
+
+ void SetStale(const TSnapshot& snapshot) {
+ for (auto& rec : Records) {
+ rec.SetXSnapshot(snapshot);
+ }
+ }
+
+ void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
+ Records.push_back(rec);
+ LoadMetadata(indexInfo, rec);
+ }
+
+ TString GetMetadata(const TColumnRecord& rec) const;
+ void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec);
+ void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TString& tierName);
+ void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
+
+ std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
+ std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
+
+ const NArrow::TReplaceKey& IndexKeyStart() const {
+ Y_VERIFY(Meta.IndexKeyStart);
+ return *Meta.IndexKeyStart;
+ }
+
+ const NArrow::TReplaceKey& IndexKeyEnd() const {
+ Y_VERIFY(Meta.IndexKeyEnd);
+ return *Meta.IndexKeyEnd;
+ }
+
+ ui32 NumRows() const {
+ return Meta.NumRows();
+ }
+
+ ui64 GetRawBytes(const std::vector<ui32>& columnIds) const {
+ ui64 sum = 0;
+ const ui32 numRows = NumRows();
+ for (auto&& i : columnIds) {
+ if (TIndexInfo::IsSpecialColumn(i)) {
+ sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i);
+ } else {
+ auto it = Meta.ColumnMeta.find(i);
+ if (it != Meta.ColumnMeta.end()) {
+ sum += it->second.RawBytes;
+ }
+ }
+ }
+ return sum;
+ }
+
+ ui64 RawBytesSum() const {
+ ui64 sum = 0;
+ for (auto& [columnId, colMeta] : Meta.ColumnMeta) {
+ sum += colMeta.RawBytes;
+ }
+ return sum;
+ }
+
+private:
+ class TMinGetter {
+ public:
+ static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
+ return portionInfo.MinValue(columnId);
+ }
+ };
+
+ class TMaxGetter {
+ public:
+ static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
+ return portionInfo.MaxValue(columnId);
+ }
+ };
+
+ template <class TSelfGetter, class TItemGetter = TSelfGetter>
+ int CompareByColumnIdsImpl(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
+ for (auto&& i : columnIds) {
+ std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i);
+ std::shared_ptr<arrow::Scalar> valueItem = TItemGetter::Get(item, i);
+ if (!!valueSelf && !!valueItem) {
+ const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem);
+ if (cmpResult) {
+ return cmpResult;
+ }
+ } else if (!!valueSelf) {
+ return 1;
+ } else if (!!valueItem) {
+ return -1;
+ }
+ }
+ return 0;
+ }
+public:
+ int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
+ return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns);
+ }
+
+ int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
+ return CompareMinByColumnIds(item, info.KeyColumns);
+ }
+
+ int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
+ return CompareByColumnIdsImpl<TMinGetter>(item, columnIds);
+ }
+
+ class TAssembleBlobInfo {
+ private:
+ ui32 NullRowsCount = 0;
+ TString Data;
+ public:
+ TAssembleBlobInfo(const ui32 rowsCount)
+ : NullRowsCount(rowsCount) {
+
+ }
+
+ TAssembleBlobInfo(const TString& data)
+ : Data(data) {
+
+ }
+
+ ui32 GetNullRowsCount() const noexcept {
+ return NullRowsCount;
+ }
+
+ const TString& GetData() const noexcept {
+ return Data;
+ }
+
+ std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const {
+ if (NullRowsCount) {
+ Y_VERIFY(!Data);
+ return NArrow::MakeEmptyBatch(loader.GetExpectedSchema(), NullRowsCount);
+ } else {
+ auto result = loader.Apply(Data);
+ if (!result.ok()) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot unpack batch")("error", result.status().ToString())("loader", loader.DebugString());
+ return nullptr;
+ }
+ return *result;
+ }
+ }
+ };
+
+ class TPreparedColumn {
+ private:
+ std::shared_ptr<TColumnLoader> Loader;
+ std::vector<TAssembleBlobInfo> Blobs;
+ public:
+ ui32 GetColumnId() const {
+ return Loader->GetColumnId();
+ }
+
+ const std::string& GetName() const {
+ return Loader->GetExpectedSchema()->field(0)->name();
+ }
+
+ std::shared_ptr<arrow::Field> GetField() const {
+ return Loader->GetExpectedSchema()->field(0);
+ }
+
+ TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader)
+ : Loader(loader)
+ , Blobs(std::move(blobs))
+ {
+ Y_VERIFY(Loader);
+ Y_VERIFY(Loader->GetExpectedSchema()->num_fields() == 1);
+ }
+
+ std::shared_ptr<arrow::ChunkedArray> Assemble() const;
+ };
+
+ class TPreparedBatchData {
+ private:
+ std::vector<TPreparedColumn> Columns;
+ std::shared_ptr<arrow::Schema> Schema;
+ size_t RowsCount = 0;
+
+ public:
+ struct TAssembleOptions {
+ std::optional<std::set<ui32>> IncludedColumnIds;
+ std::optional<std::set<ui32>> ExcludedColumnIds;
+
+ bool IsAcceptedColumn(const ui32 columnId) const {
+ if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) {
+ return false;
+ }
+ if (ExcludedColumnIds && ExcludedColumnIds->contains(columnId)) {
+ return false;
+ }
+ return true;
+ }
+ };
+
+ std::vector<std::string> GetSchemaColumnNames() const {
+ return Schema->field_names();
+ }
+
+ size_t GetColumnsCount() const {
+ return Columns.size();
+ }
+
+ size_t GetRowsCount() const {
+ return RowsCount;
+ }
+
+ TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount)
+ : Columns(std::move(columns))
+ , Schema(schema)
+ , RowsCount(rowsCount)
+ {
+ }
+
+ std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const;
+ };
+
+ template <class TExternalBlobInfo>
+ TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema,
+ const THashMap<TBlobRange, TExternalBlobInfo>& blobsData) const {
+ std::vector<TPreparedColumn> columns;
+ columns.reserve(resultSchema.GetSchema()->num_fields());
+
+ Y_VERIFY(!Meta.ColumnMeta.empty());
+ const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows;
+ for (auto&& field : resultSchema.GetSchema()->fields()) {
+ columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name())));
+ }
+
+ TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
+ TMap<size_t, size_t> positionsMap;
+
+ for (auto& rec : Records) {
+ auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId);
+ if (resulPos < 0) {
+ continue;
+ }
+ auto pos = dataSchema.GetFieldIndex(rec.ColumnId);
+ Y_ASSERT(pos >= 0);
+ positionsMap[resulPos] = pos;
+ columnChunks[resulPos][rec.Chunk] = rec.BlobRange;
+ auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId);
+ if (columnMeta) {
+ Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows);
+ }
+ }
+
+ // Make chunked arrays for columns
+ for (auto& [pos, orderedChunks] : columnChunks) {
+ Y_VERIFY(positionsMap.contains(pos));
+ size_t dataPos = positionsMap[pos];
+ auto portionField = dataSchema.GetFieldByIndex(dataPos);
+ auto resultField = resultSchema.GetFieldByIndex(pos);
+
+ Y_VERIFY(portionField->IsCompatibleWith(*resultField));
+
+ std::vector<TAssembleBlobInfo> blobs;
+ blobs.reserve(orderedChunks.size());
+ ui32 expected = 0;
+ for (auto& [chunk, blobRange] : orderedChunks) {
+ Y_VERIFY(chunk == expected);
+ ++expected;
+
+ auto it = blobsData.find(blobRange);
+ Y_VERIFY(it != blobsData.end());
+ blobs.emplace_back(it->second);
+ }
+
+ Y_VERIFY(pos < columns.size());
+ columns[pos] = TPreparedColumn(std::move(blobs), dataSchema.GetColumnLoader(resultField->name()));
+ }
+
+ return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema,
+ const ISnapshotSchema& resultSchema,
+ const THashMap<TBlobRange, TString>& data) const {
+ auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble();
+ Y_VERIFY(batch->Validate().ok());
+ return batch;
+ }
+
+ static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
+ const std::shared_ptr<arrow::Field>& field,
+ const TColumnSaver saver);
+
+ void AppendOneChunkColumn(TColumnRecord&& record);
+
+ friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) {
+ for (auto& rec : info.Records) {
+ out << " " << rec;
+ out << " (1 of " << info.Records.size() << " blobs shown)";
+ break;
+ }
+ out << ";activity=" << info.IsActive() << ";";
+ if (!info.TierName.empty()) {
+ out << " tier: " << info.TierName;
+ }
+ out << " " << info.Meta;
+ return out;
+ }
+};
+
+/// Ensure that TPortionInfo can be effectively assigned by moving the value.
+static_assert(std::is_nothrow_move_assignable<TPortionInfo>::value);
+
+/// Ensure that TPortionInfo can be effectively constructed by moving the value.
+static_assert(std::is_nothrow_move_constructible<TPortionInfo>::value);
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make
new file mode 100644
index 0000000000..c1be81651a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ portion_info.cpp
+ column_record.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/engines/scheme
+)
+
+GENERATE_ENUM_SERIALIZATION(portion_info.h)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/scalars.h b/ydb/core/tx/columnshard/engines/scalars.h
index a7fc346f68..ed70b1299a 100644
--- a/ydb/core/tx/columnshard/engines/scalars.h
+++ b/ydb/core/tx/columnshard/engines/scalars.h
@@ -1,17 +1,2 @@
#pragma once
-
-#include "defs.h"
-
-#include <ydb/core/protos/tx_columnshard.pb.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
-
-namespace NKikimr::NOlap {
-
-void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value);
-std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value,
- const std::shared_ptr<arrow::DataType>& type);
-
-TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key);
-std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type);
-
-}
+#include <ydb/core/tx/columnshard/common/scalars.h>
diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt
index 3aeec490fa..ef6c197306 100644
--- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt
@@ -22,4 +22,7 @@ target_sources(columnshard-engines-scheme PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt
index 9568802bdb..f2b4f18273 100644
--- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt
@@ -23,4 +23,7 @@ target_sources(columnshard-engines-scheme PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt
index 9568802bdb..f2b4f18273 100644
--- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt
@@ -23,4 +23,7 @@ target_sources(columnshard-engines-scheme PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt
index 3aeec490fa..ef6c197306 100644
--- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt
@@ -22,4 +22,7 @@ target_sources(columnshard-engines-scheme PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.cpp b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
new file mode 100644
index 0000000000..dd6341ec7a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp
@@ -0,0 +1,79 @@
+#include "column_features.h"
+#include "index_info.h"
+#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/serializer/batch_only.h>
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap {
+
+NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetSaveTransformer() const {
+ NArrow::NTransformation::ITransformer::TPtr transformer;
+ if (DictionaryEncoding) {
+ transformer = DictionaryEncoding->BuildEncoder();
+ }
+ return transformer;
+}
+
+NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer() const {
+ NArrow::NTransformation::ITransformer::TPtr transformer;
+ if (DictionaryEncoding) {
+ transformer = DictionaryEncoding->BuildDecoder();
+ }
+ return transformer;
+}
+
+std::shared_ptr<NKikimr::NOlap::TColumnLoader> TColumnFeatures::GetLoader(const TIndexInfo& info) const {
+ if (!LoaderCache) {
+ NArrow::NTransformation::ITransformer::TPtr transformer = GetLoadTransformer();
+ auto schema = info.GetColumnSchema(ColumnId);
+ if (!transformer) {
+ LoaderCache = std::make_shared<TColumnLoader>(transformer,
+ std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(schema),
+ schema, ColumnId);
+ } else {
+ LoaderCache = std::make_shared<TColumnLoader>(transformer,
+ std::make_shared<NArrow::NSerialization::TFullDataDeserializer>(),
+ schema, ColumnId);
+ }
+ }
+ return LoaderCache;
+}
+
+std::optional<NKikimr::NOlap::TColumnFeatures> TColumnFeatures::BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId) {
+ TColumnFeatures result(columnId);
+ if (columnInfo.HasCompression()) {
+ auto settings = NArrow::TCompression::BuildFromProto(columnInfo.GetCompression());
+ Y_VERIFY(settings.IsSuccess());
+ result.Compression = *settings;
+ }
+ if (columnInfo.HasDictionaryEncoding()) {
+ auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding());
+ Y_VERIFY(settings.IsSuccess());
+ result.DictionaryEncoding = *settings;
+ }
+ return result;
+}
+
+std::unique_ptr<arrow::util::Codec> TColumnFeatures::GetCompressionCodec() const {
+ if (Compression) {
+ return Compression->BuildArrowCodec();
+ } else {
+ return nullptr;
+ }
+}
+
+TString TColumnLoader::DebugString() const {
+ TStringBuilder result;
+ if (ExpectedSchema) {
+ result << "schema:" << ExpectedSchema->ToString() << ";";
+ }
+ if (Transformer) {
+ result << "transformer:" << Transformer->DebugString() << ";";
+ }
+ if (Deserializer) {
+ result << "deserializer:" << Deserializer->DebugString() << ";";
+ }
+ return result;
+}
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h
new file mode 100644
index 0000000000..bb5e79428a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h
@@ -0,0 +1,120 @@
+#pragma once
+#include <ydb/core/formats/arrow/compression/object.h>
+#include <ydb/core/formats/arrow/dictionary/object.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/formats/arrow/transformer/abstract.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+
+namespace NKikimr::NOlap {
+
+class TSaverContext {
+private:
+ TString TierName;
+ std::optional<NArrow::TCompression> ExternalCompression;
+public:
+ const std::optional<NArrow::TCompression>& GetExternalCompression() const {
+ return ExternalCompression;
+ }
+ TSaverContext& SetExternalCompression(const std::optional<NArrow::TCompression>& value) {
+ ExternalCompression = value;
+ return *this;
+ }
+ const TString& GetTierName() const {
+ return TierName;
+ }
+ TSaverContext& SetTierName(const TString& value) {
+ TierName = value;
+ return *this;
+ }
+};
+
+class TColumnSaver {
+private:
+ NArrow::NTransformation::ITransformer::TPtr Transformer;
+ NArrow::NSerialization::ISerializer::TPtr Serializer;
+public:
+ TColumnSaver() = default;
+ TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer)
+ : Transformer(transformer)
+ , Serializer(serializer) {
+ Y_VERIFY(Serializer);
+ }
+
+ TString Apply(const std::shared_ptr<arrow::RecordBatch>& data) const {
+ Y_VERIFY(Serializer);
+ if (Transformer) {
+ return Serializer->Serialize(Transformer->Transform(data));
+ } else {
+ return Serializer->Serialize(data);
+ }
+ }
+};
+
+class TColumnLoader {
+private:
+ NArrow::NTransformation::ITransformer::TPtr Transformer;
+ NArrow::NSerialization::IDeserializer::TPtr Deserializer;
+ std::shared_ptr<arrow::Schema> ExpectedSchema;
+ const ui32 ColumnId;
+public:
+ TString DebugString() const;
+
+ TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer,
+ const std::shared_ptr<arrow::Schema>& expectedSchema, const ui32 columnId)
+ : Transformer(transformer)
+ , Deserializer(deserializer)
+ , ExpectedSchema(expectedSchema)
+ , ColumnId(columnId)
+ {
+ Y_VERIFY(ExpectedSchema);
+ Y_VERIFY(Deserializer);
+ }
+
+ ui32 GetColumnId() const {
+ return ColumnId;
+ }
+
+ std::shared_ptr<arrow::Schema> GetExpectedSchema() const {
+ return ExpectedSchema;
+ }
+
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const {
+ Y_VERIFY(Deserializer);
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = Deserializer->Deserialize(data);
+ if (!columnArray.ok()) {
+ return columnArray;
+ }
+ if (Transformer) {
+ return Transformer->Transform(*columnArray);
+ } else {
+ return columnArray;
+ }
+ }
+};
+
+struct TIndexInfo;
+
+class TColumnFeatures {
+private:
+ const ui32 ColumnId;
+ std::optional<NArrow::TCompression> Compression;
+ std::optional<NArrow::NDictionary::TEncodingSettings> DictionaryEncoding;
+ mutable std::shared_ptr<TColumnLoader> LoaderCache;
+public:
+ TColumnFeatures(const ui32 columnId)
+ : ColumnId(columnId)
+ {
+
+ }
+ static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId);
+
+ NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const;
+ NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const;
+
+ std::unique_ptr<arrow::util::Codec> GetCompressionCodec() const;
+
+ std::shared_ptr<TColumnLoader> GetLoader(const TIndexInfo& info) const;
+
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
new file mode 100644
index 0000000000..b87054b82c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -0,0 +1,429 @@
+#include "index_info.h"
+
+#include <ydb/core/formats/arrow/arrow_batch_builder.h>
+#include <ydb/core/formats/arrow/sort_cursor.h>
+#include <ydb/core/sys_view/common/schema.h>
+#include <ydb/core/formats/arrow/serializer/batch_only.h>
+#include <ydb/core/formats/arrow/transformer/dictionary.h>
+#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/base/appdata.h>
+
+namespace NKikimr::NOlap {
+
+const TString TIndexInfo::STORE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::StorePrimaryIndexStatsName;
+const TString TIndexInfo::TABLE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::TablePrimaryIndexStatsName;
+
+static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns) {
+ std::vector<TString> out;
+ out.reserve(columns.size());
+ for (const auto& [name, _] : columns) {
+ out.push_back(name);
+ }
+ return out;
+}
+
+TIndexInfo::TIndexInfo(const TString& name, ui32 id)
+ : NTable::TScheme::TTableSchema()
+ , Id(id)
+ , Name(name)
+{}
+
+std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) {
+ Y_VERIFY(batch);
+ i64 numColumns = batch->num_columns();
+ i64 numRows = batch->num_rows();
+
+ auto res = batch->AddColumn(numColumns, arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()),
+ NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows));
+ Y_VERIFY(res.ok());
+ res = (*res)->AddColumn(numColumns + 1, arrow::field(SPEC_COL_TX_ID, arrow::uint64()),
+ NArrow::MakeUI64Array(snapshot.GetTxId(), numRows));
+ Y_VERIFY(res.ok());
+ Y_VERIFY((*res)->num_columns() == numColumns + 2);
+ return *res;
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() {
+ static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{
+ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()),
+ arrow::field(SPEC_COL_TX_ID, arrow::uint64())
+ });
+ return result;
+}
+
+bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) {
+ return IsSpecialColumn(field.name());
+}
+
+bool TIndexInfo::IsSpecialColumn(const std::string& fieldName) {
+ return fieldName == SPEC_COL_PLAN_STEP
+ || fieldName == SPEC_COL_TX_ID;
+}
+
+bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) {
+ return fieldId == (ui32)ESpecialColumn::PLAN_STEP
+ || fieldId == (ui32)ESpecialColumn::TX_ID;
+}
+
+ui32 TIndexInfo::GetColumnId(const std::string& name) const {
+ auto id = GetColumnIdOptional(name);
+ Y_VERIFY(!!id, "undefined column %s", name.data());
+ return *id;
+}
+
+std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const {
+ const auto ni = ColumnNames.find(name);
+
+ if (ni != ColumnNames.end()) {
+ return ni->second;
+ }
+ if (name == SPEC_COL_PLAN_STEP) {
+ return ui32(ESpecialColumn::PLAN_STEP);
+ } else if (name == SPEC_COL_TX_ID) {
+ return ui32(ESpecialColumn::TX_ID);
+ }
+ return {};
+}
+
+TString TIndexInfo::GetColumnName(ui32 id, bool required) const {
+ if (ESpecialColumn(id) == ESpecialColumn::PLAN_STEP) {
+ return SPEC_COL_PLAN_STEP;
+ } else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) {
+ return SPEC_COL_TX_ID;
+ } else {
+ const auto ci = Columns.find(id);
+
+ if (!required && ci == Columns.end()) {
+ return {};
+ }
+
+ Y_VERIFY(ci != Columns.end());
+ return ci->second.Name;
+ }
+}
+
+std::vector<ui32> TIndexInfo::GetColumnIds() const {
+ std::vector<ui32> result;
+ for (auto&& i : Columns) {
+ result.emplace_back(i.first);
+ }
+ result.emplace_back((ui32)ESpecialColumn::PLAN_STEP);
+ result.emplace_back((ui32)ESpecialColumn::TX_ID);
+ return result;
+}
+
+std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) const {
+ std::vector<TString> out;
+ out.reserve(ids.size());
+ for (ui32 id : ids) {
+ const auto ci = Columns.find(id);
+ Y_VERIFY(ci != Columns.end());
+ out.push_back(ci->second.Name);
+ }
+ return out;
+}
+
+std::vector<TNameTypeInfo> TIndexInfo::GetColumns(const std::vector<ui32>& ids) const {
+ return NOlap::GetColumns(*this, ids);
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const {
+ if (!Schema) {
+ std::vector<ui32> ids;
+ ids.reserve(Columns.size());
+ for (const auto& [id, _] : Columns) {
+ ids.push_back(id);
+ }
+
+ // The ids had a set type before so we keep them sorted.
+ std::sort(ids.begin(), ids.end());
+ Schema = MakeArrowSchema(Columns, ids);
+ }
+
+ return Schema;
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaWithSpecials() const {
+ if (SchemaWithSpecials) {
+ return SchemaWithSpecials;
+ }
+
+ const auto& schema = ArrowSchema();
+
+ std::vector<std::shared_ptr<arrow::Field>> extended;
+ extended.reserve(schema->num_fields() + 3);
+
+ // Place special fields at the beginning of the schema.
+ extended.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()));
+ extended.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64()));
+ // Append fields from the regular schema afterward.
+ extended.insert(extended.end(), schema->fields().begin(), schema->fields().end());
+
+ SchemaWithSpecials = std::make_shared<arrow::Schema>(std::move(extended));
+ return SchemaWithSpecials;
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::AddColumns(
+ const std::shared_ptr<arrow::Schema>& src,
+ const std::vector<TString>& columns) const
+{
+ std::shared_ptr<arrow::Schema> all = ArrowSchemaWithSpecials();
+ auto fields = src->fields();
+
+ for (const auto& col : columns) {
+ const std::string name(col.data(), col.size());
+ if (!src->GetFieldByName(name)) {
+ auto field = all->GetFieldByName(name);
+ if (!field) {
+ return {};
+ }
+ fields.push_back(field);
+ }
+ }
+ return std::make_shared<arrow::Schema>(std::move(fields));
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials) const {
+ return MakeArrowSchema(Columns, columnIds, withSpecials);
+}
+
+std::vector<ui32> TIndexInfo::GetColumnIds(const std::vector<TString>& columnNames) const {
+ std::vector<ui32> ids;
+ ids.reserve(columnNames.size());
+ for (auto& name : columnNames) {
+ auto columnId = GetColumnIdOptional(name);
+ if (!columnId) {
+ return {};
+ }
+ ids.emplace_back(*columnId);
+ }
+ return ids;
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<TString>& names) const {
+ auto columnIds = GetColumnIds(names);
+ if (columnIds.empty()) {
+ return {};
+ }
+ return MakeArrowSchema(Columns, columnIds);
+}
+
+std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const {
+ auto it = ArrowColumnByColumnIdCache.find(columnId);
+ if (it == ArrowColumnByColumnIdCache.end()) {
+ it = ArrowColumnByColumnIdCache.emplace(columnId, ArrowSchema()->GetFieldByName(GetColumnName(columnId, true))).first;
+ }
+ return it->second;
+}
+
+void TIndexInfo::SetAllKeys() {
+ /// @note Setting replace and sorting key to PK we are able to:
+ /// * apply REPLACE by MergeSort
+ /// * apply PK predicate before REPLACE
+ const auto& primaryKeyNames = NamesOnly(GetPrimaryKey());
+ // Update set of required columns with names from primary key.
+ for (const auto& name: primaryKeyNames) {
+ RequiredColumns.insert(name);
+ }
+
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ if (primaryKeyNames.size()) {
+ SortingKey = ArrowSchema(primaryKeyNames);
+ ReplaceKey = SortingKey;
+ fields = ReplaceKey->fields();
+ if (CompositeIndexKey) {
+ IndexKey = ReplaceKey;
+ } else {
+ IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] }));
+ }
+ }
+
+ fields.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()));
+ fields.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64()));
+ ExtendedKey = std::make_shared<arrow::Schema>(std::move(fields));
+
+ for (const auto& [colId, column] : Columns) {
+ if (NArrow::IsPrimitiveYqlType(column.PType)) {
+ MinMaxIdxColumnsIds.insert(colId);
+ }
+ }
+ MinMaxIdxColumnsIds.insert(GetPKFirstColumnId());
+}
+
+std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortDescription() const {
+ if (GetSortingKey()) {
+ auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first
+ Y_VERIFY(key && key->num_fields() > 2);
+ auto description = std::make_shared<NArrow::TSortDescription>(key);
+ description->Directions[key->num_fields() - 1] = -1;
+ description->Directions[key->num_fields() - 2] = -1;
+ description->NotNull = true; // TODO
+ return description;
+ }
+ return {};
+}
+
+std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() const {
+ if (GetSortingKey()) {
+ auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first
+ Y_VERIFY(key && key->num_fields() > 2);
+ auto description = std::make_shared<NArrow::TSortDescription>(key, GetReplaceKey());
+ description->Directions[key->num_fields() - 1] = -1;
+ description->Directions[key->num_fields() - 2] = -1;
+ description->NotNull = true; // TODO
+ return description;
+ }
+ return {};
+}
+
+bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
+ auto it = ColumnNames.find(name);
+ if (it == ColumnNames.end()) {
+ return false;
+ }
+ return MinMaxIdxColumnsIds.contains(it->second);
+}
+
+TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const {
+ arrow::ipc::IpcWriteOptions options;
+ options.use_threads = false;
+
+ NArrow::NTransformation::ITransformer::TPtr transformer;
+ std::unique_ptr<arrow::util::Codec> columnCodec;
+ {
+ auto it = ColumnFeatures.find(columnId);
+ if (it != ColumnFeatures.end()) {
+ transformer = it->second.GetSaveTransformer();
+ columnCodec = it->second.GetCompressionCodec();
+ }
+ }
+
+ if (context.GetExternalCompression()) {
+ options.codec = context.GetExternalCompression()->BuildArrowCodec();
+ } else if (columnCodec) {
+ options.codec = std::move(columnCodec);
+ } else if (DefaultCompression) {
+ options.codec = DefaultCompression->BuildArrowCodec();
+ } else {
+ options.codec = NArrow::TCompression::BuildDefaultCodec();
+ }
+
+ if (!transformer) {
+ return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options));
+ } else {
+ return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TFullDataSerializer>(options));
+ }
+}
+
+TColumnFeatures& TIndexInfo::GetOrCreateColumnFeatures(const ui32 columnId) const {
+ auto it = ColumnFeatures.find(columnId);
+ if (it == ColumnFeatures.end()) {
+ it = ColumnFeatures.emplace(columnId, TColumnFeatures(columnId)).first;
+ }
+ return it->second;
+}
+
+std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const {
+ TColumnFeatures& features = GetOrCreateColumnFeatures(columnId);
+ return features.GetLoader(*this);
+}
+
+std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const {
+ std::shared_ptr<arrow::Schema> schema = Schema;
+ if (IsSpecialColumn(columnId)) {
+ schema = ArrowSchemaSnapshot();
+ }
+ auto field = schema->GetFieldByName(GetColumnName(columnId));
+ Y_VERIFY(field);
+ std::vector<std::shared_ptr<arrow::Field>> fields = { field };
+ return std::make_shared<arrow::Schema>(fields);
+}
+
+bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
+ if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema");
+ return false;
+ }
+
+ for (const auto& col : schema.GetColumns()) {
+ const ui32 id = col.GetId();
+ const TString& name = col.GetName();
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
+ col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
+ Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
+ ColumnNames[name] = id;
+ std::optional<TColumnFeatures> cFeatures = TColumnFeatures::BuildFromProto(col, id);
+ if (!cFeatures) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature");
+ return false;
+ }
+ ColumnFeatures.emplace(id, *cFeatures);
+ }
+
+ for (const auto& keyName : schema.GetKeyColumnNames()) {
+ Y_VERIFY(ColumnNames.contains(keyName));
+ KeyColumns.push_back(ColumnNames[keyName]);
+ }
+
+ if (schema.HasDefaultCompression()) {
+ auto result = NArrow::TCompression::BuildFromProto(schema.GetDefaultCompression());
+ if (!result) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", result.GetErrorMessage());
+ return false;
+ }
+ DefaultCompression = *result;
+ }
+
+ CompositeMarks = schema.GetCompositeMarks();
+ CompositeIndexKey = AppData()->FeatureFlags.GetForceColumnTablesCompositeMarks() ? true : CompositeMarks;
+ return true;
+}
+
+bool TIndexInfo::CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const {
+ return CompositeMarks == scheme.GetCompositeMarks();
+}
+
+std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) {
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ fields.reserve(withSpecials ? ids.size() + 2 : ids.size());
+
+ if (withSpecials) {
+ // Place special fields at the beginning of the schema.
+ fields.push_back(arrow::field(TIndexInfo::SPEC_COL_PLAN_STEP, arrow::uint64()));
+ fields.push_back(arrow::field(TIndexInfo::SPEC_COL_TX_ID, arrow::uint64()));
+ }
+
+ for (const ui32 id: ids) {
+ auto it = columns.find(id);
+ if (it == columns.end()) {
+ continue;
+ }
+
+ const auto& column = it->second;
+ std::string colName(column.Name.data(), column.Name.size());
+ fields.emplace_back(std::make_shared<arrow::Field>(colName, NArrow::GetArrowType(column.PType)));
+ }
+
+ return std::make_shared<arrow::Schema>(std::move(fields));
+}
+
+std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids) {
+ std::vector<std::pair<TString, NScheme::TTypeInfo>> out;
+ out.reserve(ids.size());
+ for (const ui32 id : ids) {
+ const auto ci = tableSchema.Columns.find(id);
+ Y_VERIFY(ci != tableSchema.Columns.end());
+ out.emplace_back(ci->second.Name, ci->second.PType);
+ }
+ return out;
+}
+
+std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
+ TIndexInfo result("", 0);
+ if (!result.DeserializeFromProto(schema)) {
+ return std::nullopt;
+ }
+ return result;
+}
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
new file mode 100644
index 0000000000..7bb0b9bdda
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -0,0 +1,203 @@
+#pragma once
+
+#include "column_features.h"
+#include "tier_info.h"
+
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+
+#include <ydb/core/sys_view/common/schema.h>
+#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
+#include <ydb/core/tx/columnshard/common/scalars.h>
+#include <ydb/core/formats/arrow/dictionary/object.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/formats/arrow/transformer/abstract.h>
+#include <ydb/core/scheme/scheme_types_proto.h>
+
+namespace arrow {
+ class Array;
+ class Field;
+ class Schema;
+}
+
+namespace NKikimr::NArrow {
+ struct TSortDescription;
+}
+
+namespace NKikimr::NOlap {
+
+struct TInsertedData;
+class TSnapshotColumnInfo;
+using TNameTypeInfo = std::pair<TString, NScheme::TTypeInfo>;
+
+/// Column engine index description in terms of tablet's local table.
+/// We have to use YDB types for keys here.
+struct TIndexInfo : public NTable::TScheme::TTableSchema {
+private:
+ mutable THashMap<ui32, TColumnFeatures> ColumnFeatures;
+ mutable THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache;
+ TIndexInfo(const TString& name, ui32 id);
+ bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
+ TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const;
+public:
+ static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
+ static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
+ static const TString STORE_INDEX_STATS_TABLE;
+ static const TString TABLE_INDEX_STATS_TABLE;
+
+ enum class ESpecialColumn : ui32 {
+ PLAN_STEP = 0xffffff00,
+ TX_ID,
+ };
+
+ /// Appends the special columns to the batch.
+ static std::shared_ptr<arrow::RecordBatch> AddSpecialColumns(
+ const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TSnapshot& snapshot);
+
+ /// Makes schema as set of the special columns.
+ static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot();
+
+ /// Matches name of the filed with names of the special columns.
+ static bool IsSpecialColumn(const arrow::Field& field);
+ static bool IsSpecialColumn(const ui32 field);
+ static ui32 GetSpecialColumnByteWidth(const ui32 field) {
+ Y_VERIFY(IsSpecialColumn(field));
+ return 8;
+ }
+ static bool IsSpecialColumn(const std::string& fieldName);
+ template <class TContainer>
+ static bool IsSpecialColumns(const TContainer& c) {
+ for (auto&& i : c) {
+ if (!IsSpecialColumn(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const;
+public:
+
+ static TIndexInfo BuildDefault() {
+ TIndexInfo result("dummy", 0);
+ return result;
+ }
+
+ static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
+
+ /// Returns id of the index.
+ ui32 GetId() const noexcept {
+ return Id;
+ }
+
+ std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const;
+ TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const;
+ std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const;
+
+ /// Returns an id of the column located by name. The name should exists in the schema.
+ ui32 GetColumnId(const std::string& name) const;
+ std::optional<ui32> GetColumnIdOptional(const std::string& name) const;
+
+ /// Returns a name of the column located by id.
+ TString GetColumnName(ui32 id, bool required = true) const;
+
+ /// Returns names of columns defined by the specific ids.
+ std::vector<TString> GetColumnNames(const std::vector<ui32>& ids) const;
+ std::vector<ui32> GetColumnIds() const;
+
+ /// Returns info of columns defined by specific ids.
+ std::vector<TNameTypeInfo> GetColumns(const std::vector<ui32>& ids) const;
+
+ /// Traditional Primary Key (includes uniqueness, search and sorting logic)
+ std::vector<TNameTypeInfo> GetPrimaryKey() const {
+ return GetColumns(KeyColumns);
+ }
+
+ /// Returns id of the first column of the primary key.
+ ui32 GetPKFirstColumnId() const {
+ Y_VERIFY(KeyColumns.size());
+ return KeyColumns[0];
+ }
+
+ // Sorting key: could be less or greater then traditional PK
+ // It could be empty for append-only tables. It could be greater then PK for better columns compression.
+ // If sorting key includes uniqueness key as a prefix we are able to use MergeSort for REPLACE.
+ const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return SortingKey; }
+ const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return ReplaceKey; }
+ const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; }
+ const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; }
+
+ /// Initializes sorting, replace, index and extended keys.
+ void SetAllKeys();
+
+ void CheckTtlColumn(const TString& ttlColumn) const {
+ Y_VERIFY(!ttlColumn.empty());
+ Y_VERIFY(MinMaxIdxColumnsIds.contains(GetColumnId(ttlColumn)));
+ }
+
+ std::vector<ui32> GetColumnIds(const std::vector<TString>& columnNames) const;
+
+ std::shared_ptr<arrow::Schema> ArrowSchema() const;
+ std::shared_ptr<arrow::Schema> ArrowSchemaWithSpecials() const;
+ std::shared_ptr<arrow::Schema> AddColumns(const std::shared_ptr<arrow::Schema>& schema,
+ const std::vector<TString>& columns) const;
+
+ std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const;
+ std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const;
+ std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const;
+
+ const THashSet<TString>& GetRequiredColumns() const {
+ return RequiredColumns;
+ }
+
+ const THashSet<ui32>& GetMinMaxIdxColumns() const {
+ return MinMaxIdxColumnsIds;
+ }
+
+ bool AllowTtlOverColumn(const TString& name) const;
+
+ /// Returns whether the sorting keys defined.
+ bool IsSorted() const { return SortingKey.get(); }
+
+ /// Returns whether the replace keys defined.
+ bool IsReplacing() const { return ReplaceKey.get(); }
+
+ bool IsCompositeIndexKey() const {
+ return CompositeIndexKey;
+ }
+
+ std::shared_ptr<NArrow::TSortDescription> SortDescription() const;
+ std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const;
+
+ static const std::vector<std::string>& GetSpecialColumnNames() {
+ static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) };
+ return result;
+ }
+
+ static const std::vector<ui32>& GetSpecialColumnIds() {
+ static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
+ return result;
+ }
+
+private:
+ ui32 Id;
+ TString Name;
+ bool CompositeIndexKey = false;
+ mutable std::shared_ptr<arrow::Schema> Schema;
+ mutable std::shared_ptr<arrow::Schema> SchemaWithSpecials;
+ std::shared_ptr<arrow::Schema> SortingKey;
+ std::shared_ptr<arrow::Schema> ReplaceKey;
+ std::shared_ptr<arrow::Schema> ExtendedKey; // Extend PK with snapshot columns to allow old shapshot reads
+ std::shared_ptr<arrow::Schema> IndexKey;
+ THashSet<TString> RequiredColumns;
+ THashSet<ui32> MinMaxIdxColumnsIds;
+ std::optional<NArrow::TCompression> DefaultCompression;
+ bool CompositeMarks = false;
+};
+
+std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false);
+
+/// Extracts columns with the specific ids from the schema.
+std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids);
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
new file mode 100644
index 0000000000..8114b5f11a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
@@ -0,0 +1,5 @@
+#include "tier_info.h"
+
+namespace NKikimr::NOlap {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.h b/ydb/core/tx/columnshard/engines/scheme/tier_info.h
new file mode 100644
index 0000000000..b05a1d7655
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.h
@@ -0,0 +1,234 @@
+#pragma once
+
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/common/validation.h>
+#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/formats/arrow/compression/object.h>
+#include <ydb/core/tx/columnshard/common/scalars.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
+
+namespace NKikimr::NOlap {
+
+class TTierInfo {
+private:
+ TString Name;
+ TString EvictColumnName;
+ TInstant EvictBorder;
+ bool NeedExport = false;
+
+ ui32 TtlUnitsInSecond;
+ std::optional<NArrow::TCompression> Compression;
+ mutable std::shared_ptr<arrow::Scalar> Scalar;
+
+public:
+ TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0)
+ : Name(tierName)
+ , EvictColumnName(column)
+ , EvictBorder(evictBorder)
+ , TtlUnitsInSecond(unitsInSecond)
+ {
+ Y_VERIFY(!Name.empty());
+ Y_VERIFY(!EvictColumnName.empty());
+ }
+
+ const TString& GetName() const {
+ return Name;
+ }
+
+ const TString& GetEvictColumnName() const {
+ return EvictColumnName;
+ }
+
+ const TInstant GetEvictBorder() const {
+ return EvictBorder;
+ }
+
+ bool GetNeedExport() const {
+ return NeedExport;
+ }
+
+ TTierInfo& SetNeedExport(const bool value) {
+ NeedExport = value;
+ return *this;
+ }
+
+ TTierInfo& SetCompression(const NArrow::TCompression& value) {
+ Compression = value;
+ return *this;
+ }
+
+ const std::optional<NArrow::TCompression> GetCompression() const {
+ if (NeedExport) {
+ return {};
+ }
+ return Compression;
+ }
+
+ std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const {
+ return schema->GetFieldByName(EvictColumnName);
+ }
+
+ std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const {
+ if (Scalar) {
+ return Scalar;
+ }
+ auto evictColumn = GetEvictColumn(schema);
+ Y_VERIFY(evictColumn);
+
+ ui32 multiplier = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
+ switch (evictColumn->type()->id()) {
+ case arrow::Type::TIMESTAMP:
+ Scalar = std::make_shared<arrow::TimestampScalar>(
+ EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
+ break;
+ case arrow::Type::UINT16: // YQL Date
+ Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days());
+ break;
+ case arrow::Type::UINT32: // YQL Datetime or Uint32
+ Scalar = std::make_shared<arrow::UInt32Scalar>(EvictBorder.Seconds() * multiplier);
+ break;
+ case arrow::Type::UINT64:
+ Scalar = std::make_shared<arrow::UInt64Scalar>(EvictBorder.Seconds() * multiplier);
+ break;
+ default:
+ break;
+ }
+
+ return Scalar;
+ }
+
+ static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) {
+ return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond);
+ }
+
+ TString GetDebugString() const {
+ TStringBuilder sb;
+ sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' ";
+ if (Compression) {
+ sb << Compression->DebugString();
+ } else {
+ sb << "NOT_SPECIFIED(Default)";
+ }
+ return sb;
+ }
+};
+
+class TTierRef {
+public:
+ TTierRef(const std::shared_ptr<TTierInfo>& tierInfo)
+ : Info(tierInfo)
+ {
+ Y_VERIFY(tierInfo);
+ }
+
+ bool operator < (const TTierRef& b) const {
+ if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) {
+ return true;
+ } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) {
+ return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter
+ }
+ return false;
+ }
+
+ bool operator == (const TTierRef& b) const {
+ return Info->GetEvictBorder() == b.Info->GetEvictBorder()
+ && Info->GetName() == b.Info->GetName();
+ }
+
+ const TTierInfo& Get() const {
+ return *Info;
+ }
+
+private:
+ std::shared_ptr<TTierInfo> Info;
+};
+
+class TTiering {
+ using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>;
+ TTiersMap TierByName;
+ TSet<TTierRef> OrderedTiers;
+public:
+ std::shared_ptr<TTierInfo> Ttl;
+
+ const TTiersMap& GetTierByName() const {
+ return TierByName;
+ }
+
+ const TSet<TTierRef>& GetOrderedTiers() const {
+ return OrderedTiers;
+ }
+
+ bool HasTiers() const {
+ return !OrderedTiers.empty();
+ }
+
+ void Add(const std::shared_ptr<TTierInfo>& tier) {
+ if (HasTiers()) {
+ // TODO: support different ttl columns
+ Y_VERIFY(tier->GetEvictColumnName() == OrderedTiers.begin()->Get().GetEvictColumnName());
+ }
+
+ TierByName.emplace(tier->GetName(), tier);
+ OrderedTiers.emplace(tier);
+ }
+
+ TString GetHottestTierName() const {
+ if (OrderedTiers.size()) {
+ return OrderedTiers.rbegin()->Get().GetName(); // hottest one
+ }
+ return {};
+ }
+
+ std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const {
+ auto ttlTs = Ttl ? Ttl->EvictScalar(schema) : nullptr;
+ auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar(schema);
+ if (!ttlTs) {
+ return tierTs;
+ } else if (!tierTs) {
+ return ttlTs;
+ }
+ return NArrow::ScalarLess(ttlTs, tierTs) ? tierTs : ttlTs; // either TTL or tier border appear
+ }
+
+ std::optional<NArrow::TCompression> GetCompression(const TString& name) const {
+ auto it = TierByName.find(name);
+ if (it != TierByName.end()) {
+ Y_VERIFY(!name.empty());
+ return it->second->GetCompression();
+ }
+ return {};
+ }
+
+ bool NeedExport(const TString& name) const {
+ auto it = TierByName.find(name);
+ if (it != TierByName.end()) {
+ Y_VERIFY(!name.empty());
+ return it->second->GetNeedExport();
+ }
+ return false;
+ }
+
+ THashSet<TString> GetTtlColumns() const {
+ THashSet<TString> out;
+ if (Ttl) {
+ out.insert(Ttl->GetEvictColumnName());
+ }
+ for (auto& [tierName, tier] : TierByName) {
+ out.insert(tier->GetEvictColumnName());
+ }
+ return out;
+ }
+
+ TString GetDebugString() const {
+ TStringBuilder sb;
+ if (Ttl) {
+ sb << Ttl->GetDebugString() << "; ";
+ }
+ for (auto&& i : OrderedTiers) {
+ sb << i.Get().GetDebugString() << "; ";
+ }
+ return sb;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make
index f9839bb0f1..73a7766802 100644
--- a/ydb/core/tx/columnshard/engines/scheme/ya.make
+++ b/ydb/core/tx/columnshard/engines/scheme/ya.make
@@ -4,6 +4,9 @@ SRCS(
abstract_scheme.cpp
snapshot_scheme.cpp
filtered_scheme.cpp
+ index_info.cpp
+ tier_info.cpp
+ column_features.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h
index e1d7fb5309..beaf9cefc2 100644
--- a/ydb/core/tx/columnshard/engines/tier_info.h
+++ b/ydb/core/tx/columnshard/engines/tier_info.h
@@ -1,236 +1,3 @@
#pragma once
-#include "defs.h"
-#include "scalars.h"
-
-#include <ydb/core/formats/arrow/arrow_helpers.h>
-#include <ydb/core/formats/arrow/common/validation.h>
-#include <ydb/core/formats/arrow/serializer/abstract.h>
-#include <ydb/core/formats/arrow/compression/object.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
-
-namespace NKikimr::NOlap {
-
-class TTierInfo {
-private:
- TString Name;
- TString EvictColumnName;
- TInstant EvictBorder;
- bool NeedExport = false;
-
- ui32 TtlUnitsInSecond;
- std::optional<NArrow::TCompression> Compression;
- mutable std::shared_ptr<arrow::Scalar> Scalar;
-
-public:
- TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0)
- : Name(tierName)
- , EvictColumnName(column)
- , EvictBorder(evictBorder)
- , TtlUnitsInSecond(unitsInSecond)
- {
- Y_VERIFY(!Name.empty());
- Y_VERIFY(!EvictColumnName.empty());
- }
-
- const TString& GetName() const {
- return Name;
- }
-
- const TString& GetEvictColumnName() const {
- return EvictColumnName;
- }
-
- const TInstant GetEvictBorder() const {
- return EvictBorder;
- }
-
- bool GetNeedExport() const {
- return NeedExport;
- }
-
- TTierInfo& SetNeedExport(const bool value) {
- NeedExport = value;
- return *this;
- }
-
- TTierInfo& SetCompression(const NArrow::TCompression& value) {
- Compression = value;
- return *this;
- }
-
- const std::optional<NArrow::TCompression> GetCompression() const {
- if (NeedExport) {
- return {};
- }
- return Compression;
- }
-
- std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const {
- return schema->GetFieldByName(EvictColumnName);
- }
-
- std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const {
- if (Scalar) {
- return Scalar;
- }
- auto evictColumn = GetEvictColumn(schema);
- Y_VERIFY(evictColumn);
-
- ui32 multiplier = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
- switch (evictColumn->type()->id()) {
- case arrow::Type::TIMESTAMP:
- Scalar = std::make_shared<arrow::TimestampScalar>(
- EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
- break;
- case arrow::Type::UINT16: // YQL Date
- Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days());
- break;
- case arrow::Type::UINT32: // YQL Datetime or Uint32
- Scalar = std::make_shared<arrow::UInt32Scalar>(EvictBorder.Seconds() * multiplier);
- break;
- case arrow::Type::UINT64:
- Scalar = std::make_shared<arrow::UInt64Scalar>(EvictBorder.Seconds() * multiplier);
- break;
- default:
- break;
- }
-
- return Scalar;
- }
-
- static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) {
- return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond);
- }
-
- TString GetDebugString() const {
- TStringBuilder sb;
- sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' ";
- if (Compression) {
- sb << Compression->DebugString();
- } else {
- sb << "NOT_SPECIFIED(Default)";
- }
- return sb;
- }
-};
-
-class TTierRef {
-public:
- TTierRef(const std::shared_ptr<TTierInfo>& tierInfo)
- : Info(tierInfo)
- {
- Y_VERIFY(tierInfo);
- }
-
- bool operator < (const TTierRef& b) const {
- if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) {
- return true;
- } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) {
- return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter
- }
- return false;
- }
-
- bool operator == (const TTierRef& b) const {
- return Info->GetEvictBorder() == b.Info->GetEvictBorder()
- && Info->GetName() == b.Info->GetName();
- }
-
- const TTierInfo& Get() const {
- return *Info;
- }
-
-private:
- std::shared_ptr<TTierInfo> Info;
-};
-
-class TTiering {
- using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>;
- TTiersMap TierByName;
- TSet<TTierRef> OrderedTiers;
-public:
- std::shared_ptr<TTierInfo> Ttl;
-
- const TTiersMap& GetTierByName() const {
- return TierByName;
- }
-
- const TSet<TTierRef>& GetOrderedTiers() const {
- return OrderedTiers;
- }
-
- bool HasTiers() const {
- return !OrderedTiers.empty();
- }
-
- void Add(const std::shared_ptr<TTierInfo>& tier) {
- if (HasTiers()) {
- // TODO: support different ttl columns
- Y_VERIFY(tier->GetEvictColumnName() == OrderedTiers.begin()->Get().GetEvictColumnName());
- }
-
- TierByName.emplace(tier->GetName(), tier);
- OrderedTiers.emplace(tier);
- }
-
- TString GetHottestTierName() const {
- if (OrderedTiers.size()) {
- return OrderedTiers.rbegin()->Get().GetName(); // hottest one
- }
- return {};
- }
-
- std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const {
- auto ttlTs = Ttl ? Ttl->EvictScalar(schema) : nullptr;
- auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar(schema);
- if (!ttlTs) {
- return tierTs;
- } else if (!tierTs) {
- return ttlTs;
- }
- return NArrow::ScalarLess(ttlTs, tierTs) ? tierTs : ttlTs; // either TTL or tier border appear
- }
-
- std::optional<NArrow::TCompression> GetCompression(const TString& name) const {
- auto it = TierByName.find(name);
- if (it != TierByName.end()) {
- Y_VERIFY(!name.empty());
- return it->second->GetCompression();
- }
- return {};
- }
-
- bool NeedExport(const TString& name) const {
- auto it = TierByName.find(name);
- if (it != TierByName.end()) {
- Y_VERIFY(!name.empty());
- return it->second->GetNeedExport();
- }
- return false;
- }
-
- THashSet<TString> GetTtlColumns() const {
- THashSet<TString> out;
- if (Ttl) {
- out.insert(Ttl->GetEvictColumnName());
- }
- for (auto& [tierName, tier] : TierByName) {
- out.insert(tier->GetEvictColumnName());
- }
- return out;
- }
-
- TString GetDebugString() const {
- TStringBuilder sb;
- if (Ttl) {
- sb << Ttl->GetDebugString() << "; ";
- }
- for (auto&& i : OrderedTiers) {
- sb << i.Get().GetDebugString() << "; ";
- }
- return sb;
- }
-};
-
-}
+#include "scheme/tier_info.h"
diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make
index a2582a1657..e409fd8830 100644
--- a/ydb/core/tx/columnshard/engines/ya.make
+++ b/ydb/core/tx/columnshard/engines/ya.make
@@ -15,7 +15,6 @@ SRCS(
index_logic_logs.cpp
filter.cpp
portion_info.cpp
- scalars.cpp
tier_info.cpp
)
@@ -31,6 +30,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/predicate
ydb/core/tx/columnshard/engines/storage
ydb/core/tx/columnshard/engines/insert_table
+ ydb/core/tx/columnshard/engines/portions
ydb/core/formats/arrow/compression
ydb/core/tx/program
@@ -38,7 +38,6 @@ PEERDIR(
ydb/library/yql/public/udf/service/exception_policy
)
-GENERATE_ENUM_SERIALIZATION(portion_info.h)
YQL_LAST_ABI_VERSION()
END()