diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-19 16:57:24 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-19 16:57:24 +0300 |
commit | d34037f1ebac7a9b3d7669010224c828ed89c5aa (patch) | |
tree | 044c353106aa8f09b9808b4b16eeea5467dbbd70 | |
parent | 610e03a93f88446e713af1c9e30c483394fe26e6 (diff) | |
download | ydb-d34037f1ebac7a9b3d7669010224c828ed89c5aa.tar.gz |
KIKIMR-18796:move libraries for usage in future
45 files changed, 2229 insertions, 2029 deletions
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt index 228521a024..f9f50dfaa1 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt @@ -11,7 +11,11 @@ add_library(tx-columnshard-common) target_link_libraries(tx-columnshard-common PUBLIC contrib-libs-cxxsupp yutil + ydb-core-protos + libs-apache-arrow ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp ) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt index 9f38a9ba79..6128aa8f0e 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt @@ -12,7 +12,11 @@ target_link_libraries(tx-columnshard-common PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + ydb-core-protos + libs-apache-arrow ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp ) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt index 9f38a9ba79..6128aa8f0e 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt @@ -12,7 +12,11 @@ target_link_libraries(tx-columnshard-common PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + ydb-core-protos + libs-apache-arrow ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp ) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt index 228521a024..f9f50dfaa1 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt @@ -11,7 +11,11 @@ add_library(tx-columnshard-common) target_link_libraries(tx-columnshard-common PUBLIC contrib-libs-cxxsupp yutil + ydb-core-protos + libs-apache-arrow ) target_sources(tx-columnshard-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/scalars.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/snapshot.cpp ) diff --git a/ydb/core/tx/columnshard/engines/scalars.cpp b/ydb/core/tx/columnshard/common/scalars.cpp index 9cc6993808..bbf24e3cc1 100644 --- a/ydb/core/tx/columnshard/engines/scalars.cpp +++ b/ydb/core/tx/columnshard/common/scalars.cpp @@ -2,6 +2,7 @@ #include <ydb/core/formats/arrow/switch_type.h> #include <ydb/library/yverify_stream/yverify_stream.h> +#include <util/system/unaligned_mem.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/common/scalars.h b/ydb/core/tx/columnshard/common/scalars.h new file mode 100644 index 0000000000..c2bfed65c1 --- /dev/null +++ b/ydb/core/tx/columnshard/common/scalars.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/core/protos/tx_columnshard.pb.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> +#include <memory> + +namespace NKikimr::NOlap { + +void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value); +std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value, + const std::shared_ptr<arrow::DataType>& type); + +TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key); +std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type); + +} diff --git a/ydb/core/tx/columnshard/common/snapshot.cpp b/ydb/core/tx/columnshard/common/snapshot.cpp new file mode 100644 index 0000000000..9f90580806 --- /dev/null +++ b/ydb/core/tx/columnshard/common/snapshot.cpp @@ -0,0 +1,5 @@ +#include "snapshot.h" + +namespace NKikimr::NOlap { + +}; diff --git a/ydb/core/tx/columnshard/common/snapshot.h b/ydb/core/tx/columnshard/common/snapshot.h new file mode 100644 index 0000000000..3da8294491 --- /dev/null +++ b/ydb/core/tx/columnshard/common/snapshot.h @@ -0,0 +1,51 @@ +#pragma once +#include <util/stream/output.h> +#include <util/string/cast.h> + +namespace NKikimr::NOlap { + +class TSnapshot { +private: + ui64 PlanStep = 0; + ui64 TxId = 0; + +public: + constexpr TSnapshot(const ui64 planStep, const ui64 txId) noexcept + : PlanStep(planStep) + , TxId(txId) { + } + + constexpr ui64 GetPlanStep() const noexcept { + return PlanStep; + } + + constexpr ui64 GetTxId() const noexcept { + return TxId; + } + + constexpr bool IsZero() const noexcept { + return PlanStep == 0 && TxId == 0; + } + + constexpr bool Valid() const noexcept { + return PlanStep && TxId; + } + + static constexpr TSnapshot Zero() noexcept { + return TSnapshot(0, 0); + } + + static constexpr TSnapshot Max() noexcept { + return TSnapshot(-1ll, -1ll); + } + + constexpr bool operator==(const TSnapshot&) const noexcept = default; + + constexpr auto operator<=>(const TSnapshot&) const noexcept = default; + + friend IOutputStream& operator<<(IOutputStream& out, const TSnapshot& s) { + return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ::ToString(s.TxId)) << "}"; + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 91a0d3a545..36c5c82b5d 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -2,9 +2,13 @@ LIBRARY() SRCS( reverse_accessor.cpp + scalars.cpp + snapshot.cpp ) PEERDIR( + ydb/core/protos + contrib/libs/apache/arrow ) END() diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index 6d9335d6a8..204ac83fc1 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -7,18 +7,13 @@ add_subdirectory(insert_table) +add_subdirectory(portions) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(scheme) add_subdirectory(storage) add_subdirectory(ut) add_subdirectory(writer) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(tx-columnshard-engines) target_compile_options(tx-columnshard-engines PRIVATE @@ -38,10 +33,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage columnshard-engines-insert_table + columnshard-engines-portions formats-arrow-compression core-tx-program udf-service-exception_policy - tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -54,11 +49,5 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp ) -generate_enum_serilization(tx-columnshard-engines - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h - INCLUDE_HEADERS - ydb/core/tx/columnshard/engines/portion_info.h -) diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index df8b47a1b8..a7bd0121aa 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -7,18 +7,13 @@ add_subdirectory(insert_table) +add_subdirectory(portions) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(scheme) add_subdirectory(storage) add_subdirectory(ut) add_subdirectory(writer) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(tx-columnshard-engines) target_compile_options(tx-columnshard-engines PRIVATE @@ -39,10 +34,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage columnshard-engines-insert_table + columnshard-engines-portions formats-arrow-compression core-tx-program udf-service-exception_policy - tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -55,11 +50,5 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp ) -generate_enum_serilization(tx-columnshard-engines - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h - INCLUDE_HEADERS - ydb/core/tx/columnshard/engines/portion_info.h -) diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index df8b47a1b8..a7bd0121aa 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -7,18 +7,13 @@ add_subdirectory(insert_table) +add_subdirectory(portions) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(scheme) add_subdirectory(storage) add_subdirectory(ut) add_subdirectory(writer) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(tx-columnshard-engines) target_compile_options(tx-columnshard-engines PRIVATE @@ -39,10 +34,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage columnshard-engines-insert_table + columnshard-engines-portions formats-arrow-compression core-tx-program udf-service-exception_policy - tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -55,11 +50,5 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp ) -generate_enum_serilization(tx-columnshard-engines - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h - INCLUDE_HEADERS - ydb/core/tx/columnshard/engines/portion_info.h -) diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index 6d9335d6a8..204ac83fc1 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -7,18 +7,13 @@ add_subdirectory(insert_table) +add_subdirectory(portions) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(scheme) add_subdirectory(storage) add_subdirectory(ut) add_subdirectory(writer) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(tx-columnshard-engines) target_compile_options(tx-columnshard-engines PRIVATE @@ -38,10 +33,10 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage columnshard-engines-insert_table + columnshard-engines-portions formats-arrow-compression core-tx-program udf-service-exception_policy - tools-enum_parser-enum_serialization_runtime ) target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -54,11 +49,5 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp ) -generate_enum_serilization(tx-columnshard-engines - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.h - INCLUDE_HEADERS - ydb/core/tx/columnshard/engines/portion_info.h -) diff --git a/ydb/core/tx/columnshard/engines/column_features.cpp b/ydb/core/tx/columnshard/engines/column_features.cpp index dd6341ec7a..44faec99a7 100644 --- a/ydb/core/tx/columnshard/engines/column_features.cpp +++ b/ydb/core/tx/columnshard/engines/column_features.cpp @@ -1,79 +1 @@ #include "column_features.h" -#include "index_info.h" -#include <ydb/core/formats/arrow/serializer/full.h> -#include <ydb/core/formats/arrow/serializer/batch_only.h> -#include <util/string/builder.h> - -namespace NKikimr::NOlap { - -NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetSaveTransformer() const { - NArrow::NTransformation::ITransformer::TPtr transformer; - if (DictionaryEncoding) { - transformer = DictionaryEncoding->BuildEncoder(); - } - return transformer; -} - -NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer() const { - NArrow::NTransformation::ITransformer::TPtr transformer; - if (DictionaryEncoding) { - transformer = DictionaryEncoding->BuildDecoder(); - } - return transformer; -} - -std::shared_ptr<NKikimr::NOlap::TColumnLoader> TColumnFeatures::GetLoader(const TIndexInfo& info) const { - if (!LoaderCache) { - NArrow::NTransformation::ITransformer::TPtr transformer = GetLoadTransformer(); - auto schema = info.GetColumnSchema(ColumnId); - if (!transformer) { - LoaderCache = std::make_shared<TColumnLoader>(transformer, - std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(schema), - schema, ColumnId); - } else { - LoaderCache = std::make_shared<TColumnLoader>(transformer, - std::make_shared<NArrow::NSerialization::TFullDataDeserializer>(), - schema, ColumnId); - } - } - return LoaderCache; -} - -std::optional<NKikimr::NOlap::TColumnFeatures> TColumnFeatures::BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId) { - TColumnFeatures result(columnId); - if (columnInfo.HasCompression()) { - auto settings = NArrow::TCompression::BuildFromProto(columnInfo.GetCompression()); - Y_VERIFY(settings.IsSuccess()); - result.Compression = *settings; - } - if (columnInfo.HasDictionaryEncoding()) { - auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding()); - Y_VERIFY(settings.IsSuccess()); - result.DictionaryEncoding = *settings; - } - return result; -} - -std::unique_ptr<arrow::util::Codec> TColumnFeatures::GetCompressionCodec() const { - if (Compression) { - return Compression->BuildArrowCodec(); - } else { - return nullptr; - } -} - -TString TColumnLoader::DebugString() const { - TStringBuilder result; - if (ExpectedSchema) { - result << "schema:" << ExpectedSchema->ToString() << ";"; - } - if (Transformer) { - result << "transformer:" << Transformer->DebugString() << ";"; - } - if (Deserializer) { - result << "deserializer:" << Deserializer->DebugString() << ";"; - } - return result; -} - -} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/column_features.h b/ydb/core/tx/columnshard/engines/column_features.h index bb5e79428a..053578affb 100644 --- a/ydb/core/tx/columnshard/engines/column_features.h +++ b/ydb/core/tx/columnshard/engines/column_features.h @@ -1,120 +1,2 @@ #pragma once -#include <ydb/core/formats/arrow/compression/object.h> -#include <ydb/core/formats/arrow/dictionary/object.h> -#include <ydb/core/formats/arrow/serializer/abstract.h> -#include <ydb/core/formats/arrow/transformer/abstract.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> - -namespace NKikimr::NOlap { - -class TSaverContext { -private: - TString TierName; - std::optional<NArrow::TCompression> ExternalCompression; -public: - const std::optional<NArrow::TCompression>& GetExternalCompression() const { - return ExternalCompression; - } - TSaverContext& SetExternalCompression(const std::optional<NArrow::TCompression>& value) { - ExternalCompression = value; - return *this; - } - const TString& GetTierName() const { - return TierName; - } - TSaverContext& SetTierName(const TString& value) { - TierName = value; - return *this; - } -}; - -class TColumnSaver { -private: - NArrow::NTransformation::ITransformer::TPtr Transformer; - NArrow::NSerialization::ISerializer::TPtr Serializer; -public: - TColumnSaver() = default; - TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer) - : Transformer(transformer) - , Serializer(serializer) { - Y_VERIFY(Serializer); - } - - TString Apply(const std::shared_ptr<arrow::RecordBatch>& data) const { - Y_VERIFY(Serializer); - if (Transformer) { - return Serializer->Serialize(Transformer->Transform(data)); - } else { - return Serializer->Serialize(data); - } - } -}; - -class TColumnLoader { -private: - NArrow::NTransformation::ITransformer::TPtr Transformer; - NArrow::NSerialization::IDeserializer::TPtr Deserializer; - std::shared_ptr<arrow::Schema> ExpectedSchema; - const ui32 ColumnId; -public: - TString DebugString() const; - - TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer, - const std::shared_ptr<arrow::Schema>& expectedSchema, const ui32 columnId) - : Transformer(transformer) - , Deserializer(deserializer) - , ExpectedSchema(expectedSchema) - , ColumnId(columnId) - { - Y_VERIFY(ExpectedSchema); - Y_VERIFY(Deserializer); - } - - ui32 GetColumnId() const { - return ColumnId; - } - - std::shared_ptr<arrow::Schema> GetExpectedSchema() const { - return ExpectedSchema; - } - - arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const { - Y_VERIFY(Deserializer); - arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = Deserializer->Deserialize(data); - if (!columnArray.ok()) { - return columnArray; - } - if (Transformer) { - return Transformer->Transform(*columnArray); - } else { - return columnArray; - } - } -}; - -struct TIndexInfo; - -class TColumnFeatures { -private: - const ui32 ColumnId; - std::optional<NArrow::TCompression> Compression; - std::optional<NArrow::NDictionary::TEncodingSettings> DictionaryEncoding; - mutable std::shared_ptr<TColumnLoader> LoaderCache; -public: - TColumnFeatures(const ui32 columnId) - : ColumnId(columnId) - { - - } - static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId); - - NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const; - NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const; - - std::unique_ptr<arrow::util::Codec> GetCompressionCodec() const; - - std::shared_ptr<TColumnLoader> GetLoader(const TIndexInfo& info) const; - -}; - -} // namespace NKikimr::NOlap +#include "scheme/column_features.h" diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h index 4be4a052b2..37f07d3472 100644 --- a/ydb/core/tx/columnshard/engines/columns_table.h +++ b/ydb/core/tx/columnshard/engines/columns_table.h @@ -1,95 +1,12 @@ #pragma once -#include "defs.h" #include "db_wrapper.h" +#include "portions/column_record.h" #include <ydb/core/tx/columnshard/blob.h> namespace NKikimr::NOlap { -struct TColumnRecord { - ui64 Granule; - ui64 PlanStep; // {PlanStep, TxId} is min snapshot for {Granule, Portion} - ui64 TxId; - ui64 Portion; // Id of independent (overlayed by PK) portion of data in granule - ui64 XPlanStep{0}; // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one) - ui64 XTxId{0}; - ui32 ColumnId{0}; - ui16 Chunk; // Number of blob for column ColumnName in Portion - TBlobRange BlobRange; - TString Metadata; - - std::optional<ui32> GetChunkRowsCount() const { - return {}; - } - - bool operator == (const TColumnRecord& rec) const { - return (Granule == rec.Granule) && (ColumnId == rec.ColumnId) && - (PlanStep == rec.PlanStep) && (TxId == rec.TxId) && (Portion == rec.Portion) && (Chunk == rec.Chunk); - } - - TString SerializedBlobId() const { - return BlobRange.BlobId.SerializeBinary(); - } - - bool Valid() const { - return ValidExceptSnapshot() && ValidSnapshot(); - } - - bool ValidSnapshot() const { - return PlanStep && TxId; - } - - bool ValidExceptSnapshot() const { - return Granule && ColumnId && Portion && ValidBlob(); - } - - bool ValidBlob() const { - return BlobRange.BlobId.IsValid() && BlobRange.Size; - } - - void SetSnapshot(const TSnapshot& snap) { - Y_VERIFY(snap.Valid()); - PlanStep = snap.GetPlanStep(); - TxId = snap.GetTxId(); - } - - void SetXSnapshot(const TSnapshot& snap) { - Y_VERIFY(snap.Valid()); - XPlanStep = snap.GetPlanStep(); - XTxId = snap.GetTxId(); - } - - static TColumnRecord Make(ui64 granule, ui32 columnId, const TSnapshot& minSnapshot, ui64 portion, ui16 chunk = 0) { - TColumnRecord row; - row.Granule = granule; - row.PlanStep = minSnapshot.GetPlanStep(); - row.TxId = minSnapshot.GetTxId(); - row.Portion = portion; - row.ColumnId = columnId; - row.Chunk = chunk; - //row.BlobId - //row.Metadata - return row; - } - - friend IOutputStream& operator << (IOutputStream& out, const TColumnRecord& rec) { - out << '{'; - out << 'g' << rec.Granule << 'p' << rec.Portion; - if (rec.Chunk) { - out << 'n' << rec.Chunk; - } - out << ',' << (i32)rec.ColumnId; - out << ',' << rec.PlanStep << ':' << (rec.TxId == Max<ui64>() ? "max" : ToString(rec.TxId)); - if (rec.XPlanStep) { - out << '-' << rec.XPlanStep << ':' << (rec.XTxId == Max<ui64>() ? "max" : ToString(rec.XTxId)); - } - out << ',' << rec.BlobRange.ToString(); - out << '}'; - return out; - } -}; - class TColumnsTable { public: TColumnsTable(ui32 indexId) diff --git a/ydb/core/tx/columnshard/engines/defs.h b/ydb/core/tx/columnshard/engines/defs.h index beb1880f7a..6c4e0371d7 100644 --- a/ydb/core/tx/columnshard/engines/defs.h +++ b/ydb/core/tx/columnshard/engines/defs.h @@ -3,6 +3,7 @@ #include <ydb/core/base/defs.h> #include <ydb/core/base/logoblob.h> #include <ydb/core/tx/ctor_logger.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> namespace NKikimr::NOlap { @@ -15,50 +16,6 @@ inline TWriteId operator++(TWriteId& w) noexcept { return w; } -class TSnapshot { -private: - ui64 PlanStep = 0; - ui64 TxId = 0; - -public: - constexpr TSnapshot(const ui64 planStep, const ui64 txId) noexcept - : PlanStep(planStep) - , TxId(txId) { - } - - constexpr ui64 GetPlanStep() const noexcept { - return PlanStep; - } - - constexpr ui64 GetTxId() const noexcept { - return TxId; - } - - constexpr bool IsZero() const noexcept { - return PlanStep == 0 && TxId == 0; - } - - constexpr bool Valid() const noexcept { - return PlanStep && TxId; - } - - static constexpr TSnapshot Zero() noexcept { - return TSnapshot(0, 0); - } - - static constexpr TSnapshot Max() noexcept { - return TSnapshot(-1ll, -1ll); - } - - constexpr bool operator==(const TSnapshot&) const noexcept = default; - - constexpr auto operator<=>(const TSnapshot&) const noexcept = default; - - friend IOutputStream& operator<<(IOutputStream& out, const TSnapshot& s) { - return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ToString(s.TxId)) << "}"; - } -}; - class IBlobGroupSelector { protected: virtual ~IBlobGroupSelector() = default; diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 2ca6b57cfa..80fbea3cee 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -1,430 +1 @@ #include "index_info.h" -#include "column_engine.h" - -#include <ydb/core/formats/arrow/arrow_batch_builder.h> -#include <ydb/core/formats/arrow/sort_cursor.h> -#include <ydb/core/sys_view/common/schema.h> -#include <ydb/core/formats/arrow/serializer/batch_only.h> -#include <ydb/core/formats/arrow/transformer/dictionary.h> -#include <ydb/core/formats/arrow/serializer/full.h> -#include <ydb/core/base/appdata.h> - -namespace NKikimr::NOlap { - -const TString TIndexInfo::STORE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::StorePrimaryIndexStatsName; -const TString TIndexInfo::TABLE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::TablePrimaryIndexStatsName; - -static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns) { - std::vector<TString> out; - out.reserve(columns.size()); - for (const auto& [name, _] : columns) { - out.push_back(name); - } - return out; -} - -TIndexInfo::TIndexInfo(const TString& name, ui32 id) - : NTable::TScheme::TTableSchema() - , Id(id) - , Name(name) -{} - -std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) { - Y_VERIFY(batch); - i64 numColumns = batch->num_columns(); - i64 numRows = batch->num_rows(); - - auto res = batch->AddColumn(numColumns, arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), - NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)); - Y_VERIFY(res.ok()); - res = (*res)->AddColumn(numColumns + 1, arrow::field(SPEC_COL_TX_ID, arrow::uint64()), - NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)); - Y_VERIFY(res.ok()); - Y_VERIFY((*res)->num_columns() == numColumns + 2); - return *res; -} - -std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() { - static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{ - arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), - arrow::field(SPEC_COL_TX_ID, arrow::uint64()) - }); - return result; -} - -bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) { - return IsSpecialColumn(field.name()); -} - -bool TIndexInfo::IsSpecialColumn(const std::string& fieldName) { - return fieldName == SPEC_COL_PLAN_STEP - || fieldName == SPEC_COL_TX_ID; -} - -bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) { - return fieldId == (ui32)ESpecialColumn::PLAN_STEP - || fieldId == (ui32)ESpecialColumn::TX_ID; -} - -ui32 TIndexInfo::GetColumnId(const std::string& name) const { - auto id = GetColumnIdOptional(name); - Y_VERIFY(!!id, "undefined column %s", name.data()); - return *id; -} - -std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const { - const auto ni = ColumnNames.find(name); - - if (ni != ColumnNames.end()) { - return ni->second; - } - if (name == SPEC_COL_PLAN_STEP) { - return ui32(ESpecialColumn::PLAN_STEP); - } else if (name == SPEC_COL_TX_ID) { - return ui32(ESpecialColumn::TX_ID); - } - return {}; -} - -TString TIndexInfo::GetColumnName(ui32 id, bool required) const { - if (ESpecialColumn(id) == ESpecialColumn::PLAN_STEP) { - return SPEC_COL_PLAN_STEP; - } else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) { - return SPEC_COL_TX_ID; - } else { - const auto ci = Columns.find(id); - - if (!required && ci == Columns.end()) { - return {}; - } - - Y_VERIFY(ci != Columns.end()); - return ci->second.Name; - } -} - -std::vector<ui32> TIndexInfo::GetColumnIds() const { - std::vector<ui32> result; - for (auto&& i : Columns) { - result.emplace_back(i.first); - } - result.emplace_back((ui32)ESpecialColumn::PLAN_STEP); - result.emplace_back((ui32)ESpecialColumn::TX_ID); - return result; -} - -std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) const { - std::vector<TString> out; - out.reserve(ids.size()); - for (ui32 id : ids) { - const auto ci = Columns.find(id); - Y_VERIFY(ci != Columns.end()); - out.push_back(ci->second.Name); - } - return out; -} - -std::vector<TNameTypeInfo> TIndexInfo::GetColumns(const std::vector<ui32>& ids) const { - return NOlap::GetColumns(*this, ids); -} - -std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const { - if (!Schema) { - std::vector<ui32> ids; - ids.reserve(Columns.size()); - for (const auto& [id, _] : Columns) { - ids.push_back(id); - } - - // The ids had a set type before so we keep them sorted. - std::sort(ids.begin(), ids.end()); - Schema = MakeArrowSchema(Columns, ids); - } - - return Schema; -} - -std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaWithSpecials() const { - if (SchemaWithSpecials) { - return SchemaWithSpecials; - } - - const auto& schema = ArrowSchema(); - - std::vector<std::shared_ptr<arrow::Field>> extended; - extended.reserve(schema->num_fields() + 3); - - // Place special fields at the beginning of the schema. - extended.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64())); - extended.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64())); - // Append fields from the regular schema afterward. - extended.insert(extended.end(), schema->fields().begin(), schema->fields().end()); - - SchemaWithSpecials = std::make_shared<arrow::Schema>(std::move(extended)); - return SchemaWithSpecials; -} - -std::shared_ptr<arrow::Schema> TIndexInfo::AddColumns( - const std::shared_ptr<arrow::Schema>& src, - const std::vector<TString>& columns) const -{ - std::shared_ptr<arrow::Schema> all = ArrowSchemaWithSpecials(); - auto fields = src->fields(); - - for (const auto& col : columns) { - const std::string name(col.data(), col.size()); - if (!src->GetFieldByName(name)) { - auto field = all->GetFieldByName(name); - if (!field) { - return {}; - } - fields.push_back(field); - } - } - return std::make_shared<arrow::Schema>(std::move(fields)); -} - -std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials) const { - return MakeArrowSchema(Columns, columnIds, withSpecials); -} - -std::vector<ui32> TIndexInfo::GetColumnIds(const std::vector<TString>& columnNames) const { - std::vector<ui32> ids; - ids.reserve(columnNames.size()); - for (auto& name : columnNames) { - auto columnId = GetColumnIdOptional(name); - if (!columnId) { - return {}; - } - ids.emplace_back(*columnId); - } - return ids; -} - -std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<TString>& names) const { - auto columnIds = GetColumnIds(names); - if (columnIds.empty()) { - return {}; - } - return MakeArrowSchema(Columns, columnIds); -} - -std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const { - auto it = ArrowColumnByColumnIdCache.find(columnId); - if (it == ArrowColumnByColumnIdCache.end()) { - it = ArrowColumnByColumnIdCache.emplace(columnId, ArrowSchema()->GetFieldByName(GetColumnName(columnId, true))).first; - } - return it->second; -} - -void TIndexInfo::SetAllKeys() { - /// @note Setting replace and sorting key to PK we are able to: - /// * apply REPLACE by MergeSort - /// * apply PK predicate before REPLACE - const auto& primaryKeyNames = NamesOnly(GetPrimaryKey()); - // Update set of required columns with names from primary key. - for (const auto& name: primaryKeyNames) { - RequiredColumns.insert(name); - } - - std::vector<std::shared_ptr<arrow::Field>> fields; - if (primaryKeyNames.size()) { - SortingKey = ArrowSchema(primaryKeyNames); - ReplaceKey = SortingKey; - fields = ReplaceKey->fields(); - if (CompositeIndexKey) { - IndexKey = ReplaceKey; - } else { - IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] })); - } - } - - fields.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64())); - fields.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64())); - ExtendedKey = std::make_shared<arrow::Schema>(std::move(fields)); - - for (const auto& [colId, column] : Columns) { - if (NArrow::IsPrimitiveYqlType(column.PType)) { - MinMaxIdxColumnsIds.insert(colId); - } - } - MinMaxIdxColumnsIds.insert(GetPKFirstColumnId()); -} - -std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortDescription() const { - if (GetSortingKey()) { - auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first - Y_VERIFY(key && key->num_fields() > 2); - auto description = std::make_shared<NArrow::TSortDescription>(key); - description->Directions[key->num_fields() - 1] = -1; - description->Directions[key->num_fields() - 2] = -1; - description->NotNull = true; // TODO - return description; - } - return {}; -} - -std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() const { - if (GetSortingKey()) { - auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first - Y_VERIFY(key && key->num_fields() > 2); - auto description = std::make_shared<NArrow::TSortDescription>(key, GetReplaceKey()); - description->Directions[key->num_fields() - 1] = -1; - description->Directions[key->num_fields() - 2] = -1; - description->NotNull = true; // TODO - return description; - } - return {}; -} - -bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { - auto it = ColumnNames.find(name); - if (it == ColumnNames.end()) { - return false; - } - return MinMaxIdxColumnsIds.contains(it->second); -} - -TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const { - arrow::ipc::IpcWriteOptions options; - options.use_threads = false; - - NArrow::NTransformation::ITransformer::TPtr transformer; - std::unique_ptr<arrow::util::Codec> columnCodec; - { - auto it = ColumnFeatures.find(columnId); - if (it != ColumnFeatures.end()) { - transformer = it->second.GetSaveTransformer(); - columnCodec = it->second.GetCompressionCodec(); - } - } - - if (context.GetExternalCompression()) { - options.codec = context.GetExternalCompression()->BuildArrowCodec(); - } else if (columnCodec) { - options.codec = std::move(columnCodec); - } else if (DefaultCompression) { - options.codec = DefaultCompression->BuildArrowCodec(); - } else { - options.codec = NArrow::TCompression::BuildDefaultCodec(); - } - - if (!transformer) { - return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options)); - } else { - return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TFullDataSerializer>(options)); - } -} - -TColumnFeatures& TIndexInfo::GetOrCreateColumnFeatures(const ui32 columnId) const { - auto it = ColumnFeatures.find(columnId); - if (it == ColumnFeatures.end()) { - it = ColumnFeatures.emplace(columnId, TColumnFeatures(columnId)).first; - } - return it->second; -} - -std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const { - TColumnFeatures& features = GetOrCreateColumnFeatures(columnId); - return features.GetLoader(*this); -} - -std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const { - std::shared_ptr<arrow::Schema> schema = Schema; - if (IsSpecialColumn(columnId)) { - schema = ArrowSchemaSnapshot(); - } - auto field = schema->GetFieldByName(GetColumnName(columnId)); - Y_VERIFY(field); - std::vector<std::shared_ptr<arrow::Field>> fields = { field }; - return std::make_shared<arrow::Schema>(fields); -} - -bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { - if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema"); - return false; - } - - for (const auto& col : schema.GetColumns()) { - const ui32 id = col.GetId(); - const TString& name = col.GetName(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), - col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); - Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); - ColumnNames[name] = id; - std::optional<TColumnFeatures> cFeatures = TColumnFeatures::BuildFromProto(col, id); - if (!cFeatures) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature"); - return false; - } - ColumnFeatures.emplace(id, *cFeatures); - } - - for (const auto& keyName : schema.GetKeyColumnNames()) { - Y_VERIFY(ColumnNames.contains(keyName)); - KeyColumns.push_back(ColumnNames[keyName]); - } - - if (schema.HasDefaultCompression()) { - auto result = NArrow::TCompression::BuildFromProto(schema.GetDefaultCompression()); - if (!result) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", result.GetErrorMessage()); - return false; - } - DefaultCompression = *result; - } - - CompositeMarks = schema.GetCompositeMarks(); - CompositeIndexKey = AppData()->FeatureFlags.GetForceColumnTablesCompositeMarks() ? true : CompositeMarks; - return true; -} - -bool TIndexInfo::CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const { - return CompositeMarks == scheme.GetCompositeMarks(); -} - -std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) { - std::vector<std::shared_ptr<arrow::Field>> fields; - fields.reserve(withSpecials ? ids.size() + 2 : ids.size()); - - if (withSpecials) { - // Place special fields at the beginning of the schema. - fields.push_back(arrow::field(TIndexInfo::SPEC_COL_PLAN_STEP, arrow::uint64())); - fields.push_back(arrow::field(TIndexInfo::SPEC_COL_TX_ID, arrow::uint64())); - } - - for (const ui32 id: ids) { - auto it = columns.find(id); - if (it == columns.end()) { - continue; - } - - const auto& column = it->second; - std::string colName(column.Name.data(), column.Name.size()); - fields.emplace_back(std::make_shared<arrow::Field>(colName, NArrow::GetArrowType(column.PType))); - } - - return std::make_shared<arrow::Schema>(std::move(fields)); -} - -std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids) { - std::vector<std::pair<TString, NScheme::TTypeInfo>> out; - out.reserve(ids.size()); - for (const ui32 id : ids) { - const auto ci = tableSchema.Columns.find(id); - Y_VERIFY(ci != tableSchema.Columns.end()); - out.emplace_back(ci->second.Name, ci->second.PType); - } - return out; -} - -std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { - TIndexInfo result("", 0); - if (!result.DeserializeFromProto(schema)) { - return std::nullopt; - } - return result; -} - -} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 33f98892d1..382c410d92 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -1,202 +1,3 @@ #pragma once -#include "column_features.h" -#include "defs.h" -#include "scalars.h" -#include "tier_info.h" - -#include <ydb/core/sys_view/common/schema.h> -#include <ydb/core/tablet_flat/flat_dbase_scheme.h> -#include <ydb/core/formats/arrow/dictionary/object.h> -#include <ydb/core/formats/arrow/serializer/abstract.h> -#include <ydb/core/formats/arrow/transformer/abstract.h> -#include <ydb/core/scheme/scheme_types_proto.h> - -namespace arrow { - class Array; - class Field; - class Schema; -} - -namespace NKikimr::NArrow { - struct TSortDescription; -} - -namespace NKikimr::NOlap { - -struct TInsertedData; -class TSnapshotColumnInfo; -using TNameTypeInfo = std::pair<TString, NScheme::TTypeInfo>; - -/// Column engine index description in terms of tablet's local table. -/// We have to use YDB types for keys here. -struct TIndexInfo : public NTable::TScheme::TTableSchema { -private: - mutable THashMap<ui32, TColumnFeatures> ColumnFeatures; - mutable THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache; - TIndexInfo(const TString& name, ui32 id); - bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); - TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const; -public: - static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; - static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; - static const TString STORE_INDEX_STATS_TABLE; - static const TString TABLE_INDEX_STATS_TABLE; - - enum class ESpecialColumn : ui32 { - PLAN_STEP = 0xffffff00, - TX_ID, - }; - - /// Appends the special columns to the batch. - static std::shared_ptr<arrow::RecordBatch> AddSpecialColumns( - const std::shared_ptr<arrow::RecordBatch>& batch, - const TSnapshot& snapshot); - - /// Makes schema as set of the special columns. - static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot(); - - /// Matches name of the filed with names of the special columns. - static bool IsSpecialColumn(const arrow::Field& field); - static bool IsSpecialColumn(const ui32 field); - static ui32 GetSpecialColumnByteWidth(const ui32 field) { - Y_VERIFY(IsSpecialColumn(field)); - return 8; - } - static bool IsSpecialColumn(const std::string& fieldName); - template <class TContainer> - static bool IsSpecialColumns(const TContainer& c) { - for (auto&& i : c) { - if (!IsSpecialColumn(i)) { - return false; - } - } - return true; - } - - bool CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const; -public: - - static TIndexInfo BuildDefault() { - TIndexInfo result("dummy", 0); - return result; - } - - static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); - - /// Returns id of the index. - ui32 GetId() const noexcept { - return Id; - } - - std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const; - TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const; - std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const; - - /// Returns an id of the column located by name. The name should exists in the schema. - ui32 GetColumnId(const std::string& name) const; - std::optional<ui32> GetColumnIdOptional(const std::string& name) const; - - /// Returns a name of the column located by id. - TString GetColumnName(ui32 id, bool required = true) const; - - /// Returns names of columns defined by the specific ids. - std::vector<TString> GetColumnNames(const std::vector<ui32>& ids) const; - std::vector<ui32> GetColumnIds() const; - - /// Returns info of columns defined by specific ids. - std::vector<TNameTypeInfo> GetColumns(const std::vector<ui32>& ids) const; - - /// Traditional Primary Key (includes uniqueness, search and sorting logic) - std::vector<TNameTypeInfo> GetPrimaryKey() const { - return GetColumns(KeyColumns); - } - - /// Returns id of the first column of the primary key. - ui32 GetPKFirstColumnId() const { - Y_VERIFY(KeyColumns.size()); - return KeyColumns[0]; - } - - // Sorting key: could be less or greater then traditional PK - // It could be empty for append-only tables. It could be greater then PK for better columns compression. - // If sorting key includes uniqueness key as a prefix we are able to use MergeSort for REPLACE. - const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return SortingKey; } - const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return ReplaceKey; } - const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; } - const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; } - - /// Initializes sorting, replace, index and extended keys. - void SetAllKeys(); - - void CheckTtlColumn(const TString& ttlColumn) const { - Y_VERIFY(!ttlColumn.empty()); - Y_VERIFY(MinMaxIdxColumnsIds.contains(GetColumnId(ttlColumn))); - } - - std::vector<ui32> GetColumnIds(const std::vector<TString>& columnNames) const; - - std::shared_ptr<arrow::Schema> ArrowSchema() const; - std::shared_ptr<arrow::Schema> ArrowSchemaWithSpecials() const; - std::shared_ptr<arrow::Schema> AddColumns(const std::shared_ptr<arrow::Schema>& schema, - const std::vector<TString>& columns) const; - - std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const; - std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const; - std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const; - - const THashSet<TString>& GetRequiredColumns() const { - return RequiredColumns; - } - - const THashSet<ui32>& GetMinMaxIdxColumns() const { - return MinMaxIdxColumnsIds; - } - - bool AllowTtlOverColumn(const TString& name) const; - - /// Returns whether the sorting keys defined. - bool IsSorted() const { return SortingKey.get(); } - - /// Returns whether the replace keys defined. - bool IsReplacing() const { return ReplaceKey.get(); } - - bool IsCompositeIndexKey() const { - return CompositeIndexKey; - } - - std::shared_ptr<NArrow::TSortDescription> SortDescription() const; - std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const; - - static const std::vector<std::string>& GetSpecialColumnNames() { - static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; - return result; - } - - static const std::vector<ui32>& GetSpecialColumnIds() { - static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; - return result; - } - -private: - ui32 Id; - TString Name; - bool CompositeIndexKey = false; - mutable std::shared_ptr<arrow::Schema> Schema; - mutable std::shared_ptr<arrow::Schema> SchemaWithSpecials; - std::shared_ptr<arrow::Schema> SortingKey; - std::shared_ptr<arrow::Schema> ReplaceKey; - std::shared_ptr<arrow::Schema> ExtendedKey; // Extend PK with snapshot columns to allow old shapshot reads - std::shared_ptr<arrow::Schema> IndexKey; - THashSet<TString> RequiredColumns; - THashSet<ui32> MinMaxIdxColumnsIds; - std::optional<NArrow::TCompression> DefaultCompression; - bool CompositeMarks = false; -}; - -std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false); - -/// Extracts columns with the specific ids from the schema. -std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids); - -} // namespace NKikimr::NOlap +#include "scheme/index_info.h" diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index b20a716f9e..9b11963e99 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -1,262 +1,5 @@ #include "portion_info.h" -#include <ydb/core/protos/tx_columnshard.pb.h> -#include <ydb/core/formats/arrow/arrow_filter.h> namespace NKikimr::NOlap { -TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - const TColumnSaver saver) { - auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field }); - auto batch = arrow::RecordBatch::Make(schema, array->length(), { array }); - Y_VERIFY(batch); - - return saver.Apply(batch); -} - -void TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) { - record.Chunk = 0; - Records.emplace_back(std::move(record)); -} - -void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) { - Y_VERIFY(column->length()); - - std::pair<int, int> minMaxPos = {0, (column->length() - 1)}; - if (!sorted) { - minMaxPos = NArrow::FindMinMaxPosition(column); - } - - Y_VERIFY(minMaxPos.first >= 0); - Y_VERIFY(minMaxPos.second >= 0); - - Meta.ColumnMeta[columnId].Min = NArrow::GetScalar(column, minMaxPos.first); - Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second); -} - -void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, - const TString& tierName) { - const auto& indexInfo = snapshotSchema.GetIndexInfo(); - const auto& minMaxColumns = indexInfo.GetMinMaxIdxColumns(); - - TierName = tierName; - Meta = {}; - Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); - - // Copy first and last key rows into new batch to free source batch's memory - { - auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); - std::vector<bool> bits(batch->num_rows(), false); - bits[0] = true; - bits[batch->num_rows() - 1] = true; // it colud be 0 if batch has one row - - auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows()); - auto res = arrow::compute::Filter(keyBatch, filter); - Y_VERIFY(res.ok()); - - Meta.ReplaceKeyEdges = res->record_batch(); - Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2); - } - - auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey()); - Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); - Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); - - /// @note It does not add RawBytes info for snapshot columns, only for user ones. - for (auto& [columnId, col] : indexInfo.Columns) { - const auto& columnName = col.Name; - auto column = batch->GetColumnByName(col.Name); - Y_VERIFY(column); - Meta.ColumnMeta[columnId].NumRows = column->length(); - Meta.ColumnMeta[columnId].RawBytes = NArrow::GetArrayDataSize(column); - - if (minMaxColumns.contains(columnId)) { - auto column = batch->GetColumnByName(columnName); - Y_VERIFY(column); - - bool isSorted = (columnId == Meta.FirstPkColumn); - AddMinMax(columnId, column, isSorted); - Y_VERIFY(Meta.HasMinMax(columnId)); - } - } -} - -TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const { - NKikimrTxColumnShard::TIndexColumnMeta meta; // TODO: move proto serialization out of engines folder - if (Meta.ColumnMeta.contains(rec.ColumnId)) { - const auto& columnMeta = Meta.ColumnMeta.find(rec.ColumnId)->second; - if (auto numRows = columnMeta.NumRows) { - meta.SetNumRows(numRows); - } - if (auto rawBytes = columnMeta.RawBytes) { - meta.SetRawBytes(rawBytes); - } - if (columnMeta.HasMinMax()) { - ScalarToConstant(*columnMeta.Min, *meta.MutableMinValue()); - ScalarToConstant(*columnMeta.Max, *meta.MutableMaxValue()); - } - } - - if (rec.ColumnId == Meta.FirstPkColumn) { - auto* portionMeta = meta.MutablePortionMeta(); - - switch (Meta.Produced) { - case TPortionMeta::UNSPECIFIED: - Y_VERIFY(false); - case TPortionMeta::INSERTED: - portionMeta->SetIsInserted(true); - break; - case TPortionMeta::COMPACTED: - portionMeta->SetIsCompacted(true); - break; - case TPortionMeta::SPLIT_COMPACTED: - portionMeta->SetIsSplitCompacted(true); - break; - case TPortionMeta::EVICTED: - portionMeta->SetIsEvicted(true); - break; - case TPortionMeta::INACTIVE: - Y_FAIL("Unexpected inactive case"); - //portionMeta->SetInactive(true); - break; - } - - if (!TierName.empty()) { - portionMeta->SetTierName(TierName); - } - - if (const auto& keyEdgesBatch = Meta.ReplaceKeyEdges) { - Y_VERIFY(keyEdgesBatch); - Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok()); - Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2); - portionMeta->SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch)); - } - } - - TString out; - Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out); - return out; -} - -void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) { - if (rec.Metadata.empty()) { - return; - } - - NKikimrTxColumnShard::TIndexColumnMeta meta; - bool ok = meta.ParseFromString(rec.Metadata); - Y_VERIFY(ok); - - Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); - auto field = indexInfo.ArrowColumnField(rec.ColumnId); - const bool compositeIndexKey = indexInfo.IsCompositeIndexKey(); - - if (meta.HasPortionMeta()) { - Y_VERIFY_DEBUG(rec.ColumnId == Meta.FirstPkColumn); - - auto& portionMeta = meta.GetPortionMeta(); - TierName = portionMeta.GetTierName(); - - if (portionMeta.GetIsInserted()) { - Meta.Produced = TPortionMeta::INSERTED; - } else if (portionMeta.GetIsCompacted()) { - Meta.Produced = TPortionMeta::COMPACTED; - } else if (portionMeta.GetIsSplitCompacted()) { - Meta.Produced = TPortionMeta::SPLIT_COMPACTED; - } else if (portionMeta.GetIsEvicted()) { - Meta.Produced = TPortionMeta::EVICTED; - } - - if (portionMeta.HasPrimaryKeyBorders()) { - Meta.ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); - Y_VERIFY(Meta.ReplaceKeyEdges); - Y_VERIFY_DEBUG(Meta.ReplaceKeyEdges->ValidateFull().ok()); - Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2); - - if (compositeIndexKey) { - auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey()); - Y_VERIFY(edgesBatch); - Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); - Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); - } - } - } - if (meta.HasNumRows()) { - Meta.ColumnMeta[rec.ColumnId].NumRows = meta.GetNumRows(); - } - if (meta.HasRawBytes()) { - Meta.ColumnMeta[rec.ColumnId].RawBytes = meta.GetRawBytes(); - } - if (meta.HasMinValue()) { - auto scalar = ConstantToScalar(meta.GetMinValue(), field->type()); - Meta.ColumnMeta[rec.ColumnId].Min = scalar; - - // Restore Meta.IndexKeyStart for one column IndexKey - if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) { - Meta.IndexKeyStart = NArrow::TReplaceKey::FromScalar(scalar); - } - } - if (meta.HasMaxValue()) { - auto scalar = ConstantToScalar(meta.GetMaxValue(), field->type()); - Meta.ColumnMeta[rec.ColumnId].Max = scalar; - - // Restore Meta.IndexKeyEnd for one column IndexKey - if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) { - Meta.IndexKeyEnd = NArrow::TReplaceKey::FromScalar(scalar); - } - } - - // Portion genarated without PrimaryKeyBorders and loaded with indexInfo.IsCompositeIndexKey() - // We should have no such portions for ForceColumnTablesCompositeMarks feature - if (rec.ColumnId == Meta.FirstPkColumn) { - Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd); - } -} - -std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const { - if (!Meta.ColumnMeta.contains(columnId)) { - return {}; - } - return Meta.ColumnMeta.find(columnId)->second.Min; -} - -std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { - if (!Meta.ColumnMeta.contains(columnId)) { - return {}; - } - return Meta.ColumnMeta.find(columnId)->second.Max; -} - -std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { - Y_VERIFY(!Blobs.empty()); - - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.reserve(Blobs.size()); - for (auto& blob : Blobs) { - batches.push_back(blob.BuildRecordBatch(*Loader)); - Y_VERIFY(batches.back()); - } - - auto res = arrow::Table::FromRecordBatches(batches); - Y_VERIFY_S(res.ok(), res.status().message()); - return (*res)->column(0); -} - -std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { - std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; - std::vector< std::shared_ptr<arrow::Field>> fields; - for (auto&& i : Columns) { - if (!options.IsAcceptedColumn(i.GetColumnId())) { - continue; - } - columns.emplace_back(i.Assemble()); - fields.emplace_back(i.GetField()); - } - - auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns); - auto res = table->CombineChunks(); - Y_VERIFY(res.ok()); - return NArrow::ToBatch(*res); -} - } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 12abed9b25..673e4f6c0b 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -1,520 +1,7 @@ #pragma once -#include "defs.h" -#include "columns_table.h" -#include "index_info.h" - -#include <ydb/core/formats/arrow/replace_key.h> -#include <ydb/core/formats/arrow/serializer/abstract.h> -#include <ydb/core/formats/arrow/dictionary/conversion.h> -#include <ydb/core/tx/columnshard/counters/indexation.h> -#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> +#include "portions/portion_info.h" namespace NKikimr::NOlap { -struct TPortionMeta { - // NOTE: These values are persisted in LocalDB so they must be stable - enum EProduced : ui32 { - UNSPECIFIED = 0, - INSERTED = 1, - COMPACTED = 2, - SPLIT_COMPACTED = 3, - INACTIVE = 4, - EVICTED = 5, - }; - - struct TColumnMeta { - ui32 NumRows{0}; - ui32 RawBytes{0}; - std::shared_ptr<arrow::Scalar> Min; - std::shared_ptr<arrow::Scalar> Max; - - bool HasMinMax() const noexcept { - return Min.get() && Max.get(); - } - }; - - EProduced Produced{UNSPECIFIED}; - THashMap<ui32, TColumnMeta> ColumnMeta; - ui32 FirstPkColumn = 0; - std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows - std::optional<NArrow::TReplaceKey> IndexKeyStart; - std::optional<NArrow::TReplaceKey> IndexKeyEnd; - - bool HasMinMax(ui32 columnId) const { - if (!ColumnMeta.contains(columnId)) { - return false; - } - return ColumnMeta.find(columnId)->second.HasMinMax(); - } - - bool HasPkMinMax() const { - return HasMinMax(FirstPkColumn); - } - - ui32 NumRows() const { - if (FirstPkColumn) { - Y_VERIFY(ColumnMeta.contains(FirstPkColumn)); - return ColumnMeta.find(FirstPkColumn)->second.NumRows; - } - return 0; - } - - friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) { - out << "reason" << (ui32)info.Produced; - for (const auto& [_, meta] : info.ColumnMeta) { - if (meta.NumRows) { - out << " " << meta.NumRows << " rows"; - break; - } - } - return out; - } -}; - -class TPortionAddress { -private: - YDB_READONLY(ui64, GranuleId, 0); - YDB_READONLY(ui64, PortionId, 0); -public: - TPortionAddress(const ui64 granuleId, const ui64 portionId) - : GranuleId(granuleId) - , PortionId(portionId) - { - - } - - bool operator<(const TPortionAddress& item) const { - return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId); - } - - bool operator==(const TPortionAddress& item) const { - return std::tie(GranuleId, PortionId) == std::tie(item.GranuleId, item.PortionId); - } -}; - -struct TPortionInfo { - static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; - - std::vector<TColumnRecord> Records; - TPortionMeta Meta; - TString TierName; - - bool Empty() const { return Records.empty(); } - bool Produced() const { return Meta.Produced != TPortionMeta::UNSPECIFIED; } - bool Valid() const { return !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } - bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; } - bool IsEvicted() const { return Meta.Produced == TPortionMeta::EVICTED; } - bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ } - bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); } - size_t NumRecords() const { return Records.size(); } - - bool CheckForCleanup(const TSnapshot& snapshot) const { - if (!CheckForCleanup()) { - return false; - } - - return GetXSnapshot() < snapshot; - } - - bool CheckForCleanup() const { - return !IsActive(); - } - - bool AllowEarlyFilter() const { - return Meta.Produced == TPortionMeta::COMPACTED - || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; - } - - bool EvictReady(size_t hotSize) const { - return Meta.Produced == TPortionMeta::COMPACTED - || Meta.Produced == TPortionMeta::SPLIT_COMPACTED - || Meta.Produced == TPortionMeta::EVICTED - || (Meta.Produced == TPortionMeta::INSERTED && BlobsSizes().first >= hotSize); - } - - ui64 Portion() const { - Y_VERIFY(!Empty()); - auto& rec = Records[0]; - return rec.Portion; - } - - ui64 Granule() const { - Y_VERIFY(!Empty()); - auto& rec = Records[0]; - return rec.Granule; - } - - TPortionAddress GetAddress() const { - Y_VERIFY(!Empty()); - auto& rec = Records[0]; - return TPortionAddress(rec.Granule, rec.Portion); - } - - void SetGranule(ui64 granule) { - for (auto& rec : Records) { - rec.Granule = granule; - } - } - - TSnapshot GetSnapshot() const { - Y_VERIFY(!Empty()); - auto& rec = Records[0]; - return TSnapshot(rec.PlanStep, rec.TxId); - } - - TSnapshot GetXSnapshot() const { - Y_VERIFY(!Empty()); - auto& rec = Records[0]; - return TSnapshot(rec.XPlanStep, rec.XTxId); - } - - bool IsActive() const { - return GetXSnapshot().IsZero(); - } - - std::pair<ui32, ui32> BlobsSizes() const { - ui32 sum = 0; - ui32 max = 0; - for (const auto& rec : Records) { - sum += rec.BlobRange.Size; - max = Max(max, rec.BlobRange.Size); - } - return {sum, max}; - } - - ui64 BlobsBytes() const noexcept { - ui64 sum = 0; - for (const auto& rec : Records) { - sum += rec.BlobRange.Size; - } - return sum; - } - - void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap) { - for (auto& rec : Records) { - rec.Portion = portion; - } - if (!granuleRemap.empty()) { - for (auto& rec : Records) { - Y_VERIFY(granuleRemap.contains(rec.Granule)); - rec.Granule = granuleRemap.find(rec.Granule)->second; - } - } - } - - void UpdateRecordsMeta(TPortionMeta::EProduced produced) { - Meta.Produced = produced; - for (auto& record : Records) { - record.Metadata = GetMetadata(record); - } - } - - void SetStale(const TSnapshot& snapshot) { - for (auto& rec : Records) { - rec.SetXSnapshot(snapshot); - } - } - - void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) { - Records.push_back(rec); - LoadMetadata(indexInfo, rec); - } - - TString GetMetadata(const TColumnRecord& rec) const; - void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec); - void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, - const TString& tierName); - void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); - - std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; - std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; - - const NArrow::TReplaceKey& IndexKeyStart() const { - Y_VERIFY(Meta.IndexKeyStart); - return *Meta.IndexKeyStart; - } - - const NArrow::TReplaceKey& IndexKeyEnd() const { - Y_VERIFY(Meta.IndexKeyEnd); - return *Meta.IndexKeyEnd; - } - - ui32 NumRows() const { - return Meta.NumRows(); - } - - ui64 GetRawBytes(const std::vector<ui32>& columnIds) const { - ui64 sum = 0; - const ui32 numRows = NumRows(); - for (auto&& i : columnIds) { - if (TIndexInfo::IsSpecialColumn(i)) { - sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i); - } else { - auto it = Meta.ColumnMeta.find(i); - if (it != Meta.ColumnMeta.end()) { - sum += it->second.RawBytes; - } - } - } - return sum; - } - - ui64 RawBytesSum() const { - ui64 sum = 0; - for (auto& [columnId, colMeta] : Meta.ColumnMeta) { - sum += colMeta.RawBytes; - } - return sum; - } - -private: - class TMinGetter { - public: - static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) { - return portionInfo.MinValue(columnId); - } - }; - - class TMaxGetter { - public: - static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) { - return portionInfo.MaxValue(columnId); - } - }; - - template <class TSelfGetter, class TItemGetter = TSelfGetter> - int CompareByColumnIdsImpl(const TPortionInfo& item, const std::vector<ui32>& columnIds) const { - for (auto&& i : columnIds) { - std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i); - std::shared_ptr<arrow::Scalar> valueItem = TItemGetter::Get(item, i); - if (!!valueSelf && !!valueItem) { - const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem); - if (cmpResult) { - return cmpResult; - } - } else if (!!valueSelf) { - return 1; - } else if (!!valueItem) { - return -1; - } - } - return 0; - } -public: - int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { - return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns); - } - - int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { - return CompareMinByColumnIds(item, info.KeyColumns); - } - - int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const { - return CompareByColumnIdsImpl<TMinGetter>(item, columnIds); - } - - class TAssembleBlobInfo { - private: - ui32 NullRowsCount = 0; - TString Data; - public: - TAssembleBlobInfo(const ui32 rowsCount) - : NullRowsCount(rowsCount) { - - } - - TAssembleBlobInfo(const TString& data) - : Data(data) { - - } - - ui32 GetNullRowsCount() const noexcept { - return NullRowsCount; - } - - const TString& GetData() const noexcept { - return Data; - } - - std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const { - if (NullRowsCount) { - Y_VERIFY(!Data); - return NArrow::MakeEmptyBatch(loader.GetExpectedSchema(), NullRowsCount); - } else { - auto result = loader.Apply(Data); - if (!result.ok()) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot unpack batch")("error", result.status().ToString())("loader", loader.DebugString()); - return nullptr; - } - return *result; - } - } - }; - - class TPreparedColumn { - private: - std::shared_ptr<TColumnLoader> Loader; - std::vector<TAssembleBlobInfo> Blobs; - public: - ui32 GetColumnId() const { - return Loader->GetColumnId(); - } - - const std::string& GetName() const { - return Loader->GetExpectedSchema()->field(0)->name(); - } - - std::shared_ptr<arrow::Field> GetField() const { - return Loader->GetExpectedSchema()->field(0); - } - - TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader) - : Loader(loader) - , Blobs(std::move(blobs)) - { - Y_VERIFY(Loader); - Y_VERIFY(Loader->GetExpectedSchema()->num_fields() == 1); - } - - std::shared_ptr<arrow::ChunkedArray> Assemble() const; - }; - - class TPreparedBatchData { - private: - std::vector<TPreparedColumn> Columns; - std::shared_ptr<arrow::Schema> Schema; - size_t RowsCount = 0; - - public: - struct TAssembleOptions { - std::optional<std::set<ui32>> IncludedColumnIds; - std::optional<std::set<ui32>> ExcludedColumnIds; - - bool IsAcceptedColumn(const ui32 columnId) const { - if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) { - return false; - } - if (ExcludedColumnIds && ExcludedColumnIds->contains(columnId)) { - return false; - } - return true; - } - }; - - std::vector<std::string> GetSchemaColumnNames() const { - return Schema->field_names(); - } - - size_t GetColumnsCount() const { - return Columns.size(); - } - - size_t GetRowsCount() const { - return RowsCount; - } - - TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount) - : Columns(std::move(columns)) - , Schema(schema) - , RowsCount(rowsCount) - { - } - - std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const; - }; - - template <class TExternalBlobInfo> - TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, - const THashMap<TBlobRange, TExternalBlobInfo>& blobsData) const { - std::vector<TPreparedColumn> columns; - columns.reserve(resultSchema.GetSchema()->num_fields()); - - Y_VERIFY(!Meta.ColumnMeta.empty()); - const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows; - for (auto&& field : resultSchema.GetSchema()->fields()) { - columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name()))); - } - - TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks - TMap<size_t, size_t> positionsMap; - - for (auto& rec : Records) { - auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId); - if (resulPos < 0) { - continue; - } - auto pos = dataSchema.GetFieldIndex(rec.ColumnId); - Y_ASSERT(pos >= 0); - positionsMap[resulPos] = pos; - columnChunks[resulPos][rec.Chunk] = rec.BlobRange; - auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId); - if (columnMeta) { - Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows); - } - } - - // Make chunked arrays for columns - for (auto& [pos, orderedChunks] : columnChunks) { - Y_VERIFY(positionsMap.contains(pos)); - size_t dataPos = positionsMap[pos]; - auto portionField = dataSchema.GetFieldByIndex(dataPos); - auto resultField = resultSchema.GetFieldByIndex(pos); - - Y_VERIFY(portionField->IsCompatibleWith(*resultField)); - - std::vector<TAssembleBlobInfo> blobs; - blobs.reserve(orderedChunks.size()); - ui32 expected = 0; - for (auto& [chunk, blobRange] : orderedChunks) { - Y_VERIFY(chunk == expected); - ++expected; - - auto it = blobsData.find(blobRange); - Y_VERIFY(it != blobsData.end()); - blobs.emplace_back(it->second); - } - - Y_VERIFY(pos < columns.size()); - columns[pos] = TPreparedColumn(std::move(blobs), dataSchema.GetColumnLoader(resultField->name())); - } - - return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount); - } - - std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema, - const ISnapshotSchema& resultSchema, - const THashMap<TBlobRange, TString>& data) const { - auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble(); - Y_VERIFY(batch->Validate().ok()); - return batch; - } - - static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - const TColumnSaver saver); - - void AppendOneChunkColumn(TColumnRecord&& record); - - friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { - for (auto& rec : info.Records) { - out << " " << rec; - out << " (1 of " << info.Records.size() << " blobs shown)"; - break; - } - out << ";activity=" << info.IsActive() << ";"; - if (!info.TierName.empty()) { - out << " tier: " << info.TierName; - } - out << " " << info.Meta; - return out; - } -}; - -/// Ensure that TPortionInfo can be effectively assigned by moving the value. -static_assert(std::is_nothrow_move_assignable<TPortionInfo>::value); - -/// Ensure that TPortionInfo can be effectively constructed by moving the value. -static_assert(std::is_nothrow_move_constructible<TPortionInfo>::value); - } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..914aa5278b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(columnshard-engines-portions) +target_link_libraries(columnshard-engines-portions PUBLIC + contrib-libs-cxxsupp + yutil + columnshard-engines-scheme + tools-enum_parser-enum_serialization_runtime +) +target_sources(columnshard-engines-portions PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp +) +generate_enum_serilization(columnshard-engines-portions + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/engines/portions/portion_info.h +) diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..86efcb228b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt @@ -0,0 +1,32 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(columnshard-engines-portions) +target_link_libraries(columnshard-engines-portions PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + columnshard-engines-scheme + tools-enum_parser-enum_serialization_runtime +) +target_sources(columnshard-engines-portions PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp +) +generate_enum_serilization(columnshard-engines-portions + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/engines/portions/portion_info.h +) diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..86efcb228b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt @@ -0,0 +1,32 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(columnshard-engines-portions) +target_link_libraries(columnshard-engines-portions PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + columnshard-engines-scheme + tools-enum_parser-enum_serialization_runtime +) +target_sources(columnshard-engines-portions PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp +) +generate_enum_serilization(columnshard-engines-portions + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/engines/portions/portion_info.h +) diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..914aa5278b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(columnshard-engines-portions) +target_link_libraries(columnshard-engines-portions PUBLIC + contrib-libs-cxxsupp + yutil + columnshard-engines-scheme + tools-enum_parser-enum_serialization_runtime +) +target_sources(columnshard-engines-portions PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp +) +generate_enum_serilization(columnshard-engines-portions + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/engines/portions/portion_info.h +) diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp new file mode 100644 index 0000000000..621928056a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp @@ -0,0 +1,5 @@ +#include "column_record.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h new file mode 100644 index 0000000000..cc66e266f0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -0,0 +1,91 @@ +#pragma once + +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> + +namespace NKikimr::NOlap { + +struct TColumnRecord { + ui64 Granule; + ui64 PlanStep; // {PlanStep, TxId} is min snapshot for {Granule, Portion} + ui64 TxId; + ui64 Portion; // Id of independent (overlayed by PK) portion of data in granule + ui64 XPlanStep{0}; // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one) + ui64 XTxId{0}; + ui32 ColumnId{0}; + ui16 Chunk; // Number of blob for column ColumnName in Portion + TBlobRange BlobRange; + TString Metadata; + + std::optional<ui32> GetChunkRowsCount() const { + return {}; + } + + bool operator == (const TColumnRecord& rec) const { + return (Granule == rec.Granule) && (ColumnId == rec.ColumnId) && + (PlanStep == rec.PlanStep) && (TxId == rec.TxId) && (Portion == rec.Portion) && (Chunk == rec.Chunk); + } + + TString SerializedBlobId() const { + return BlobRange.BlobId.SerializeBinary(); + } + + bool Valid() const { + return ValidExceptSnapshot() && ValidSnapshot(); + } + + bool ValidSnapshot() const { + return PlanStep && TxId; + } + + bool ValidExceptSnapshot() const { + return Granule && ColumnId && Portion && ValidBlob(); + } + + bool ValidBlob() const { + return BlobRange.BlobId.IsValid() && BlobRange.Size; + } + + void SetSnapshot(const TSnapshot& snap) { + Y_VERIFY(snap.Valid()); + PlanStep = snap.GetPlanStep(); + TxId = snap.GetTxId(); + } + + void SetXSnapshot(const TSnapshot& snap) { + Y_VERIFY(snap.Valid()); + XPlanStep = snap.GetPlanStep(); + XTxId = snap.GetTxId(); + } + + static TColumnRecord Make(ui64 granule, ui32 columnId, const TSnapshot& minSnapshot, ui64 portion, ui16 chunk = 0) { + TColumnRecord row; + row.Granule = granule; + row.PlanStep = minSnapshot.GetPlanStep(); + row.TxId = minSnapshot.GetTxId(); + row.Portion = portion; + row.ColumnId = columnId; + row.Chunk = chunk; + //row.BlobId + //row.Metadata + return row; + } + + friend IOutputStream& operator << (IOutputStream& out, const TColumnRecord& rec) { + out << '{'; + out << 'g' << rec.Granule << 'p' << rec.Portion; + if (rec.Chunk) { + out << 'n' << rec.Chunk; + } + out << ',' << (i32)rec.ColumnId; + out << ',' << rec.PlanStep << ':' << (rec.TxId == Max<ui64>() ? "max" : ToString(rec.TxId)); + if (rec.XPlanStep) { + out << '-' << rec.XPlanStep << ':' << (rec.XTxId == Max<ui64>() ? "max" : ToString(rec.XTxId)); + } + out << ',' << rec.BlobRange.ToString(); + out << '}'; + return out; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp new file mode 100644 index 0000000000..b20a716f9e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -0,0 +1,262 @@ +#include "portion_info.h" +#include <ydb/core/protos/tx_columnshard.pb.h> +#include <ydb/core/formats/arrow/arrow_filter.h> + +namespace NKikimr::NOlap { + +TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array, + const std::shared_ptr<arrow::Field>& field, + const TColumnSaver saver) { + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field }); + auto batch = arrow::RecordBatch::Make(schema, array->length(), { array }); + Y_VERIFY(batch); + + return saver.Apply(batch); +} + +void TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) { + record.Chunk = 0; + Records.emplace_back(std::move(record)); +} + +void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) { + Y_VERIFY(column->length()); + + std::pair<int, int> minMaxPos = {0, (column->length() - 1)}; + if (!sorted) { + minMaxPos = NArrow::FindMinMaxPosition(column); + } + + Y_VERIFY(minMaxPos.first >= 0); + Y_VERIFY(minMaxPos.second >= 0); + + Meta.ColumnMeta[columnId].Min = NArrow::GetScalar(column, minMaxPos.first); + Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second); +} + +void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, + const TString& tierName) { + const auto& indexInfo = snapshotSchema.GetIndexInfo(); + const auto& minMaxColumns = indexInfo.GetMinMaxIdxColumns(); + + TierName = tierName; + Meta = {}; + Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); + + // Copy first and last key rows into new batch to free source batch's memory + { + auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); + std::vector<bool> bits(batch->num_rows(), false); + bits[0] = true; + bits[batch->num_rows() - 1] = true; // it colud be 0 if batch has one row + + auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows()); + auto res = arrow::compute::Filter(keyBatch, filter); + Y_VERIFY(res.ok()); + + Meta.ReplaceKeyEdges = res->record_batch(); + Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2); + } + + auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey()); + Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); + Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); + + /// @note It does not add RawBytes info for snapshot columns, only for user ones. + for (auto& [columnId, col] : indexInfo.Columns) { + const auto& columnName = col.Name; + auto column = batch->GetColumnByName(col.Name); + Y_VERIFY(column); + Meta.ColumnMeta[columnId].NumRows = column->length(); + Meta.ColumnMeta[columnId].RawBytes = NArrow::GetArrayDataSize(column); + + if (minMaxColumns.contains(columnId)) { + auto column = batch->GetColumnByName(columnName); + Y_VERIFY(column); + + bool isSorted = (columnId == Meta.FirstPkColumn); + AddMinMax(columnId, column, isSorted); + Y_VERIFY(Meta.HasMinMax(columnId)); + } + } +} + +TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const { + NKikimrTxColumnShard::TIndexColumnMeta meta; // TODO: move proto serialization out of engines folder + if (Meta.ColumnMeta.contains(rec.ColumnId)) { + const auto& columnMeta = Meta.ColumnMeta.find(rec.ColumnId)->second; + if (auto numRows = columnMeta.NumRows) { + meta.SetNumRows(numRows); + } + if (auto rawBytes = columnMeta.RawBytes) { + meta.SetRawBytes(rawBytes); + } + if (columnMeta.HasMinMax()) { + ScalarToConstant(*columnMeta.Min, *meta.MutableMinValue()); + ScalarToConstant(*columnMeta.Max, *meta.MutableMaxValue()); + } + } + + if (rec.ColumnId == Meta.FirstPkColumn) { + auto* portionMeta = meta.MutablePortionMeta(); + + switch (Meta.Produced) { + case TPortionMeta::UNSPECIFIED: + Y_VERIFY(false); + case TPortionMeta::INSERTED: + portionMeta->SetIsInserted(true); + break; + case TPortionMeta::COMPACTED: + portionMeta->SetIsCompacted(true); + break; + case TPortionMeta::SPLIT_COMPACTED: + portionMeta->SetIsSplitCompacted(true); + break; + case TPortionMeta::EVICTED: + portionMeta->SetIsEvicted(true); + break; + case TPortionMeta::INACTIVE: + Y_FAIL("Unexpected inactive case"); + //portionMeta->SetInactive(true); + break; + } + + if (!TierName.empty()) { + portionMeta->SetTierName(TierName); + } + + if (const auto& keyEdgesBatch = Meta.ReplaceKeyEdges) { + Y_VERIFY(keyEdgesBatch); + Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok()); + Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2); + portionMeta->SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch)); + } + } + + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out); + return out; +} + +void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) { + if (rec.Metadata.empty()) { + return; + } + + NKikimrTxColumnShard::TIndexColumnMeta meta; + bool ok = meta.ParseFromString(rec.Metadata); + Y_VERIFY(ok); + + Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); + auto field = indexInfo.ArrowColumnField(rec.ColumnId); + const bool compositeIndexKey = indexInfo.IsCompositeIndexKey(); + + if (meta.HasPortionMeta()) { + Y_VERIFY_DEBUG(rec.ColumnId == Meta.FirstPkColumn); + + auto& portionMeta = meta.GetPortionMeta(); + TierName = portionMeta.GetTierName(); + + if (portionMeta.GetIsInserted()) { + Meta.Produced = TPortionMeta::INSERTED; + } else if (portionMeta.GetIsCompacted()) { + Meta.Produced = TPortionMeta::COMPACTED; + } else if (portionMeta.GetIsSplitCompacted()) { + Meta.Produced = TPortionMeta::SPLIT_COMPACTED; + } else if (portionMeta.GetIsEvicted()) { + Meta.Produced = TPortionMeta::EVICTED; + } + + if (portionMeta.HasPrimaryKeyBorders()) { + Meta.ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); + Y_VERIFY(Meta.ReplaceKeyEdges); + Y_VERIFY_DEBUG(Meta.ReplaceKeyEdges->ValidateFull().ok()); + Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2); + + if (compositeIndexKey) { + auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey()); + Y_VERIFY(edgesBatch); + Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); + Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); + } + } + } + if (meta.HasNumRows()) { + Meta.ColumnMeta[rec.ColumnId].NumRows = meta.GetNumRows(); + } + if (meta.HasRawBytes()) { + Meta.ColumnMeta[rec.ColumnId].RawBytes = meta.GetRawBytes(); + } + if (meta.HasMinValue()) { + auto scalar = ConstantToScalar(meta.GetMinValue(), field->type()); + Meta.ColumnMeta[rec.ColumnId].Min = scalar; + + // Restore Meta.IndexKeyStart for one column IndexKey + if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) { + Meta.IndexKeyStart = NArrow::TReplaceKey::FromScalar(scalar); + } + } + if (meta.HasMaxValue()) { + auto scalar = ConstantToScalar(meta.GetMaxValue(), field->type()); + Meta.ColumnMeta[rec.ColumnId].Max = scalar; + + // Restore Meta.IndexKeyEnd for one column IndexKey + if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) { + Meta.IndexKeyEnd = NArrow::TReplaceKey::FromScalar(scalar); + } + } + + // Portion genarated without PrimaryKeyBorders and loaded with indexInfo.IsCompositeIndexKey() + // We should have no such portions for ForceColumnTablesCompositeMarks feature + if (rec.ColumnId == Meta.FirstPkColumn) { + Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd); + } +} + +std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const { + if (!Meta.ColumnMeta.contains(columnId)) { + return {}; + } + return Meta.ColumnMeta.find(columnId)->second.Min; +} + +std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { + if (!Meta.ColumnMeta.contains(columnId)) { + return {}; + } + return Meta.ColumnMeta.find(columnId)->second.Max; +} + +std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { + Y_VERIFY(!Blobs.empty()); + + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + batches.reserve(Blobs.size()); + for (auto& blob : Blobs) { + batches.push_back(blob.BuildRecordBatch(*Loader)); + Y_VERIFY(batches.back()); + } + + auto res = arrow::Table::FromRecordBatches(batches); + Y_VERIFY_S(res.ok(), res.status().message()); + return (*res)->column(0); +} + +std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { + std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; + std::vector< std::shared_ptr<arrow::Field>> fields; + for (auto&& i : Columns) { + if (!options.IsAcceptedColumn(i.GetColumnId())) { + continue; + } + columns.emplace_back(i.Assemble()); + fields.emplace_back(i.GetField()); + } + + auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns); + auto res = table->CombineChunks(); + Y_VERIFY(res.ok()); + return NArrow::ToBatch(*res); +} + +} diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h new file mode 100644 index 0000000000..64f9cb643b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -0,0 +1,519 @@ +#pragma once +#include "column_record.h" + +#include <ydb/core/formats/arrow/replace_key.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/formats/arrow/dictionary/conversion.h> +#include <ydb/core/tx/columnshard/counters/indexation.h> +#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/library/yverify_stream/yverify_stream.h> + +namespace NKikimr::NOlap { + +struct TPortionMeta { + // NOTE: These values are persisted in LocalDB so they must be stable + enum EProduced : ui32 { + UNSPECIFIED = 0, + INSERTED = 1, + COMPACTED = 2, + SPLIT_COMPACTED = 3, + INACTIVE = 4, + EVICTED = 5, + }; + + struct TColumnMeta { + ui32 NumRows{0}; + ui32 RawBytes{0}; + std::shared_ptr<arrow::Scalar> Min; + std::shared_ptr<arrow::Scalar> Max; + + bool HasMinMax() const noexcept { + return Min.get() && Max.get(); + } + }; + + EProduced Produced{UNSPECIFIED}; + THashMap<ui32, TColumnMeta> ColumnMeta; + ui32 FirstPkColumn = 0; + std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows + std::optional<NArrow::TReplaceKey> IndexKeyStart; + std::optional<NArrow::TReplaceKey> IndexKeyEnd; + + bool HasMinMax(ui32 columnId) const { + if (!ColumnMeta.contains(columnId)) { + return false; + } + return ColumnMeta.find(columnId)->second.HasMinMax(); + } + + bool HasPkMinMax() const { + return HasMinMax(FirstPkColumn); + } + + ui32 NumRows() const { + if (FirstPkColumn) { + Y_VERIFY(ColumnMeta.contains(FirstPkColumn)); + return ColumnMeta.find(FirstPkColumn)->second.NumRows; + } + return 0; + } + + friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) { + out << "reason" << (ui32)info.Produced; + for (const auto& [_, meta] : info.ColumnMeta) { + if (meta.NumRows) { + out << " " << meta.NumRows << " rows"; + break; + } + } + return out; + } +}; + +class TPortionAddress { +private: + YDB_READONLY(ui64, GranuleId, 0); + YDB_READONLY(ui64, PortionId, 0); +public: + TPortionAddress(const ui64 granuleId, const ui64 portionId) + : GranuleId(granuleId) + , PortionId(portionId) + { + + } + + bool operator<(const TPortionAddress& item) const { + return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId); + } + + bool operator==(const TPortionAddress& item) const { + return std::tie(GranuleId, PortionId) == std::tie(item.GranuleId, item.PortionId); + } +}; + +struct TPortionInfo { + static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; + + std::vector<TColumnRecord> Records; + TPortionMeta Meta; + TString TierName; + + bool Empty() const { return Records.empty(); } + bool Produced() const { return Meta.Produced != TPortionMeta::UNSPECIFIED; } + bool Valid() const { return !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } + bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; } + bool IsEvicted() const { return Meta.Produced == TPortionMeta::EVICTED; } + bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ } + bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); } + size_t NumRecords() const { return Records.size(); } + + bool CheckForCleanup(const TSnapshot& snapshot) const { + if (!CheckForCleanup()) { + return false; + } + + return GetXSnapshot() < snapshot; + } + + bool CheckForCleanup() const { + return !IsActive(); + } + + bool AllowEarlyFilter() const { + return Meta.Produced == TPortionMeta::COMPACTED + || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; + } + + bool EvictReady(size_t hotSize) const { + return Meta.Produced == TPortionMeta::COMPACTED + || Meta.Produced == TPortionMeta::SPLIT_COMPACTED + || Meta.Produced == TPortionMeta::EVICTED + || (Meta.Produced == TPortionMeta::INSERTED && BlobsSizes().first >= hotSize); + } + + ui64 Portion() const { + Y_VERIFY(!Empty()); + auto& rec = Records[0]; + return rec.Portion; + } + + ui64 Granule() const { + Y_VERIFY(!Empty()); + auto& rec = Records[0]; + return rec.Granule; + } + + TPortionAddress GetAddress() const { + Y_VERIFY(!Empty()); + auto& rec = Records[0]; + return TPortionAddress(rec.Granule, rec.Portion); + } + + void SetGranule(ui64 granule) { + for (auto& rec : Records) { + rec.Granule = granule; + } + } + + TSnapshot GetSnapshot() const { + Y_VERIFY(!Empty()); + auto& rec = Records[0]; + return TSnapshot(rec.PlanStep, rec.TxId); + } + + TSnapshot GetXSnapshot() const { + Y_VERIFY(!Empty()); + auto& rec = Records[0]; + return TSnapshot(rec.XPlanStep, rec.XTxId); + } + + bool IsActive() const { + return GetXSnapshot().IsZero(); + } + + std::pair<ui32, ui32> BlobsSizes() const { + ui32 sum = 0; + ui32 max = 0; + for (const auto& rec : Records) { + sum += rec.BlobRange.Size; + max = Max(max, rec.BlobRange.Size); + } + return {sum, max}; + } + + ui64 BlobsBytes() const noexcept { + ui64 sum = 0; + for (const auto& rec : Records) { + sum += rec.BlobRange.Size; + } + return sum; + } + + void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap) { + for (auto& rec : Records) { + rec.Portion = portion; + } + if (!granuleRemap.empty()) { + for (auto& rec : Records) { + Y_VERIFY(granuleRemap.contains(rec.Granule)); + rec.Granule = granuleRemap.find(rec.Granule)->second; + } + } + } + + void UpdateRecordsMeta(TPortionMeta::EProduced produced) { + Meta.Produced = produced; + for (auto& record : Records) { + record.Metadata = GetMetadata(record); + } + } + + void SetStale(const TSnapshot& snapshot) { + for (auto& rec : Records) { + rec.SetXSnapshot(snapshot); + } + } + + void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) { + Records.push_back(rec); + LoadMetadata(indexInfo, rec); + } + + TString GetMetadata(const TColumnRecord& rec) const; + void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec); + void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, + const TString& tierName); + void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); + + std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; + std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; + + const NArrow::TReplaceKey& IndexKeyStart() const { + Y_VERIFY(Meta.IndexKeyStart); + return *Meta.IndexKeyStart; + } + + const NArrow::TReplaceKey& IndexKeyEnd() const { + Y_VERIFY(Meta.IndexKeyEnd); + return *Meta.IndexKeyEnd; + } + + ui32 NumRows() const { + return Meta.NumRows(); + } + + ui64 GetRawBytes(const std::vector<ui32>& columnIds) const { + ui64 sum = 0; + const ui32 numRows = NumRows(); + for (auto&& i : columnIds) { + if (TIndexInfo::IsSpecialColumn(i)) { + sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i); + } else { + auto it = Meta.ColumnMeta.find(i); + if (it != Meta.ColumnMeta.end()) { + sum += it->second.RawBytes; + } + } + } + return sum; + } + + ui64 RawBytesSum() const { + ui64 sum = 0; + for (auto& [columnId, colMeta] : Meta.ColumnMeta) { + sum += colMeta.RawBytes; + } + return sum; + } + +private: + class TMinGetter { + public: + static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) { + return portionInfo.MinValue(columnId); + } + }; + + class TMaxGetter { + public: + static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) { + return portionInfo.MaxValue(columnId); + } + }; + + template <class TSelfGetter, class TItemGetter = TSelfGetter> + int CompareByColumnIdsImpl(const TPortionInfo& item, const std::vector<ui32>& columnIds) const { + for (auto&& i : columnIds) { + std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i); + std::shared_ptr<arrow::Scalar> valueItem = TItemGetter::Get(item, i); + if (!!valueSelf && !!valueItem) { + const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem); + if (cmpResult) { + return cmpResult; + } + } else if (!!valueSelf) { + return 1; + } else if (!!valueItem) { + return -1; + } + } + return 0; + } +public: + int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { + return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns); + } + + int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { + return CompareMinByColumnIds(item, info.KeyColumns); + } + + int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const { + return CompareByColumnIdsImpl<TMinGetter>(item, columnIds); + } + + class TAssembleBlobInfo { + private: + ui32 NullRowsCount = 0; + TString Data; + public: + TAssembleBlobInfo(const ui32 rowsCount) + : NullRowsCount(rowsCount) { + + } + + TAssembleBlobInfo(const TString& data) + : Data(data) { + + } + + ui32 GetNullRowsCount() const noexcept { + return NullRowsCount; + } + + const TString& GetData() const noexcept { + return Data; + } + + std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const { + if (NullRowsCount) { + Y_VERIFY(!Data); + return NArrow::MakeEmptyBatch(loader.GetExpectedSchema(), NullRowsCount); + } else { + auto result = loader.Apply(Data); + if (!result.ok()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot unpack batch")("error", result.status().ToString())("loader", loader.DebugString()); + return nullptr; + } + return *result; + } + } + }; + + class TPreparedColumn { + private: + std::shared_ptr<TColumnLoader> Loader; + std::vector<TAssembleBlobInfo> Blobs; + public: + ui32 GetColumnId() const { + return Loader->GetColumnId(); + } + + const std::string& GetName() const { + return Loader->GetExpectedSchema()->field(0)->name(); + } + + std::shared_ptr<arrow::Field> GetField() const { + return Loader->GetExpectedSchema()->field(0); + } + + TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader) + : Loader(loader) + , Blobs(std::move(blobs)) + { + Y_VERIFY(Loader); + Y_VERIFY(Loader->GetExpectedSchema()->num_fields() == 1); + } + + std::shared_ptr<arrow::ChunkedArray> Assemble() const; + }; + + class TPreparedBatchData { + private: + std::vector<TPreparedColumn> Columns; + std::shared_ptr<arrow::Schema> Schema; + size_t RowsCount = 0; + + public: + struct TAssembleOptions { + std::optional<std::set<ui32>> IncludedColumnIds; + std::optional<std::set<ui32>> ExcludedColumnIds; + + bool IsAcceptedColumn(const ui32 columnId) const { + if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) { + return false; + } + if (ExcludedColumnIds && ExcludedColumnIds->contains(columnId)) { + return false; + } + return true; + } + }; + + std::vector<std::string> GetSchemaColumnNames() const { + return Schema->field_names(); + } + + size_t GetColumnsCount() const { + return Columns.size(); + } + + size_t GetRowsCount() const { + return RowsCount; + } + + TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount) + : Columns(std::move(columns)) + , Schema(schema) + , RowsCount(rowsCount) + { + } + + std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const; + }; + + template <class TExternalBlobInfo> + TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, + const THashMap<TBlobRange, TExternalBlobInfo>& blobsData) const { + std::vector<TPreparedColumn> columns; + columns.reserve(resultSchema.GetSchema()->num_fields()); + + Y_VERIFY(!Meta.ColumnMeta.empty()); + const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows; + for (auto&& field : resultSchema.GetSchema()->fields()) { + columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name()))); + } + + TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks + TMap<size_t, size_t> positionsMap; + + for (auto& rec : Records) { + auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId); + if (resulPos < 0) { + continue; + } + auto pos = dataSchema.GetFieldIndex(rec.ColumnId); + Y_ASSERT(pos >= 0); + positionsMap[resulPos] = pos; + columnChunks[resulPos][rec.Chunk] = rec.BlobRange; + auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId); + if (columnMeta) { + Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows); + } + } + + // Make chunked arrays for columns + for (auto& [pos, orderedChunks] : columnChunks) { + Y_VERIFY(positionsMap.contains(pos)); + size_t dataPos = positionsMap[pos]; + auto portionField = dataSchema.GetFieldByIndex(dataPos); + auto resultField = resultSchema.GetFieldByIndex(pos); + + Y_VERIFY(portionField->IsCompatibleWith(*resultField)); + + std::vector<TAssembleBlobInfo> blobs; + blobs.reserve(orderedChunks.size()); + ui32 expected = 0; + for (auto& [chunk, blobRange] : orderedChunks) { + Y_VERIFY(chunk == expected); + ++expected; + + auto it = blobsData.find(blobRange); + Y_VERIFY(it != blobsData.end()); + blobs.emplace_back(it->second); + } + + Y_VERIFY(pos < columns.size()); + columns[pos] = TPreparedColumn(std::move(blobs), dataSchema.GetColumnLoader(resultField->name())); + } + + return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount); + } + + std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema, + const ISnapshotSchema& resultSchema, + const THashMap<TBlobRange, TString>& data) const { + auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble(); + Y_VERIFY(batch->Validate().ok()); + return batch; + } + + static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, + const std::shared_ptr<arrow::Field>& field, + const TColumnSaver saver); + + void AppendOneChunkColumn(TColumnRecord&& record); + + friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { + for (auto& rec : info.Records) { + out << " " << rec; + out << " (1 of " << info.Records.size() << " blobs shown)"; + break; + } + out << ";activity=" << info.IsActive() << ";"; + if (!info.TierName.empty()) { + out << " tier: " << info.TierName; + } + out << " " << info.Meta; + return out; + } +}; + +/// Ensure that TPortionInfo can be effectively assigned by moving the value. +static_assert(std::is_nothrow_move_assignable<TPortionInfo>::value); + +/// Ensure that TPortionInfo can be effectively constructed by moving the value. +static_assert(std::is_nothrow_move_constructible<TPortionInfo>::value); + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make new file mode 100644 index 0000000000..c1be81651a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + portion_info.cpp + column_record.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme +) + +GENERATE_ENUM_SERIALIZATION(portion_info.h) + +END() diff --git a/ydb/core/tx/columnshard/engines/scalars.h b/ydb/core/tx/columnshard/engines/scalars.h index a7fc346f68..ed70b1299a 100644 --- a/ydb/core/tx/columnshard/engines/scalars.h +++ b/ydb/core/tx/columnshard/engines/scalars.h @@ -1,17 +1,2 @@ #pragma once - -#include "defs.h" - -#include <ydb/core/protos/tx_columnshard.pb.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> - -namespace NKikimr::NOlap { - -void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value); -std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value, - const std::shared_ptr<arrow::DataType>& type); - -TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key); -std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type); - -} +#include <ydb/core/tx/columnshard/common/scalars.h> diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt index 3aeec490fa..ef6c197306 100644 --- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.darwin-x86_64.txt @@ -22,4 +22,7 @@ target_sources(columnshard-engines-scheme PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp ) diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt index 9568802bdb..f2b4f18273 100644 --- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-aarch64.txt @@ -23,4 +23,7 @@ target_sources(columnshard-engines-scheme PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp ) diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt index 9568802bdb..f2b4f18273 100644 --- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.linux-x86_64.txt @@ -23,4 +23,7 @@ target_sources(columnshard-engines-scheme PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp ) diff --git a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt index 3aeec490fa..ef6c197306 100644 --- a/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/scheme/CMakeLists.windows-x86_64.txt @@ -22,4 +22,7 @@ target_sources(columnshard-engines-scheme PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/index_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scheme/column_features.cpp ) diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.cpp b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp new file mode 100644 index 0000000000..dd6341ec7a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp @@ -0,0 +1,79 @@ +#include "column_features.h" +#include "index_info.h" +#include <ydb/core/formats/arrow/serializer/full.h> +#include <ydb/core/formats/arrow/serializer/batch_only.h> +#include <util/string/builder.h> + +namespace NKikimr::NOlap { + +NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetSaveTransformer() const { + NArrow::NTransformation::ITransformer::TPtr transformer; + if (DictionaryEncoding) { + transformer = DictionaryEncoding->BuildEncoder(); + } + return transformer; +} + +NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer() const { + NArrow::NTransformation::ITransformer::TPtr transformer; + if (DictionaryEncoding) { + transformer = DictionaryEncoding->BuildDecoder(); + } + return transformer; +} + +std::shared_ptr<NKikimr::NOlap::TColumnLoader> TColumnFeatures::GetLoader(const TIndexInfo& info) const { + if (!LoaderCache) { + NArrow::NTransformation::ITransformer::TPtr transformer = GetLoadTransformer(); + auto schema = info.GetColumnSchema(ColumnId); + if (!transformer) { + LoaderCache = std::make_shared<TColumnLoader>(transformer, + std::make_shared<NArrow::NSerialization::TBatchPayloadDeserializer>(schema), + schema, ColumnId); + } else { + LoaderCache = std::make_shared<TColumnLoader>(transformer, + std::make_shared<NArrow::NSerialization::TFullDataDeserializer>(), + schema, ColumnId); + } + } + return LoaderCache; +} + +std::optional<NKikimr::NOlap::TColumnFeatures> TColumnFeatures::BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId) { + TColumnFeatures result(columnId); + if (columnInfo.HasCompression()) { + auto settings = NArrow::TCompression::BuildFromProto(columnInfo.GetCompression()); + Y_VERIFY(settings.IsSuccess()); + result.Compression = *settings; + } + if (columnInfo.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding()); + Y_VERIFY(settings.IsSuccess()); + result.DictionaryEncoding = *settings; + } + return result; +} + +std::unique_ptr<arrow::util::Codec> TColumnFeatures::GetCompressionCodec() const { + if (Compression) { + return Compression->BuildArrowCodec(); + } else { + return nullptr; + } +} + +TString TColumnLoader::DebugString() const { + TStringBuilder result; + if (ExpectedSchema) { + result << "schema:" << ExpectedSchema->ToString() << ";"; + } + if (Transformer) { + result << "transformer:" << Transformer->DebugString() << ";"; + } + if (Deserializer) { + result << "deserializer:" << Deserializer->DebugString() << ";"; + } + return result; +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h new file mode 100644 index 0000000000..bb5e79428a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h @@ -0,0 +1,120 @@ +#pragma once +#include <ydb/core/formats/arrow/compression/object.h> +#include <ydb/core/formats/arrow/dictionary/object.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/formats/arrow/transformer/abstract.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> + +namespace NKikimr::NOlap { + +class TSaverContext { +private: + TString TierName; + std::optional<NArrow::TCompression> ExternalCompression; +public: + const std::optional<NArrow::TCompression>& GetExternalCompression() const { + return ExternalCompression; + } + TSaverContext& SetExternalCompression(const std::optional<NArrow::TCompression>& value) { + ExternalCompression = value; + return *this; + } + const TString& GetTierName() const { + return TierName; + } + TSaverContext& SetTierName(const TString& value) { + TierName = value; + return *this; + } +}; + +class TColumnSaver { +private: + NArrow::NTransformation::ITransformer::TPtr Transformer; + NArrow::NSerialization::ISerializer::TPtr Serializer; +public: + TColumnSaver() = default; + TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer) + : Transformer(transformer) + , Serializer(serializer) { + Y_VERIFY(Serializer); + } + + TString Apply(const std::shared_ptr<arrow::RecordBatch>& data) const { + Y_VERIFY(Serializer); + if (Transformer) { + return Serializer->Serialize(Transformer->Transform(data)); + } else { + return Serializer->Serialize(data); + } + } +}; + +class TColumnLoader { +private: + NArrow::NTransformation::ITransformer::TPtr Transformer; + NArrow::NSerialization::IDeserializer::TPtr Deserializer; + std::shared_ptr<arrow::Schema> ExpectedSchema; + const ui32 ColumnId; +public: + TString DebugString() const; + + TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer, + const std::shared_ptr<arrow::Schema>& expectedSchema, const ui32 columnId) + : Transformer(transformer) + , Deserializer(deserializer) + , ExpectedSchema(expectedSchema) + , ColumnId(columnId) + { + Y_VERIFY(ExpectedSchema); + Y_VERIFY(Deserializer); + } + + ui32 GetColumnId() const { + return ColumnId; + } + + std::shared_ptr<arrow::Schema> GetExpectedSchema() const { + return ExpectedSchema; + } + + arrow::Result<std::shared_ptr<arrow::RecordBatch>> Apply(const TString& data) const { + Y_VERIFY(Deserializer); + arrow::Result<std::shared_ptr<arrow::RecordBatch>> columnArray = Deserializer->Deserialize(data); + if (!columnArray.ok()) { + return columnArray; + } + if (Transformer) { + return Transformer->Transform(*columnArray); + } else { + return columnArray; + } + } +}; + +struct TIndexInfo; + +class TColumnFeatures { +private: + const ui32 ColumnId; + std::optional<NArrow::TCompression> Compression; + std::optional<NArrow::NDictionary::TEncodingSettings> DictionaryEncoding; + mutable std::shared_ptr<TColumnLoader> LoaderCache; +public: + TColumnFeatures(const ui32 columnId) + : ColumnId(columnId) + { + + } + static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const ui32 columnId); + + NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const; + NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const; + + std::unique_ptr<arrow::util::Codec> GetCompressionCodec() const; + + std::shared_ptr<TColumnLoader> GetLoader(const TIndexInfo& info) const; + +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp new file mode 100644 index 0000000000..b87054b82c --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -0,0 +1,429 @@ +#include "index_info.h" + +#include <ydb/core/formats/arrow/arrow_batch_builder.h> +#include <ydb/core/formats/arrow/sort_cursor.h> +#include <ydb/core/sys_view/common/schema.h> +#include <ydb/core/formats/arrow/serializer/batch_only.h> +#include <ydb/core/formats/arrow/transformer/dictionary.h> +#include <ydb/core/formats/arrow/serializer/full.h> +#include <ydb/core/base/appdata.h> + +namespace NKikimr::NOlap { + +const TString TIndexInfo::STORE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::StorePrimaryIndexStatsName; +const TString TIndexInfo::TABLE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::TablePrimaryIndexStatsName; + +static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns) { + std::vector<TString> out; + out.reserve(columns.size()); + for (const auto& [name, _] : columns) { + out.push_back(name); + } + return out; +} + +TIndexInfo::TIndexInfo(const TString& name, ui32 id) + : NTable::TScheme::TTableSchema() + , Id(id) + , Name(name) +{} + +std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) { + Y_VERIFY(batch); + i64 numColumns = batch->num_columns(); + i64 numRows = batch->num_rows(); + + auto res = batch->AddColumn(numColumns, arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), + NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)); + Y_VERIFY(res.ok()); + res = (*res)->AddColumn(numColumns + 1, arrow::field(SPEC_COL_TX_ID, arrow::uint64()), + NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)); + Y_VERIFY(res.ok()); + Y_VERIFY((*res)->num_columns() == numColumns + 2); + return *res; +} + +std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() { + static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{ + arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), + arrow::field(SPEC_COL_TX_ID, arrow::uint64()) + }); + return result; +} + +bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) { + return IsSpecialColumn(field.name()); +} + +bool TIndexInfo::IsSpecialColumn(const std::string& fieldName) { + return fieldName == SPEC_COL_PLAN_STEP + || fieldName == SPEC_COL_TX_ID; +} + +bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) { + return fieldId == (ui32)ESpecialColumn::PLAN_STEP + || fieldId == (ui32)ESpecialColumn::TX_ID; +} + +ui32 TIndexInfo::GetColumnId(const std::string& name) const { + auto id = GetColumnIdOptional(name); + Y_VERIFY(!!id, "undefined column %s", name.data()); + return *id; +} + +std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const { + const auto ni = ColumnNames.find(name); + + if (ni != ColumnNames.end()) { + return ni->second; + } + if (name == SPEC_COL_PLAN_STEP) { + return ui32(ESpecialColumn::PLAN_STEP); + } else if (name == SPEC_COL_TX_ID) { + return ui32(ESpecialColumn::TX_ID); + } + return {}; +} + +TString TIndexInfo::GetColumnName(ui32 id, bool required) const { + if (ESpecialColumn(id) == ESpecialColumn::PLAN_STEP) { + return SPEC_COL_PLAN_STEP; + } else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) { + return SPEC_COL_TX_ID; + } else { + const auto ci = Columns.find(id); + + if (!required && ci == Columns.end()) { + return {}; + } + + Y_VERIFY(ci != Columns.end()); + return ci->second.Name; + } +} + +std::vector<ui32> TIndexInfo::GetColumnIds() const { + std::vector<ui32> result; + for (auto&& i : Columns) { + result.emplace_back(i.first); + } + result.emplace_back((ui32)ESpecialColumn::PLAN_STEP); + result.emplace_back((ui32)ESpecialColumn::TX_ID); + return result; +} + +std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) const { + std::vector<TString> out; + out.reserve(ids.size()); + for (ui32 id : ids) { + const auto ci = Columns.find(id); + Y_VERIFY(ci != Columns.end()); + out.push_back(ci->second.Name); + } + return out; +} + +std::vector<TNameTypeInfo> TIndexInfo::GetColumns(const std::vector<ui32>& ids) const { + return NOlap::GetColumns(*this, ids); +} + +std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const { + if (!Schema) { + std::vector<ui32> ids; + ids.reserve(Columns.size()); + for (const auto& [id, _] : Columns) { + ids.push_back(id); + } + + // The ids had a set type before so we keep them sorted. + std::sort(ids.begin(), ids.end()); + Schema = MakeArrowSchema(Columns, ids); + } + + return Schema; +} + +std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaWithSpecials() const { + if (SchemaWithSpecials) { + return SchemaWithSpecials; + } + + const auto& schema = ArrowSchema(); + + std::vector<std::shared_ptr<arrow::Field>> extended; + extended.reserve(schema->num_fields() + 3); + + // Place special fields at the beginning of the schema. + extended.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64())); + extended.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64())); + // Append fields from the regular schema afterward. + extended.insert(extended.end(), schema->fields().begin(), schema->fields().end()); + + SchemaWithSpecials = std::make_shared<arrow::Schema>(std::move(extended)); + return SchemaWithSpecials; +} + +std::shared_ptr<arrow::Schema> TIndexInfo::AddColumns( + const std::shared_ptr<arrow::Schema>& src, + const std::vector<TString>& columns) const +{ + std::shared_ptr<arrow::Schema> all = ArrowSchemaWithSpecials(); + auto fields = src->fields(); + + for (const auto& col : columns) { + const std::string name(col.data(), col.size()); + if (!src->GetFieldByName(name)) { + auto field = all->GetFieldByName(name); + if (!field) { + return {}; + } + fields.push_back(field); + } + } + return std::make_shared<arrow::Schema>(std::move(fields)); +} + +std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials) const { + return MakeArrowSchema(Columns, columnIds, withSpecials); +} + +std::vector<ui32> TIndexInfo::GetColumnIds(const std::vector<TString>& columnNames) const { + std::vector<ui32> ids; + ids.reserve(columnNames.size()); + for (auto& name : columnNames) { + auto columnId = GetColumnIdOptional(name); + if (!columnId) { + return {}; + } + ids.emplace_back(*columnId); + } + return ids; +} + +std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema(const std::vector<TString>& names) const { + auto columnIds = GetColumnIds(names); + if (columnIds.empty()) { + return {}; + } + return MakeArrowSchema(Columns, columnIds); +} + +std::shared_ptr<arrow::Field> TIndexInfo::ArrowColumnField(ui32 columnId) const { + auto it = ArrowColumnByColumnIdCache.find(columnId); + if (it == ArrowColumnByColumnIdCache.end()) { + it = ArrowColumnByColumnIdCache.emplace(columnId, ArrowSchema()->GetFieldByName(GetColumnName(columnId, true))).first; + } + return it->second; +} + +void TIndexInfo::SetAllKeys() { + /// @note Setting replace and sorting key to PK we are able to: + /// * apply REPLACE by MergeSort + /// * apply PK predicate before REPLACE + const auto& primaryKeyNames = NamesOnly(GetPrimaryKey()); + // Update set of required columns with names from primary key. + for (const auto& name: primaryKeyNames) { + RequiredColumns.insert(name); + } + + std::vector<std::shared_ptr<arrow::Field>> fields; + if (primaryKeyNames.size()) { + SortingKey = ArrowSchema(primaryKeyNames); + ReplaceKey = SortingKey; + fields = ReplaceKey->fields(); + if (CompositeIndexKey) { + IndexKey = ReplaceKey; + } else { + IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] })); + } + } + + fields.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64())); + fields.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64())); + ExtendedKey = std::make_shared<arrow::Schema>(std::move(fields)); + + for (const auto& [colId, column] : Columns) { + if (NArrow::IsPrimitiveYqlType(column.PType)) { + MinMaxIdxColumnsIds.insert(colId); + } + } + MinMaxIdxColumnsIds.insert(GetPKFirstColumnId()); +} + +std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortDescription() const { + if (GetSortingKey()) { + auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first + Y_VERIFY(key && key->num_fields() > 2); + auto description = std::make_shared<NArrow::TSortDescription>(key); + description->Directions[key->num_fields() - 1] = -1; + description->Directions[key->num_fields() - 2] = -1; + description->NotNull = true; // TODO + return description; + } + return {}; +} + +std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() const { + if (GetSortingKey()) { + auto key = GetExtendedKey(); // Sort with extended key, greater snapshot first + Y_VERIFY(key && key->num_fields() > 2); + auto description = std::make_shared<NArrow::TSortDescription>(key, GetReplaceKey()); + description->Directions[key->num_fields() - 1] = -1; + description->Directions[key->num_fields() - 2] = -1; + description->NotNull = true; // TODO + return description; + } + return {}; +} + +bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { + auto it = ColumnNames.find(name); + if (it == ColumnNames.end()) { + return false; + } + return MinMaxIdxColumnsIds.contains(it->second); +} + +TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const { + arrow::ipc::IpcWriteOptions options; + options.use_threads = false; + + NArrow::NTransformation::ITransformer::TPtr transformer; + std::unique_ptr<arrow::util::Codec> columnCodec; + { + auto it = ColumnFeatures.find(columnId); + if (it != ColumnFeatures.end()) { + transformer = it->second.GetSaveTransformer(); + columnCodec = it->second.GetCompressionCodec(); + } + } + + if (context.GetExternalCompression()) { + options.codec = context.GetExternalCompression()->BuildArrowCodec(); + } else if (columnCodec) { + options.codec = std::move(columnCodec); + } else if (DefaultCompression) { + options.codec = DefaultCompression->BuildArrowCodec(); + } else { + options.codec = NArrow::TCompression::BuildDefaultCodec(); + } + + if (!transformer) { + return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TBatchPayloadSerializer>(options)); + } else { + return TColumnSaver(transformer, std::make_shared<NArrow::NSerialization::TFullDataSerializer>(options)); + } +} + +TColumnFeatures& TIndexInfo::GetOrCreateColumnFeatures(const ui32 columnId) const { + auto it = ColumnFeatures.find(columnId); + if (it == ColumnFeatures.end()) { + it = ColumnFeatures.emplace(columnId, TColumnFeatures(columnId)).first; + } + return it->second; +} + +std::shared_ptr<TColumnLoader> TIndexInfo::GetColumnLoader(const ui32 columnId) const { + TColumnFeatures& features = GetOrCreateColumnFeatures(columnId); + return features.GetLoader(*this); +} + +std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const { + std::shared_ptr<arrow::Schema> schema = Schema; + if (IsSpecialColumn(columnId)) { + schema = ArrowSchemaSnapshot(); + } + auto field = schema->GetFieldByName(GetColumnName(columnId)); + Y_VERIFY(field); + std::vector<std::shared_ptr<arrow::Field>> fields = { field }; + return std::make_shared<arrow::Schema>(fields); +} + +bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { + if (schema.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema"); + return false; + } + + for (const auto& col : schema.GetColumns()) { + const ui32 id = col.GetId(); + const TString& name = col.GetName(); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), + col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); + Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); + ColumnNames[name] = id; + std::optional<TColumnFeatures> cFeatures = TColumnFeatures::BuildFromProto(col, id); + if (!cFeatures) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature"); + return false; + } + ColumnFeatures.emplace(id, *cFeatures); + } + + for (const auto& keyName : schema.GetKeyColumnNames()) { + Y_VERIFY(ColumnNames.contains(keyName)); + KeyColumns.push_back(ColumnNames[keyName]); + } + + if (schema.HasDefaultCompression()) { + auto result = NArrow::TCompression::BuildFromProto(schema.GetDefaultCompression()); + if (!result) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", result.GetErrorMessage()); + return false; + } + DefaultCompression = *result; + } + + CompositeMarks = schema.GetCompositeMarks(); + CompositeIndexKey = AppData()->FeatureFlags.GetForceColumnTablesCompositeMarks() ? true : CompositeMarks; + return true; +} + +bool TIndexInfo::CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const { + return CompositeMarks == scheme.GetCompositeMarks(); +} + +std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials) { + std::vector<std::shared_ptr<arrow::Field>> fields; + fields.reserve(withSpecials ? ids.size() + 2 : ids.size()); + + if (withSpecials) { + // Place special fields at the beginning of the schema. + fields.push_back(arrow::field(TIndexInfo::SPEC_COL_PLAN_STEP, arrow::uint64())); + fields.push_back(arrow::field(TIndexInfo::SPEC_COL_TX_ID, arrow::uint64())); + } + + for (const ui32 id: ids) { + auto it = columns.find(id); + if (it == columns.end()) { + continue; + } + + const auto& column = it->second; + std::string colName(column.Name.data(), column.Name.size()); + fields.emplace_back(std::make_shared<arrow::Field>(colName, NArrow::GetArrowType(column.PType))); + } + + return std::make_shared<arrow::Schema>(std::move(fields)); +} + +std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids) { + std::vector<std::pair<TString, NScheme::TTypeInfo>> out; + out.reserve(ids.size()); + for (const ui32 id : ids) { + const auto ci = tableSchema.Columns.find(id); + Y_VERIFY(ci != tableSchema.Columns.end()); + out.emplace_back(ci->second.Name, ci->second.PType); + } + return out; +} + +std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { + TIndexInfo result("", 0); + if (!result.DeserializeFromProto(schema)) { + return std::nullopt; + } + return result; +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h new file mode 100644 index 0000000000..7bb0b9bdda --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -0,0 +1,203 @@ +#pragma once + +#include "column_features.h" +#include "tier_info.h" + +#include <ydb/core/tx/columnshard/common/snapshot.h> + +#include <ydb/core/sys_view/common/schema.h> +#include <ydb/core/tablet_flat/flat_dbase_scheme.h> +#include <ydb/core/tx/columnshard/common/scalars.h> +#include <ydb/core/formats/arrow/dictionary/object.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/formats/arrow/transformer/abstract.h> +#include <ydb/core/scheme/scheme_types_proto.h> + +namespace arrow { + class Array; + class Field; + class Schema; +} + +namespace NKikimr::NArrow { + struct TSortDescription; +} + +namespace NKikimr::NOlap { + +struct TInsertedData; +class TSnapshotColumnInfo; +using TNameTypeInfo = std::pair<TString, NScheme::TTypeInfo>; + +/// Column engine index description in terms of tablet's local table. +/// We have to use YDB types for keys here. +struct TIndexInfo : public NTable::TScheme::TTableSchema { +private: + mutable THashMap<ui32, TColumnFeatures> ColumnFeatures; + mutable THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache; + TIndexInfo(const TString& name, ui32 id); + bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); + TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const; +public: + static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; + static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; + static const TString STORE_INDEX_STATS_TABLE; + static const TString TABLE_INDEX_STATS_TABLE; + + enum class ESpecialColumn : ui32 { + PLAN_STEP = 0xffffff00, + TX_ID, + }; + + /// Appends the special columns to the batch. + static std::shared_ptr<arrow::RecordBatch> AddSpecialColumns( + const std::shared_ptr<arrow::RecordBatch>& batch, + const TSnapshot& snapshot); + + /// Makes schema as set of the special columns. + static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot(); + + /// Matches name of the filed with names of the special columns. + static bool IsSpecialColumn(const arrow::Field& field); + static bool IsSpecialColumn(const ui32 field); + static ui32 GetSpecialColumnByteWidth(const ui32 field) { + Y_VERIFY(IsSpecialColumn(field)); + return 8; + } + static bool IsSpecialColumn(const std::string& fieldName); + template <class TContainer> + static bool IsSpecialColumns(const TContainer& c) { + for (auto&& i : c) { + if (!IsSpecialColumn(i)) { + return false; + } + } + return true; + } + + bool CheckAlterScheme(const NKikimrSchemeOp::TColumnTableSchema& scheme) const; +public: + + static TIndexInfo BuildDefault() { + TIndexInfo result("dummy", 0); + return result; + } + + static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); + + /// Returns id of the index. + ui32 GetId() const noexcept { + return Id; + } + + std::shared_ptr<arrow::Schema> GetColumnSchema(const ui32 columnId) const; + TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const; + std::shared_ptr<TColumnLoader> GetColumnLoader(const ui32 columnId) const; + + /// Returns an id of the column located by name. The name should exists in the schema. + ui32 GetColumnId(const std::string& name) const; + std::optional<ui32> GetColumnIdOptional(const std::string& name) const; + + /// Returns a name of the column located by id. + TString GetColumnName(ui32 id, bool required = true) const; + + /// Returns names of columns defined by the specific ids. + std::vector<TString> GetColumnNames(const std::vector<ui32>& ids) const; + std::vector<ui32> GetColumnIds() const; + + /// Returns info of columns defined by specific ids. + std::vector<TNameTypeInfo> GetColumns(const std::vector<ui32>& ids) const; + + /// Traditional Primary Key (includes uniqueness, search and sorting logic) + std::vector<TNameTypeInfo> GetPrimaryKey() const { + return GetColumns(KeyColumns); + } + + /// Returns id of the first column of the primary key. + ui32 GetPKFirstColumnId() const { + Y_VERIFY(KeyColumns.size()); + return KeyColumns[0]; + } + + // Sorting key: could be less or greater then traditional PK + // It could be empty for append-only tables. It could be greater then PK for better columns compression. + // If sorting key includes uniqueness key as a prefix we are able to use MergeSort for REPLACE. + const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return SortingKey; } + const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return ReplaceKey; } + const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; } + const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; } + + /// Initializes sorting, replace, index and extended keys. + void SetAllKeys(); + + void CheckTtlColumn(const TString& ttlColumn) const { + Y_VERIFY(!ttlColumn.empty()); + Y_VERIFY(MinMaxIdxColumnsIds.contains(GetColumnId(ttlColumn))); + } + + std::vector<ui32> GetColumnIds(const std::vector<TString>& columnNames) const; + + std::shared_ptr<arrow::Schema> ArrowSchema() const; + std::shared_ptr<arrow::Schema> ArrowSchemaWithSpecials() const; + std::shared_ptr<arrow::Schema> AddColumns(const std::shared_ptr<arrow::Schema>& schema, + const std::vector<TString>& columns) const; + + std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const; + std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const; + std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const; + + const THashSet<TString>& GetRequiredColumns() const { + return RequiredColumns; + } + + const THashSet<ui32>& GetMinMaxIdxColumns() const { + return MinMaxIdxColumnsIds; + } + + bool AllowTtlOverColumn(const TString& name) const; + + /// Returns whether the sorting keys defined. + bool IsSorted() const { return SortingKey.get(); } + + /// Returns whether the replace keys defined. + bool IsReplacing() const { return ReplaceKey.get(); } + + bool IsCompositeIndexKey() const { + return CompositeIndexKey; + } + + std::shared_ptr<NArrow::TSortDescription> SortDescription() const; + std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const; + + static const std::vector<std::string>& GetSpecialColumnNames() { + static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; + return result; + } + + static const std::vector<ui32>& GetSpecialColumnIds() { + static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + return result; + } + +private: + ui32 Id; + TString Name; + bool CompositeIndexKey = false; + mutable std::shared_ptr<arrow::Schema> Schema; + mutable std::shared_ptr<arrow::Schema> SchemaWithSpecials; + std::shared_ptr<arrow::Schema> SortingKey; + std::shared_ptr<arrow::Schema> ReplaceKey; + std::shared_ptr<arrow::Schema> ExtendedKey; // Extend PK with snapshot columns to allow old shapshot reads + std::shared_ptr<arrow::Schema> IndexKey; + THashSet<TString> RequiredColumns; + THashSet<ui32> MinMaxIdxColumnsIds; + std::optional<NArrow::TCompression> DefaultCompression; + bool CompositeMarks = false; +}; + +std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, bool withSpecials = false); + +/// Extracts columns with the specific ids from the schema. +std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids); + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp new file mode 100644 index 0000000000..8114b5f11a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp @@ -0,0 +1,5 @@ +#include "tier_info.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.h b/ydb/core/tx/columnshard/engines/scheme/tier_info.h new file mode 100644 index 0000000000..b05a1d7655 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.h @@ -0,0 +1,234 @@ +#pragma once + +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/common/validation.h> +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/formats/arrow/compression/object.h> +#include <ydb/core/tx/columnshard/common/scalars.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> + +namespace NKikimr::NOlap { + +class TTierInfo { +private: + TString Name; + TString EvictColumnName; + TInstant EvictBorder; + bool NeedExport = false; + + ui32 TtlUnitsInSecond; + std::optional<NArrow::TCompression> Compression; + mutable std::shared_ptr<arrow::Scalar> Scalar; + +public: + TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0) + : Name(tierName) + , EvictColumnName(column) + , EvictBorder(evictBorder) + , TtlUnitsInSecond(unitsInSecond) + { + Y_VERIFY(!Name.empty()); + Y_VERIFY(!EvictColumnName.empty()); + } + + const TString& GetName() const { + return Name; + } + + const TString& GetEvictColumnName() const { + return EvictColumnName; + } + + const TInstant GetEvictBorder() const { + return EvictBorder; + } + + bool GetNeedExport() const { + return NeedExport; + } + + TTierInfo& SetNeedExport(const bool value) { + NeedExport = value; + return *this; + } + + TTierInfo& SetCompression(const NArrow::TCompression& value) { + Compression = value; + return *this; + } + + const std::optional<NArrow::TCompression> GetCompression() const { + if (NeedExport) { + return {}; + } + return Compression; + } + + std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const { + return schema->GetFieldByName(EvictColumnName); + } + + std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const { + if (Scalar) { + return Scalar; + } + auto evictColumn = GetEvictColumn(schema); + Y_VERIFY(evictColumn); + + ui32 multiplier = TtlUnitsInSecond ? TtlUnitsInSecond : 1; + switch (evictColumn->type()->id()) { + case arrow::Type::TIMESTAMP: + Scalar = std::make_shared<arrow::TimestampScalar>( + EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); + break; + case arrow::Type::UINT16: // YQL Date + Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days()); + break; + case arrow::Type::UINT32: // YQL Datetime or Uint32 + Scalar = std::make_shared<arrow::UInt32Scalar>(EvictBorder.Seconds() * multiplier); + break; + case arrow::Type::UINT64: + Scalar = std::make_shared<arrow::UInt64Scalar>(EvictBorder.Seconds() * multiplier); + break; + default: + break; + } + + return Scalar; + } + + static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) { + return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond); + } + + TString GetDebugString() const { + TStringBuilder sb; + sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' "; + if (Compression) { + sb << Compression->DebugString(); + } else { + sb << "NOT_SPECIFIED(Default)"; + } + return sb; + } +}; + +class TTierRef { +public: + TTierRef(const std::shared_ptr<TTierInfo>& tierInfo) + : Info(tierInfo) + { + Y_VERIFY(tierInfo); + } + + bool operator < (const TTierRef& b) const { + if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) { + return true; + } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) { + return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter + } + return false; + } + + bool operator == (const TTierRef& b) const { + return Info->GetEvictBorder() == b.Info->GetEvictBorder() + && Info->GetName() == b.Info->GetName(); + } + + const TTierInfo& Get() const { + return *Info; + } + +private: + std::shared_ptr<TTierInfo> Info; +}; + +class TTiering { + using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>; + TTiersMap TierByName; + TSet<TTierRef> OrderedTiers; +public: + std::shared_ptr<TTierInfo> Ttl; + + const TTiersMap& GetTierByName() const { + return TierByName; + } + + const TSet<TTierRef>& GetOrderedTiers() const { + return OrderedTiers; + } + + bool HasTiers() const { + return !OrderedTiers.empty(); + } + + void Add(const std::shared_ptr<TTierInfo>& tier) { + if (HasTiers()) { + // TODO: support different ttl columns + Y_VERIFY(tier->GetEvictColumnName() == OrderedTiers.begin()->Get().GetEvictColumnName()); + } + + TierByName.emplace(tier->GetName(), tier); + OrderedTiers.emplace(tier); + } + + TString GetHottestTierName() const { + if (OrderedTiers.size()) { + return OrderedTiers.rbegin()->Get().GetName(); // hottest one + } + return {}; + } + + std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const { + auto ttlTs = Ttl ? Ttl->EvictScalar(schema) : nullptr; + auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar(schema); + if (!ttlTs) { + return tierTs; + } else if (!tierTs) { + return ttlTs; + } + return NArrow::ScalarLess(ttlTs, tierTs) ? tierTs : ttlTs; // either TTL or tier border appear + } + + std::optional<NArrow::TCompression> GetCompression(const TString& name) const { + auto it = TierByName.find(name); + if (it != TierByName.end()) { + Y_VERIFY(!name.empty()); + return it->second->GetCompression(); + } + return {}; + } + + bool NeedExport(const TString& name) const { + auto it = TierByName.find(name); + if (it != TierByName.end()) { + Y_VERIFY(!name.empty()); + return it->second->GetNeedExport(); + } + return false; + } + + THashSet<TString> GetTtlColumns() const { + THashSet<TString> out; + if (Ttl) { + out.insert(Ttl->GetEvictColumnName()); + } + for (auto& [tierName, tier] : TierByName) { + out.insert(tier->GetEvictColumnName()); + } + return out; + } + + TString GetDebugString() const { + TStringBuilder sb; + if (Ttl) { + sb << Ttl->GetDebugString() << "; "; + } + for (auto&& i : OrderedTiers) { + sb << i.Get().GetDebugString() << "; "; + } + return sb; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index f9839bb0f1..73a7766802 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -4,6 +4,9 @@ SRCS( abstract_scheme.cpp snapshot_scheme.cpp filtered_scheme.cpp + index_info.cpp + tier_info.cpp + column_features.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h index e1d7fb5309..beaf9cefc2 100644 --- a/ydb/core/tx/columnshard/engines/tier_info.h +++ b/ydb/core/tx/columnshard/engines/tier_info.h @@ -1,236 +1,3 @@ #pragma once -#include "defs.h" -#include "scalars.h" - -#include <ydb/core/formats/arrow/arrow_helpers.h> -#include <ydb/core/formats/arrow/common/validation.h> -#include <ydb/core/formats/arrow/serializer/abstract.h> -#include <ydb/core/formats/arrow/compression/object.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> - -namespace NKikimr::NOlap { - -class TTierInfo { -private: - TString Name; - TString EvictColumnName; - TInstant EvictBorder; - bool NeedExport = false; - - ui32 TtlUnitsInSecond; - std::optional<NArrow::TCompression> Compression; - mutable std::shared_ptr<arrow::Scalar> Scalar; - -public: - TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0) - : Name(tierName) - , EvictColumnName(column) - , EvictBorder(evictBorder) - , TtlUnitsInSecond(unitsInSecond) - { - Y_VERIFY(!Name.empty()); - Y_VERIFY(!EvictColumnName.empty()); - } - - const TString& GetName() const { - return Name; - } - - const TString& GetEvictColumnName() const { - return EvictColumnName; - } - - const TInstant GetEvictBorder() const { - return EvictBorder; - } - - bool GetNeedExport() const { - return NeedExport; - } - - TTierInfo& SetNeedExport(const bool value) { - NeedExport = value; - return *this; - } - - TTierInfo& SetCompression(const NArrow::TCompression& value) { - Compression = value; - return *this; - } - - const std::optional<NArrow::TCompression> GetCompression() const { - if (NeedExport) { - return {}; - } - return Compression; - } - - std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const { - return schema->GetFieldByName(EvictColumnName); - } - - std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const { - if (Scalar) { - return Scalar; - } - auto evictColumn = GetEvictColumn(schema); - Y_VERIFY(evictColumn); - - ui32 multiplier = TtlUnitsInSecond ? TtlUnitsInSecond : 1; - switch (evictColumn->type()->id()) { - case arrow::Type::TIMESTAMP: - Scalar = std::make_shared<arrow::TimestampScalar>( - EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); - break; - case arrow::Type::UINT16: // YQL Date - Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days()); - break; - case arrow::Type::UINT32: // YQL Datetime or Uint32 - Scalar = std::make_shared<arrow::UInt32Scalar>(EvictBorder.Seconds() * multiplier); - break; - case arrow::Type::UINT64: - Scalar = std::make_shared<arrow::UInt64Scalar>(EvictBorder.Seconds() * multiplier); - break; - default: - break; - } - - return Scalar; - } - - static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) { - return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond); - } - - TString GetDebugString() const { - TStringBuilder sb; - sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' "; - if (Compression) { - sb << Compression->DebugString(); - } else { - sb << "NOT_SPECIFIED(Default)"; - } - return sb; - } -}; - -class TTierRef { -public: - TTierRef(const std::shared_ptr<TTierInfo>& tierInfo) - : Info(tierInfo) - { - Y_VERIFY(tierInfo); - } - - bool operator < (const TTierRef& b) const { - if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) { - return true; - } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) { - return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter - } - return false; - } - - bool operator == (const TTierRef& b) const { - return Info->GetEvictBorder() == b.Info->GetEvictBorder() - && Info->GetName() == b.Info->GetName(); - } - - const TTierInfo& Get() const { - return *Info; - } - -private: - std::shared_ptr<TTierInfo> Info; -}; - -class TTiering { - using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>; - TTiersMap TierByName; - TSet<TTierRef> OrderedTiers; -public: - std::shared_ptr<TTierInfo> Ttl; - - const TTiersMap& GetTierByName() const { - return TierByName; - } - - const TSet<TTierRef>& GetOrderedTiers() const { - return OrderedTiers; - } - - bool HasTiers() const { - return !OrderedTiers.empty(); - } - - void Add(const std::shared_ptr<TTierInfo>& tier) { - if (HasTiers()) { - // TODO: support different ttl columns - Y_VERIFY(tier->GetEvictColumnName() == OrderedTiers.begin()->Get().GetEvictColumnName()); - } - - TierByName.emplace(tier->GetName(), tier); - OrderedTiers.emplace(tier); - } - - TString GetHottestTierName() const { - if (OrderedTiers.size()) { - return OrderedTiers.rbegin()->Get().GetName(); // hottest one - } - return {}; - } - - std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const { - auto ttlTs = Ttl ? Ttl->EvictScalar(schema) : nullptr; - auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar(schema); - if (!ttlTs) { - return tierTs; - } else if (!tierTs) { - return ttlTs; - } - return NArrow::ScalarLess(ttlTs, tierTs) ? tierTs : ttlTs; // either TTL or tier border appear - } - - std::optional<NArrow::TCompression> GetCompression(const TString& name) const { - auto it = TierByName.find(name); - if (it != TierByName.end()) { - Y_VERIFY(!name.empty()); - return it->second->GetCompression(); - } - return {}; - } - - bool NeedExport(const TString& name) const { - auto it = TierByName.find(name); - if (it != TierByName.end()) { - Y_VERIFY(!name.empty()); - return it->second->GetNeedExport(); - } - return false; - } - - THashSet<TString> GetTtlColumns() const { - THashSet<TString> out; - if (Ttl) { - out.insert(Ttl->GetEvictColumnName()); - } - for (auto& [tierName, tier] : TierByName) { - out.insert(tier->GetEvictColumnName()); - } - return out; - } - - TString GetDebugString() const { - TStringBuilder sb; - if (Ttl) { - sb << Ttl->GetDebugString() << "; "; - } - for (auto&& i : OrderedTiers) { - sb << i.Get().GetDebugString() << "; "; - } - return sb; - } -}; - -} +#include "scheme/tier_info.h" diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make index a2582a1657..e409fd8830 100644 --- a/ydb/core/tx/columnshard/engines/ya.make +++ b/ydb/core/tx/columnshard/engines/ya.make @@ -15,7 +15,6 @@ SRCS( index_logic_logs.cpp filter.cpp portion_info.cpp - scalars.cpp tier_info.cpp ) @@ -31,6 +30,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/predicate ydb/core/tx/columnshard/engines/storage ydb/core/tx/columnshard/engines/insert_table + ydb/core/tx/columnshard/engines/portions ydb/core/formats/arrow/compression ydb/core/tx/program @@ -38,7 +38,6 @@ PEERDIR( ydb/library/yql/public/udf/service/exception_policy ) -GENERATE_ENUM_SERIALIZATION(portion_info.h) YQL_LAST_ABI_VERSION() END() |