author     chertus <azuikov@ydb.tech>  2023-04-12 19:12:13 +0300
committer  chertus <azuikov@ydb.tech>  2023-04-12 19:12:13 +0300
commit     73b3c1e86835d644f21ae58280ad4974bba75f05
tree       ddfffc6dbc556638a1fe7afab30783e5427a8f02
parent     a5e044c5e12f6054fc4ab00c6176ca403f53d546
download   ydb-73b3c1e86835d644f21ae58280ad4974bba75f05.tar.gz
further TMark refactoring
-rw-r--r--  ydb/core/formats/arrow_helpers.cpp                      |  11
-rw-r--r--  ydb/core/formats/arrow_helpers.h                        |   6
-rw-r--r--  ydb/core/formats/replace_key.h                          |  61
-rw-r--r--  ydb/core/tx/columnshard/columnshard_costs.cpp           |   6
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine.cpp       |  42
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine.h         |  54
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp  | 161
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.h    |  95
-rw-r--r--  ydb/core/tx/columnshard/engines/granules_table.h        |   9
-rw-r--r--  ydb/core/tx/columnshard/engines/index_info.h            |   6
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.h          |  23
11 files changed, 282 insertions(+), 192 deletions(-)
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 9e7efe684a5..4f65458fd4c 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -754,6 +754,17 @@ i64 LowerBound(const std::shared_ptr<arrow::Array>& array, const arrow::Scalar&
     return pos;
 }
 
+// TODO: implement
+i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset) {
+    Y_VERIFY(batch->num_columns() == 1);
+    Y_VERIFY(key.Size() == 1);
+
+    auto res = key.Column(0).GetScalar(key.GetPosition());
+    Y_VERIFY_OK(res.status());
+    Y_VERIFY(*res);
+    return LowerBound(batch->column(0), *(*res), offset);
+}
+
 std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size) {
     auto res = arrow::MakeArrayFromScalar(arrow::UInt64Scalar(value), size);
     Y_VERIFY(res.ok());
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index b2570a09532..23361c5ef78 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -10,6 +10,11 @@
 namespace NKikimr::NArrow {
 
+using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>;
+template<typename T>
+class TReplaceKeyTemplate;
+using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>;
+
 // Arrow inrernally keeps references to Buffer objects with the data
 // This helper class implements arrow::Buffer over TString that owns
 // the actual memory
@@ -105,6 +110,7 @@ std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2)
 std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count);
 TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
 i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0);
+i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset = 0);
 bool ReserveData(arrow::ArrayBuilder& builder, const size_t size);
 enum class ECompareType {
     LESS = 1,
diff --git a/ydb/core/formats/replace_key.h b/ydb/core/formats/replace_key.h
index 3f27dd1982f..195cc4e5f75 100644
--- a/ydb/core/formats/replace_key.h
+++ b/ydb/core/formats/replace_key.h
@@ -5,9 +5,15 @@
 
 namespace NKikimr::NArrow {
 
+bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
+
+using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>;
+
 template<typename TArrayVecPtr>
 class TReplaceKeyTemplate {
 public:
+    static constexpr bool IsOwning = std::is_same_v<TArrayVecPtr, std::shared_ptr<TArrayVec>>;
+
     TReplaceKeyTemplate(TArrayVecPtr columns, int position)
         : Columns(columns)
         , Position(position)
@@ -15,6 +21,14 @@ public:
         Y_VERIFY_DEBUG(Size() > 0 && Position < Column(0).length());
     }
 
+    template<typename T = TArrayVecPtr> requires IsOwning
+    TReplaceKeyTemplate(TArrayVec&& columns, int position)
+        : Columns(std::make_shared<TArrayVec>(std::move(columns)))
+        , Position(position)
+    {
+        Y_VERIFY_DEBUG(Size() > 0 && Position < Column(0).length());
+    }
+
     size_t Hash() const {
         return TypedHash(Column(0), Position, Column(0).type_id());
     }
@@ -78,6 +92,7 @@ public:
     }
 
     int Size() const {
+        Y_VERIFY_DEBUG(Columns);
         return Columns->size();
     }
 
@@ -86,12 +101,53 @@ public:
     }
 
     const arrow::Array& Column(int i) const {
+        Y_VERIFY_DEBUG(Columns);
         return *(*Columns)[i];
     }
 
+    template<typename T = TArrayVecPtr> requires IsOwning
+    static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
+                                                       const std::shared_ptr<arrow::Schema>& key, int row) {
+        Y_VERIFY(key->num_fields() <= batch->num_columns());
+
+        TArrayVec columns;
+        columns.reserve(key->num_fields());
+        for (int i = 0; i < key->num_fields(); ++i) {
+            auto& keyField = key->field(i);
+            auto array = batch->GetColumnByName(keyField->name());
+            Y_VERIFY(array);
+            Y_VERIFY(keyField->type()->Equals(array->type()));
+            columns.push_back(array);
+        }
+
+        return TReplaceKeyTemplate<TArrayVecPtr>(std::move(columns), row);
+    }
+
+    template<typename T = TArrayVecPtr> requires IsOwning
+    static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch, int row) {
+        auto columns = std::make_shared<TArrayVec>(batch->columns());
+        return TReplaceKeyTemplate<TArrayVecPtr>(columns, row);
+    }
+
+    static TReplaceKeyTemplate<TArrayVecPtr> FromScalar(const std::shared_ptr<arrow::Scalar>& s) {
+        Y_VERIFY_DEBUG(IsGoodScalar(s));
+        auto res = MakeArrayFromScalar(*s, 1);
+        Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
+        return TReplaceKeyTemplate<TArrayVecPtr>(std::make_shared<TArrayVec>(1, *res), 0);
+    }
+
+    static std::shared_ptr<arrow::Scalar> ToScalar(const TReplaceKeyTemplate<TArrayVecPtr>& key) {
+        Y_VERIFY_DEBUG(key.Size() == 1);
+        auto& column = key.Column(0);
+        auto res = column.GetScalar(key.GetPosition());
+        Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
+        Y_VERIFY_DEBUG(IsGoodScalar(*res));
+        return *res;
+    }
+
 private:
-    TArrayVecPtr Columns;
-    int Position;
+    TArrayVecPtr Columns = nullptr;
+    int Position = 0;
 
     static size_t TypedHash(const arrow::Array& ar, int pos, arrow::Type::type typeId) {
         switch (typeId) {
@@ -269,7 +325,6 @@ private:
     }
 };
 
-using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>;
 using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>;
 using TRawReplaceKey = TReplaceKeyTemplate<const TArrayVec*>;
 
diff --git a/ydb/core/tx/columnshard/columnshard_costs.cpp b/ydb/core/tx/columnshard/columnshard_costs.cpp
index ff0c0781d58..7e45a37043e 100644
--- a/ydb/core/tx/columnshard/columnshard_costs.cpp
+++ b/ydb/core/tx/columnshard/columnshard_costs.cpp
@@ -9,7 +9,11 @@
 namespace NKikimr::NOlap::NCosts {
 
 void TKeyRangesBuilder::AddMarkFromGranule(const TGranuleRecord& record) {
-    Constructor.StartRecord(true).AddRecordValue(record.Mark);
+    auto& key = record.Mark;
+    Y_VERIFY(key.Size() == 1);
+    auto res = key.Column(0).GetScalar(key.GetPosition());
+    Y_VERIFY(res.status().ok());
+    Constructor.StartRecord(true).AddRecordValue(*res);
     Features.emplace_back(TMarkRangeFeatures());
 }
 
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index d24839efc8a..16a55b847c5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -1,6 +1,48 @@
 #include "column_engine.h"
 #include <util/stream/output.h>
 
+namespace NKikimr::NOlap {
+
+TString TMark::Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) {
+    Y_VERIFY_DEBUG(key.Size() > 0);
+    if (key.Size() == 1) {
+        Y_VERIFY_S(key.Column(0).type()->Equals(schema->field(0)->type()),
+                   key.Column(0).type()->ToString() + ", expected " + schema->ToString());
+        return SerializeKeyScalar(NArrow::TReplaceKey::ToScalar(key));
+    } else {
+        Y_FAIL("not implemented"); // TODO
+    }
+}
+
+NArrow::TReplaceKey TMark::Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema) {
+    Y_VERIFY_DEBUG(schema->num_fields() > 0);
+    if (schema->num_fields() == 1) {
+        return NArrow::TReplaceKey::FromScalar(DeserializeKeyScalar(key, schema->field(0)->type()));
+    } else {
+        Y_FAIL("not implemented"); // TODO
+    }
+}
+
+std::string TMark::ToString() const {
+    Y_VERIFY_DEBUG(Border.Size() == 1);
+    return NArrow::TReplaceKey::ToScalar(Border)->ToString();
+}
+
+std::shared_ptr<arrow::Scalar> TMark::MinScalar(const std::shared_ptr<arrow::DataType>& type) {
+    if (type->id() == arrow::Type::TIMESTAMP) {
+        // TODO: support negative timestamps in index
+        return std::make_shared<arrow::TimestampScalar>(0, type);
+    }
+    return NArrow::MinScalar(type);
+}
+
+NArrow::TReplaceKey TMark::MinBorder(const std::shared_ptr<arrow::Schema>& schema) {
+    Y_VERIFY_DEBUG(schema->num_fields() == 1);
+    return NArrow::TReplaceKey::FromScalar(MinScalar(schema->field(0)->type()));
+}
+
+}
+
 template <>
 void Out<NKikimr::NOlap::TColumnEngineChanges>(IOutputStream& out, TTypeTraits<NKikimr::NOlap::TColumnEngineChanges>::TFuncParam changes) {
     if (ui32 switched = changes.SwitchedPortions.size()) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index bb0ef9d7c88..58cbf391acf 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -8,6 +8,7 @@
 #include "columns_table.h"
 #include "granules_table.h"
+#include <ydb/core/formats/replace_key.h>
 #include <ydb/core/tx/columnshard/blob.h>
 
 namespace NKikimr::NOlap {
 
@@ -29,6 +30,55 @@ struct TCompactionLimits {
     ui32 InGranuleCompactSeconds{2 * 60}; // Trigger in-granule comcation to guarantee no PK intersections
 };
 
+struct TMark {
+    /// @note It's possible to share columns in TReplaceKey between multiple marks:
+    /// read all marks as a batch; create TMark for each row
+    NArrow::TReplaceKey Border;
+
+    explicit TMark(const NArrow::TReplaceKey& key)
+        : Border(key)
+    {}
+
+    explicit TMark(const std::shared_ptr<arrow::Schema>& schema)
+        : Border(MinBorder(schema))
+    {}
+
+    TMark(const TString& key, const std::shared_ptr<arrow::Schema>& schema)
+        : Border(Deserialize(key, schema))
+    {}
+
+    TMark(const TMark& m) = default;
+    TMark& operator = (const TMark& m) = default;
+
+    bool operator == (const TMark& m) const {
+        return Border == m.Border;
+    }
+
+    std::partial_ordering operator <=> (const TMark& m) const {
+        return Border <=> m.Border;
+    }
+
+    ui64 Hash() const {
+        return Border.Hash();
+    }
+
+    operator size_t () const {
+        return Hash();
+    }
+
+    operator bool () const {
+        Y_FAIL("unexpected call");
+    }
+
+    static TString Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
+    static NArrow::TReplaceKey Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
+    std::string ToString() const;
+
+private:
+    static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type);
+    static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::Schema>& schema);
+};
+
 struct TCompactionInfo {
     TSet<ui64> Granules;
     bool InGranule{false};
@@ -295,8 +345,8 @@ public:
 
     virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; }
     virtual bool HasOverloadedGranules() const { return false; }
 
-    virtual TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const = 0;
-    virtual std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const = 0;
+    virtual TString SerializeMark(const NArrow::TReplaceKey& key) const = 0;
+    virtual NArrow::TReplaceKey DeserializeMark(const TString& key) const = 0;
 
     virtual bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index f8f1008d5b6..668e74a0385 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -12,12 +12,13 @@ namespace NKikimr::NOlap {
 
 namespace {
 
-using TMark = TColumnEngineForLogs::TMark;
-
-std::shared_ptr<arrow::Array> GetFirstPKColumn(const TIndexInfo& indexInfo,
-                                               const std::shared_ptr<arrow::RecordBatch>& batch) {
-    TString columnName = indexInfo.GetPrimaryKey()[0].first;
-    return batch->GetColumnByName(std::string(columnName.data(), columnName.size()));
+std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
+                                                    const TIndexInfo& indexInfo) {
+    // TODO: composite effective key
+    auto columnName = indexInfo.GetPrimaryKey()[0].first;
+    auto resBatch = NArrow::ExtractColumns(batch, {std::string(columnName.data(), columnName.size())});
+    Y_VERIFY_S(resBatch, "No column '" << columnName << "' in batch " << batch->schema()->ToString());
+    return resBatch;
 }
 
 arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
@@ -41,16 +42,11 @@ arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
     return options;
 }
 
-std::shared_ptr<arrow::Scalar> ExtractFirstKey(const std::shared_ptr<TPredicate>& pkPredicate,
-                                               const std::shared_ptr<arrow::Schema>& key) {
+std::optional<NArrow::TReplaceKey> ExtractKey(const std::shared_ptr<TPredicate>& pkPredicate,
+                                              const std::shared_ptr<arrow::Schema>& key) {
     if (pkPredicate) {
         Y_VERIFY(pkPredicate->Good());
-        Y_VERIFY(key->num_fields() == 1);
-        Y_VERIFY(key->field(0)->Equals(pkPredicate->Batch->schema()->field(0)));
-
-        auto array = pkPredicate->Batch->column(0);
-        Y_VERIFY(array && array->length() == 1);
-        return *array->GetScalar(0);
+        return NArrow::TReplaceKey::FromBatch(pkPredicate->Batch, key, 0);
     }
     return {};
 }
@@ -213,7 +209,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion
     THashSet<ui64> goodCompacted;
     THashSet<ui64> nextToGood;
     {
-        TMap<TMark, TVector<const TPortionInfo*>> points;
+        TMap<NArrow::TReplaceKey, TVector<const TPortionInfo*>> points;
 
         for (auto& portionInfo : portions) {
             if (portionInfo.IsInserted()) {
@@ -225,12 +221,11 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion
                 goodCompacted.insert(portionInfo.Portion());
             }
 
-            auto start = portionInfo.PkStart();
-            auto end = portionInfo.PkEnd();
-            Y_VERIFY(start && end);
+            NArrow::TReplaceKey start = portionInfo.EffKeyStart();
+            NArrow::TReplaceKey end = portionInfo.EffKeyEnd();
 
-            points[TMark(start)].push_back(&portionInfo);
-            points[TMark(end)].push_back(nullptr);
+            points[start].push_back(&portionInfo);
+            points[end].push_back(nullptr);
         }
 
         ui32 countInBucket = 0;
@@ -295,15 +290,13 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion
 
         // Prevent merge of compacted portions with no intersections
        if (filtered.contains(curPortion)) {
-            auto start = portionInfo.PkStart();
-            Y_VERIFY(start);
+            auto start = portionInfo.EffKeyStart();
            borders.emplace_back(TMark(start));
        } else {
            // nextToGood borders potentially split good compacted portions into 2 parts:
            // the first one without intersections and the second with them
            if (goodCompacted.contains(curPortion) || nextToGood.contains(curPortion)) {
-                auto start = portionInfo.PkStart();
-                Y_VERIFY(start);
+                auto start = portionInfo.EffKeyStart();
                borders.emplace_back(TMark(start));
            }
 
@@ -337,21 +330,26 @@ SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
                   const std::vector<std::pair<TMark, ui64>>& granules,
                   const TIndexInfo& indexInfo) {
+    Y_VERIFY(batch);
+    if (batch->num_rows() == 0) {
+        return {};
+    }
+
     THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out;
 
     if (granules.size() == 1) {
         out.emplace(granules[0].second, batch);
     } else {
-        const auto keyColumn = GetFirstPKColumn(indexInfo, batch);
-        Y_VERIFY(keyColumn && keyColumn->length() > 0);
+        const auto effKey = GetEffectiveKey(batch, indexInfo);
+        Y_VERIFY(effKey->num_columns() && effKey->num_rows());
 
         i64 offset = 0;
         for (size_t i = 0; i < granules.size(); ++i) {
             const i64 end = (i + 1 == granules.size())
                 // Just take the number of elements in the key column for the last granule.
-                ? keyColumn->length()
+                ? effKey->num_rows()
                 // Locate position of the next granule in the key.
-                : NArrow::LowerBound(keyColumn, *granules[i + 1].first.ToScalar(), offset); // TODO: avoid ToScalar()
+                : NArrow::LowerBound(effKey, granules[i + 1].first.Border, offset);
 
             if (const i64 size = end - offset) {
                 Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
             }
@@ -396,7 +394,7 @@ TColumnEngineForLogs::TMarksGranules::TMarksGranules(const TSelectInfo& selectIn
 
 bool TColumnEngineForLogs::TMarksGranules::MakePrecedingMark(const TIndexInfo& indexInfo) {
     ui64 minGranule = 0;
-    TMark minMark(indexInfo.GetIndexKey()->field(0)->type());
+    TMark minMark(indexInfo.GetEffectiveKey());
     if (Marks.empty()) {
         Marks.emplace_back(std::move(minMark), minGranule);
         return true;
@@ -434,10 +432,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, con
     /// * apply REPLACE by MergeSort
     /// * apply PK predicate before REPLACE
     IndexInfo.SetAllKeys();
-
-    auto& indexKey = IndexInfo.GetIndexKey();
-    Y_VERIFY(indexKey->num_fields() == 1);
-    MarkType = indexKey->field(0)->type();
+    MarkSchema = IndexInfo.GetEffectiveKey();
 
     ui32 indexId = IndexInfo.GetId();
     GranulesTable = std::make_shared<TGranulesTable>(*this, indexId);
@@ -1108,7 +1103,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
     for (auto& [granule, p] : changes.NewGranules) {
         ui64 pathId = p.first;
         TMark mark = p.second;
-        TGranuleRecord rec(pathId, granule, snapshot, mark.ToScalar());
+        TGranuleRecord rec(pathId, granule, snapshot, mark.Border);
 
         if (!SetGranule(rec, apply)) {
             LOG_S_ERROR("Cannot insert granule " << rec << " at tablet " << TabletId);
@@ -1135,12 +1130,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
         auto& granuleStart = Granules[granule]->Record.Mark;
 
         if (!apply) { // granule vs portion minPK
-            auto portionStart = portionInfo.PkStart();
-            Y_VERIFY(portionStart);
-            if (TMark(portionStart) < TMark(granuleStart)) {
+            auto portionStart = portionInfo.EffKeyStart();
+            if (portionStart < granuleStart) {
                 LOG_S_ERROR("Cannot update invalid portion " << portionInfo
-                            << " start: " << portionStart->ToString()
-                            << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId);
+                            << " start: " << TMark(portionStart).ToString()
+                            << " granule start: " << TMark(granuleStart).ToString() << " at tablet " << TabletId);
                 return false;
             }
         }
@@ -1249,17 +1243,15 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
             }
 
             // granule vs portion minPK
-            std::shared_ptr<arrow::Scalar> granuleStart;
-            if (Granules.contains(granule)) {
-                granuleStart = Granules[granule]->Record.Mark;
-            } else {
-                granuleStart = changes.NewGranules.find(granule)->second.second.ToScalar();
-            }
-            auto portionStart = portionInfo.PkStart();
-            Y_VERIFY(portionStart);
-            if (TMark(portionStart) < TMark(granuleStart)) {
-                LOG_S_ERROR("Cannot insert invalid portion " << portionInfo << " start: " << portionStart->ToString()
-                            << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId);
+            NArrow::TReplaceKey granuleStart = Granules.contains(granule)
+                ? Granules[granule]->Record.Mark
+                : changes.NewGranules.find(granule)->second.second.Border;
+
+            auto portionStart = portionInfo.EffKeyStart();
+            if (portionStart < granuleStart) {
+                LOG_S_ERROR("Cannot insert invalid portion " << portionInfo
+                            << " start: " << TMark(portionStart).ToString()
+                            << " granule start: " << TMark(granuleStart).ToString() << " at tablet " << TabletId);
                 return false;
             }
         }
@@ -1469,13 +1461,13 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
     out->Granules.reserve(pathGranules.size());
     // TODO: out.Portions.reserve()
 
-    auto keyFrom = ExtractFirstKey(from, GetIndexKey());
-    auto keyTo = ExtractFirstKey(to, GetIndexKey());
+    std::optional<NArrow::TReplaceKey> keyFrom = ExtractKey(from, GetIndexKey());
+    std::optional<NArrow::TReplaceKey> keyTo = ExtractKey(to, GetIndexKey());
 
     // Apply FROM
     auto it = pathGranules.begin();
     if (keyFrom) {
-        it = pathGranules.upper_bound(TMark(keyFrom));
+        it = pathGranules.upper_bound(TMark(*keyFrom));
         --it;
     }
     for (; it != pathGranules.end(); ++it) {
@@ -1483,7 +1475,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
         ui64 granule = it->second;
 
         // Apply TO
-        if (keyTo && TMark(keyTo) < mark) {
+        if (keyTo && *keyTo < mark.Border) {
             break;
         }
 
@@ -1728,25 +1720,30 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
                     const TMark& ts0) {
     TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out;
 
-    // Extract unique effective key (timestamp) and their counts
+    // Extract unique effective keys and their counts
     i64 numRows = 0;
-    TMap<TMark, ui32> uniqKeyCount;
+    TMap<NArrow::TReplaceKey, ui32> uniqKeyCount;
     for (auto& batch : batches) {
+        Y_VERIFY(batch);
+        if (batch->num_rows() == 0) {
+            continue;
+        }
+
         numRows += batch->num_rows();
 
-        auto keyColumn = GetFirstPKColumn(indexInfo, batch);
-        Y_VERIFY(keyColumn && keyColumn->length() > 0);
+        const auto effKey = GetEffectiveKey(batch, indexInfo);
+        Y_VERIFY(effKey->num_columns() && effKey->num_rows());
 
-        for (int pos = 0; pos < keyColumn->length(); ++pos) {
-            TMark ts(*keyColumn->GetScalar(pos));
-            ++uniqKeyCount[ts];
+        auto effColumns = std::make_shared<NArrow::TArrayVec>(effKey->columns());
+        for (int row = 0; row < effKey->num_rows(); ++row) {
+            ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)];
         }
     }
 
     Y_VERIFY(uniqKeyCount.size());
     auto minTs = uniqKeyCount.begin()->first;
     auto maxTs = uniqKeyCount.rbegin()->first;
-    Y_VERIFY(minTs >= ts0);
+    Y_VERIFY(minTs >= ts0.Border);
 
     // It's an estimation of needed count cause numRows calculated before key replaces
     ui32 numSplitInto = changes.NumSplitInto(numRows);
@@ -1765,7 +1762,7 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
     }
 
     // Make split borders from uniq keys
-    TVector<TMark> borders;
+    TVector<NArrow::TReplaceKey> borders;
     borders.reserve(numRows / rowsInGranule);
     {
         ui32 sumRows = 0;
@@ -1789,12 +1786,12 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
         auto& batchOffsets = offsets[i];
         batchOffsets.reserve(borders.size() + 1);
 
-        auto keyColumn = GetFirstPKColumn(indexInfo, batch);
-        Y_VERIFY(keyColumn && keyColumn->length() > 0);
+        const auto effKey = GetEffectiveKey(batch, indexInfo);
+        Y_VERIFY(effKey->num_columns() && effKey->num_rows());
 
         batchOffsets.push_back(0);
-        for (auto& border : borders) {
-            int offset = NArrow::LowerBound(keyColumn, *border.ToScalar(), batchOffsets.back());
+        for (const auto& border : borders) {
+            int offset = NArrow::LowerBound(effKey, border, batchOffsets.back());
             Y_VERIFY(offset >= batchOffsets.back());
             batchOffsets.push_back(offset);
         }
@@ -1826,17 +1823,18 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
             Y_VERIFY(slice->num_rows());
             granuleNumRows += slice->num_rows();
 #if 1 // Check correctness
-            auto keyColumn = GetFirstPKColumn(indexInfo, slice);
-            Y_VERIFY(keyColumn && keyColumn->length() > 0);
+            const auto effKey = GetEffectiveKey(slice, indexInfo);
+            Y_VERIFY(effKey->num_columns() && effKey->num_rows());
 
             auto startKey = granuleNo ? borders[granuleNo - 1] : minTs;
-            Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey);
+            Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
 
+            NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1);
             if (granuleNo < borders.size() - 1) {
-                auto endKey = borders[granuleNo];
-                Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) < endKey);
+                const auto& endKey = borders[granuleNo];
+                Y_VERIFY(lastSliceKey < endKey);
             } else {
-                Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) <= maxTs);
+                Y_VERIFY(lastSliceKey <= maxTs);
             }
 #endif
             Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey()));
@@ -1852,16 +1850,17 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
         for (auto& batch : merged) {
             Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
 
-            auto startKey = ts0;
+            auto startKey = ts0.Border;
            if (granuleNo) {
                startKey = borders[granuleNo - 1];
            }
 #if 1 // Check correctness
-            auto keyColumn = GetFirstPKColumn(indexInfo, batch);
-            Y_VERIFY(keyColumn && keyColumn->length() > 0);
-            Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey);
+            const auto effKey = GetEffectiveKey(batch, indexInfo);
+            Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+            Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
 #endif
-            out.emplace_back(startKey, batch);
+            out.emplace_back(TMark(startKey), batch);
         }
     }
 
@@ -1897,11 +1896,11 @@ static ui64 TryMovePortions(const TMark& ts0,
     }
     // Order compacted portions by primary key.
     std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
-        return NArrow::ScalarLess(*a->PkStart(), *b->PkStart());
+        return a->EffKeyStart() < b->EffKeyStart();
     });
     // Check that there are no gaps between two adjacent portions in term of primary key range.
     for (size_t i = 0; i < compacted.size() - 1; ++i) {
-        if (!NArrow::ScalarLess(*compacted[i]->PkEnd(), *compacted[i + 1]->PkStart())) {
+        if (compacted[i]->EffKeyEnd() >= compacted[i + 1]->EffKeyStart()) {
             return 0;
         }
     }
@@ -1913,7 +1912,7 @@ static ui64 TryMovePortions(const TMark& ts0,
         ui32 rows = portionInfo->NumRows();
         Y_VERIFY(rows);
         numRows += rows;
-        tsIds.emplace_back((counter ? TMark(portionInfo->PkStart()) : ts0), counter + 1);
+        tsIds.emplace_back((counter ? TMark(portionInfo->EffKeyStart()) : ts0), counter + 1);
         toMove.emplace_back(std::move(*portionInfo), counter);
         ++counter;
         // Ensure that std::move will take an effect.
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 04bb5e04f40..396daf9cab8 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -1,6 +1,5 @@
 #pragma once
 #include "defs.h"
-#include <ydb/core/formats/replace_key.h>
 #include "column_engine.h"
 #include "scalars.h"
 
@@ -20,87 +19,6 @@ class TCountersTable;
 /// - Columns: granule -> blobs
 class TColumnEngineForLogs : public IColumnEngine {
 public:
-    struct TMark {
-        // TODO: Grouped marks. Share columns in TReplaceKey between multiple marks.
-        NArrow::TReplaceKey Border;
-
-        explicit TMark(const std::shared_ptr<arrow::Scalar>& s)
-            : Border(FromScalar(s))
-        {}
-
-        explicit TMark(const std::shared_ptr<arrow::DataType>& type)
-            : Border(MinBorder(type))
-        {}
-
-        TMark(const TString& key, const std::shared_ptr<arrow::DataType>& type)
-            : Border(FromScalar(DeserializeKeyScalar(key, type)))
-        {}
-
-        TMark(const TMark& m) = default;
-        TMark& operator = (const TMark& m) = default;
-
-        bool operator == (const TMark& m) const {
-            return Border == m.Border;
-        }
-
-        std::partial_ordering operator <=> (const TMark& m) const {
-            return Border <=> m.Border;
-        }
-
-        ui64 Hash() const {
-            return Border.Hash();
-        }
-
-        operator size_t () const {
-            return Hash();
-        }
-
-        operator bool () const {
-            Y_VERIFY(false);
-        }
-
-        TString Serialize() const {
-            return SerializeKeyScalar(ToScalar(Border));
-        }
-
-        void Deserialize(const TString& key, const std::shared_ptr<arrow::DataType>& type) {
-            Border = FromScalar(DeserializeKeyScalar(key, type));
-        }
-
-        std::shared_ptr<arrow::Scalar> ToScalar() const {
-            return ToScalar(Border);
-        }
-
-    private:
-        static NArrow::TReplaceKey FromScalar(const std::shared_ptr<arrow::Scalar>& s) {
-            Y_VERIFY_DEBUG(NArrow::IsGoodScalar(s));
-            auto res = MakeArrayFromScalar(*s, 1);
-            Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
-            return NArrow::TReplaceKey(std::make_shared<NArrow::TArrayVec>(1, *res), 0);
-        }
-
-        static std::shared_ptr<arrow::Scalar> ToScalar(const NArrow::TReplaceKey& key) {
-            Y_VERIFY_DEBUG(key.Size() == 1);
-            auto& column = key.Column(0);
-            auto res = column.GetScalar(key.GetPosition());
-            Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
-            Y_VERIFY_DEBUG(NArrow::IsGoodScalar(*res));
-            return *res;
-        }
-
-        static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type) {
-            if (type->id() == arrow::Type::TIMESTAMP) {
-                // TODO: support negative timestamps in index
-                return std::make_shared<arrow::TimestampScalar>(0, type);
-            }
-            return NArrow::MinScalar(type);
-        }
-
-        static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::DataType>& type) {
-            return FromScalar(MinScalar(type));
-        }
-    };
-
     class TMarksGranules {
     public:
         using TPair = std::pair<TMark, ui64>;
@@ -265,17 +183,16 @@ public:
     bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); }
 
-    TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const override {
-        Y_VERIFY_S(scalar->type->Equals(MarkType), scalar->type->ToString() + ", expected " + MarkType->ToString());
-        return TMark(scalar).Serialize();
+    TString SerializeMark(const NArrow::TReplaceKey& key) const override {
+        return TMark::Serialize(key, MarkSchema);
     }
 
-    std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const override {
-        return TMark(key, MarkType).ToScalar();
+    NArrow::TReplaceKey DeserializeMark(const TString& key) const override {
+        return TMark::Deserialize(key, MarkSchema);
    }
 
     TMark GetDefaultMark() const {
-        return TMark(MarkType);
+        return TMark(MarkSchema);
     }
 
     bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override;
@@ -329,7 +246,7 @@ private:
     TIndexInfo IndexInfo;
     TCompactionLimits Limits;
     ui64 TabletId;
-    std::shared_ptr<arrow::DataType> MarkType;
+    std::shared_ptr<arrow::Schema> MarkSchema;
     std::shared_ptr<TGranulesTable> GranulesTable;
     std::shared_ptr<TColumnsTable> ColumnsTable;
     std::shared_ptr<TCountersTable> CountersTable;
diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h
index 8940a7d3ce5..e2ce21dbc74 100644
--- a/ydb/core/tx/columnshard/engines/granules_table.h
+++ b/ydb/core/tx/columnshard/engines/granules_table.h
@@ -1,5 +1,6 @@
 #pragma once
 #include "db_wrapper.h"
+#include <ydb/core/formats/replace_key.h>
 
 namespace NKikimr::NOlap {
 
@@ -7,19 +8,19 @@ struct TGranuleRecord {
     ui64 PathId;
     ui64 Granule;
     TSnapshot CreatedAt;
-    std::shared_ptr<arrow::Scalar> Mark;
+    NArrow::TReplaceKey Mark;
 
-    TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const std::shared_ptr<arrow::Scalar>& mark)
+    TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const NArrow::TReplaceKey& mark)
         : PathId(pathId)
         , Granule(granule)
        , CreatedAt(createdAt)
        , Mark(mark)
     {
-        Y_VERIFY(Mark);
+        Y_VERIFY(Mark.Size());
     }
 
     bool operator == (const TGranuleRecord& rec) const {
-        return (PathId == rec.PathId) && (Mark->Equals(*rec.Mark));
+        return (PathId == rec.PathId) && (Mark == rec.Mark);
     }
 
     friend IOutputStream& operator << (IOutputStream& out, const TGranuleRecord& rec) {
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 8e67436a8da..293cb0cbaac 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -88,6 +88,12 @@ public:
     const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; }
     const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; }
 
+    const std::shared_ptr<arrow::Schema> GetEffectiveKey() const {
+        // TODO: composite key
+        Y_VERIFY(IndexKey->num_fields() == 1);
+        return std::make_shared<arrow::Schema>(arrow::FieldVector{GetIndexKey()->field(0)});
+    }
+
     /// Initializes sorting, replace, index and extended keys.
     void SetAllKeys();
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index d3908ead497..1493ab4988c 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -4,6 +4,9 @@
 #include "columns_table.h"
 #include "index_info.h"
 
+#include <ydb/core/formats/replace_key.h>
+
+
 namespace NKikimr::NOlap {
 
 struct TPortionMeta {
@@ -156,20 +159,16 @@ struct TPortionInfo {
     std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
     std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
 
-    std::shared_ptr<arrow::Scalar> PkStart() const {
-        if (FirstPkColumn) {
-            Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
-            return MinValue(FirstPkColumn);
-        }
-        return {};
+    NArrow::TReplaceKey EffKeyStart() const {
+        Y_VERIFY(FirstPkColumn);
+        Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
+        return NArrow::TReplaceKey::FromScalar(MinValue(FirstPkColumn));
     }
 
-    std::shared_ptr<arrow::Scalar> PkEnd() const {
-        if (FirstPkColumn) {
-            Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
-            return MaxValue(FirstPkColumn);
-        }
-        return {};
+    NArrow::TReplaceKey EffKeyEnd() const {
+        Y_VERIFY(FirstPkColumn);
+        Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
+        return NArrow::TReplaceKey::FromScalar(MaxValue(FirstPkColumn));
     }
 
     ui32 NumRows() const {
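
For orientation, a minimal usage sketch (not part of the commit) of the API this refactoring introduces: TReplaceKey::FromBatch/FromScalar, the TMark wrapper moved into column_engine.h, and the new RecordBatch overload of NArrow::LowerBound. It assumes the ydb headers touched above, a single-column effective key (the only case the new TMark::Serialize/Deserialize handle so far), and a hypothetical caller function; it is not taken from the repository.

#include <ydb/core/formats/replace_key.h>
#include <ydb/core/formats/arrow_helpers.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>

using namespace NKikimr;

// Hypothetical helper: find the first row of `batch` whose effective key is >= the
// key taken from row 0, after round-tripping that key through TMark serialization.
i64 LocateMark(const std::shared_ptr<arrow::RecordBatch>& batch,
               const std::shared_ptr<arrow::Schema>& keySchema) {
    // Build an owning TReplaceKey from row 0, restricted to the key schema's columns.
    NArrow::TReplaceKey first = NArrow::TReplaceKey::FromBatch(batch, keySchema, 0);

    // Wrap it into a granule border mark and round-trip it through the
    // single-column serialization added in column_engine.cpp.
    NOlap::TMark mark(first);
    TString serialized = NOlap::TMark::Serialize(mark.Border, keySchema);
    NOlap::TMark restored(serialized, keySchema);
    Y_VERIFY(restored == mark);

    // The new LowerBound overload expects a batch that contains only the key column.
    auto keyOnly = NArrow::ExtractColumns(batch, {keySchema->field(0)->name()});
    Y_VERIFY(keyOnly);
    return NArrow::LowerBound(keyOnly, mark.Border, 0);
}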