diff options
| author | ivanmorozov <[email protected]> | 2023-08-11 16:58:24 +0300 |
|---|---|---|
| committer | ivanmorozov <[email protected]> | 2023-08-11 18:18:14 +0300 |
| commit | e54d370f82947014c450750f266e5696241decc0 (patch) | |
| tree | 06d353a1a6b06997c7657aaaa10d4f05e32ac843 | |
| parent | 98300f1884159e59d7cad62ce0bcff3d89100a7d (diff) | |
KIKIMR-18932: split portion file for meta/portion and with_blobs in future
12 files changed, 188 insertions, 148 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h index d695592bd52..6217608c81e 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract.h @@ -1,5 +1,6 @@ #pragma once #include "mark.h" +#include <ydb/core/tx/columnshard/counters/indexation.h> #include <ydb/core/tx/columnshard/engines/columns_table.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/protos/counters_columnshard.pb.h> diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index f7ba7bdc5e4..3ab41a14645 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -192,7 +192,7 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfo& portionInfo, TP auto field = resultSchema->GetFieldByIndex(pos); auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext); - auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver); + auto blob = columnSaver.Apply(batch->GetColumnByName(field->name()), field); if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { portionInfo = undo; newBlobs.resize(undoSize); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index 00e1ebbd917..a6894d538c4 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -1,6 +1,8 @@ #pragma once #include "abstract.h" #include <util/generic/hash.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/engines/scheme/tier_info.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt index 914aa5278b0..dced170ff9f 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt @@ -18,11 +18,14 @@ target_link_libraries(columnshard-engines-portions PUBLIC contrib-libs-cxxsupp yutil columnshard-engines-scheme + tx-columnshard-splitter + tx-columnshard-common tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt index 86efcb228b1..10b9a0177de 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt @@ -19,11 +19,14 @@ target_link_libraries(columnshard-engines-portions PUBLIC contrib-libs-cxxsupp yutil columnshard-engines-scheme + tx-columnshard-splitter + tx-columnshard-common tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt index 86efcb228b1..10b9a0177de 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt @@ -19,11 +19,14 @@ target_link_libraries(columnshard-engines-portions PUBLIC contrib-libs-cxxsupp yutil columnshard-engines-scheme + tx-columnshard-splitter + tx-columnshard-common tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt index 914aa5278b0..dced170ff9f 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt @@ -18,11 +18,14 @@ target_link_libraries(columnshard-engines-portions PUBLIC contrib-libs-cxxsupp yutil columnshard-engines-scheme + tx-columnshard-splitter + tx-columnshard-common tools-enum_parser-enum_serialization_runtime ) target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp new file mode 100644 index 00000000000..ac2c17ad662 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -0,0 +1,5 @@ +#include "meta.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h new file mode 100644 index 00000000000..8095be85cae --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -0,0 +1,91 @@ +#pragma once +#include <ydb/core/tx/columnshard/common/portion.h> +#include <ydb/core/formats/arrow/replace_key.h> +#include <util/stream/output.h> + +namespace NKikimr::NOlap { + +struct TPortionMeta { + using EProduced = NPortion::EProduced; + + struct TColumnMeta { + ui32 NumRows{0}; + ui32 RawBytes{0}; + std::shared_ptr<arrow::Scalar> Min; + std::shared_ptr<arrow::Scalar> Max; + + bool HasMinMax() const noexcept { + return Min.get() && Max.get(); + } + }; + + EProduced GetProduced() const { + return Produced; + } + + EProduced Produced{EProduced::UNSPECIFIED}; + THashMap<ui32, TColumnMeta> ColumnMeta; + ui32 FirstPkColumn = 0; + std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows + std::optional<NArrow::TReplaceKey> IndexKeyStart; + std::optional<NArrow::TReplaceKey> IndexKeyEnd; + + TString DebugString() const { + return TStringBuilder() << + "produced:" << Produced << ";" + ; + } + + bool HasMinMax(ui32 columnId) const { + if (!ColumnMeta.contains(columnId)) { + return false; + } + return ColumnMeta.find(columnId)->second.HasMinMax(); + } + + bool HasPkMinMax() const { + return HasMinMax(FirstPkColumn); + } + + ui32 NumRows() const { + if (FirstPkColumn) { + Y_VERIFY(ColumnMeta.contains(FirstPkColumn)); + return ColumnMeta.find(FirstPkColumn)->second.NumRows; + } + return 0; + } + + friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) { + out << "reason" << (ui32)info.Produced; + for (const auto& [_, meta] : info.ColumnMeta) { + if (meta.NumRows) { + out << " " << meta.NumRows << " rows"; + break; + } + } + return out; + } +}; + +class TPortionAddress { +private: + YDB_READONLY(ui64, GranuleId, 0); + YDB_READONLY(ui64, PortionId, 0); +public: + TPortionAddress(const ui64 granuleId, const ui64 portionId) + : GranuleId(granuleId) + , PortionId(portionId) + { + + } + + bool operator<(const TPortionAddress& item) const { + return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId); + } + + bool operator==(const TPortionAddress& item) const { + return std::tie(GranuleId, PortionId) == std::tie(item.GranuleId, item.PortionId); + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 9670b34dba2..1f152049fc2 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -1,22 +1,29 @@ #include "portion_info.h" -#include <ydb/core/protos/tx_columnshard.pb.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <ydb/core/formats/arrow/arrow_filter.h> namespace NKikimr::NOlap { -TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - const TColumnSaver saver) { - auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ field }); - auto batch = arrow::RecordBatch::Make(schema, array->length(), { array }); - Y_VERIFY(batch); - - return saver.Apply(batch); -} - -void TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) { - record.Chunk = 0; +const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) { + Y_VERIFY(record.ColumnId); + std::optional<ui32> maxChunk; + for (auto&& i : Records) { + if (i.ColumnId == record.ColumnId) { + if (!maxChunk) { + maxChunk = i.Chunk; + } else { + Y_VERIFY(*maxChunk + 1 == i.Chunk); + maxChunk = i.Chunk; + } + } + } + if (maxChunk) { + Y_VERIFY(*maxChunk + 1 == record.Chunk); + } else { + Y_VERIFY(0 == record.Chunk); + } Records.emplace_back(std::move(record)); + return Records.back(); } void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) { @@ -242,6 +249,48 @@ TPortionInfo TPortionInfo::CopyWithFilteredColumns(const THashSet<ui32>& columnI return result; } +ui64 TPortionInfo::GetRawBytes(const std::vector<ui32>& columnIds) const { + ui64 sum = 0; + const ui32 numRows = NumRows(); + for (auto&& i : columnIds) { + if (TIndexInfo::IsSpecialColumn(i)) { + sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i); + } else { + auto it = Meta.ColumnMeta.find(i); + if (it != Meta.ColumnMeta.end()) { + sum += it->second.RawBytes; + } + } + } + return sum; +} + +int TPortionInfo::CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { + return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns); +} + +int TPortionInfo::CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { + return CompareMinByColumnIds(item, info.KeyColumns); +} + +TString TPortionInfo::DebugString() const { + TStringBuilder sb; + sb << "(portion_id:" << Portion << ";" << + "granule_id:" << Granule << ";records_count:" << NumRows() << ";" + "min_snapshot:(" << MinSnapshot.DebugString() << ");" << + "size:" << BlobsBytes() << ";" << + "meta:(" << Meta.DebugString() << ");"; + if (RemoveSnapshot.Valid()) { + sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; + } + sb << "meta:(" << Meta << ");"; + sb << "chunks:(" << Records.size() << ");"; + if (TierName) { + sb << "tier:" << TierName << ";"; + } + return sb << ")"; +} + std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_VERIFY(!Blobs.empty()); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index ab2ab681821..455e1024972 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -1,99 +1,14 @@ #pragma once #include "column_record.h" - -#include <ydb/core/formats/arrow/replace_key.h> -#include <ydb/core/formats/arrow/serializer/abstract.h> -#include <ydb/core/formats/arrow/dictionary/conversion.h> -#include <ydb/core/tx/columnshard/common/portion.h> -#include <ydb/core/tx/columnshard/counters/indexation.h> +#include "meta.h" +#include <ydb/core/tx/columnshard/common/snapshot.h> +#include <ydb/core/tx/columnshard/engines/scheme/column_features.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> -#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <ydb/library/yverify_stream/yverify_stream.h> namespace NKikimr::NOlap { -struct TPortionMeta { - using EProduced = NPortion::EProduced; - - struct TColumnMeta { - ui32 NumRows{0}; - ui32 RawBytes{0}; - std::shared_ptr<arrow::Scalar> Min; - std::shared_ptr<arrow::Scalar> Max; - - bool HasMinMax() const noexcept { - return Min.get() && Max.get(); - } - }; - - EProduced GetProduced() const { - return Produced; - } - - EProduced Produced{EProduced::UNSPECIFIED}; - THashMap<ui32, TColumnMeta> ColumnMeta; - ui32 FirstPkColumn = 0; - std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows - std::optional<NArrow::TReplaceKey> IndexKeyStart; - std::optional<NArrow::TReplaceKey> IndexKeyEnd; - - TString DebugString() const { - return TStringBuilder() << - "produced:" << Produced << ";" - ; - } - - bool HasMinMax(ui32 columnId) const { - if (!ColumnMeta.contains(columnId)) { - return false; - } - return ColumnMeta.find(columnId)->second.HasMinMax(); - } - - bool HasPkMinMax() const { - return HasMinMax(FirstPkColumn); - } - - ui32 NumRows() const { - if (FirstPkColumn) { - Y_VERIFY(ColumnMeta.contains(FirstPkColumn)); - return ColumnMeta.find(FirstPkColumn)->second.NumRows; - } - return 0; - } - - friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) { - out << "reason" << (ui32)info.Produced; - for (const auto& [_, meta] : info.ColumnMeta) { - if (meta.NumRows) { - out << " " << meta.NumRows << " rows"; - break; - } - } - return out; - } -}; - -class TPortionAddress { -private: - YDB_READONLY(ui64, GranuleId, 0); - YDB_READONLY(ui64, PortionId, 0); -public: - TPortionAddress(const ui64 granuleId, const ui64 portionId) - : GranuleId(granuleId) - , PortionId(portionId) - { - - } - - bool operator<(const TPortionAddress& item) const { - return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId); - } - - bool operator==(const TPortionAddress& item) const { - return std::tie(GranuleId, PortionId) == std::tie(item.GranuleId, item.PortionId); - } -}; +struct TIndexInfo; struct TPortionInfo { private: @@ -138,14 +53,7 @@ public: } - TString DebugString() const { - return TStringBuilder() << - "portion_id:" << Portion << ";" << - "granule_id:" << Granule << ";" << - "min_snapshot:" << MinSnapshot.DebugString() << ";" << - "meta:(" << Meta.DebugString() << ");" - ; - } + TString DebugString() const; bool CheckForCleanup(const TSnapshot& snapshot) const { if (!CheckForCleanup()) { @@ -278,21 +186,7 @@ public: return Meta.NumRows(); } - ui64 GetRawBytes(const std::vector<ui32>& columnIds) const { - ui64 sum = 0; - const ui32 numRows = NumRows(); - for (auto&& i : columnIds) { - if (TIndexInfo::IsSpecialColumn(i)) { - sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i); - } else { - auto it = Meta.ColumnMeta.find(i); - if (it != Meta.ColumnMeta.end()) { - sum += it->second.RawBytes; - } - } - } - return sum; - } + ui64 GetRawBytes(const std::vector<ui32>& columnIds) const; ui64 RawBytesSum() const { ui64 sum = 0; @@ -336,13 +230,9 @@ private: return 0; } public: - int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { - return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns); - } + int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const; - int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { - return CompareMinByColumnIds(item, info.KeyColumns); - } + int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const; int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const { return CompareByColumnIdsImpl<TMinGetter>(item, columnIds); @@ -481,7 +371,7 @@ public: auto pos = dataSchema.GetFieldIndex(rec.ColumnId); Y_ASSERT(pos >= 0); positionsMap[resulPos] = pos; - columnChunks[resulPos][rec.Chunk] = rec.BlobRange; + Y_VERIFY(columnChunks[resulPos].emplace(rec.Chunk, rec.BlobRange).second); auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId); if (columnMeta) { Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows); @@ -524,23 +414,10 @@ public: return batch; } - static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - const TColumnSaver saver); - - void AppendOneChunkColumn(TColumnRecord&& record); + const TColumnRecord& AppendOneChunkColumn(TColumnRecord&& record); friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { - for (auto& rec : info.Records) { - out << " " << rec; - out << " (1 of " << info.Records.size() << " blobs shown)"; - break; - } - out << ";activity=" << info.IsActive() << ";"; - if (!info.TierName.empty()) { - out << " tier: " << info.TierName; - } - out << " " << info.Meta; + out << info.DebugString(); return out; } }; diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make index c1be81651a1..fa0b38aaba0 100644 --- a/ydb/core/tx/columnshard/engines/portions/ya.make +++ b/ydb/core/tx/columnshard/engines/portions/ya.make @@ -3,10 +3,13 @@ LIBRARY() SRCS( portion_info.cpp column_record.cpp + meta.cpp ) PEERDIR( ydb/core/tx/columnshard/engines/scheme + ydb/core/tx/columnshard/splitter + ydb/core/tx/columnshard/common ) GENERATE_ENUM_SERIALIZATION(portion_info.h) |
