diff options
author | chertus <[email protected]> | 2023-05-12 13:32:35 +0300 |
---|---|---|
committer | chertus <[email protected]> | 2023-05-12 13:32:35 +0300 |
commit | c2bcd70fd53887e9507f247a1cba6ba0f02285b7 (patch) | |
tree | 795e7942b1570578fd8f4cfe39fddd59a9259782 | |
parent | 7b6b16cd7c7f6d5549674a9d409d03e1435a0f58 (diff) |
composite default mark in ColumnShard engine
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 32 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 30 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_logic_logs.cpp | 18 |
7 files changed, 87 insertions, 47 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index 7cdc7c9144e..430cbea2aa3 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -53,8 +53,20 @@ std::shared_ptr<arrow::Scalar> TMark::MinScalar(const std::shared_ptr<arrow::Dat } NArrow::TReplaceKey TMark::MinBorder(const std::shared_ptr<arrow::Schema>& schema) { - Y_VERIFY_DEBUG(schema->num_fields() == 1); - return NArrow::TReplaceKey::FromScalar(MinScalar(schema->field(0)->type())); + if (schema->num_fields() == 1) { + return NArrow::TReplaceKey::FromScalar(MinScalar(schema->field(0)->type())); + } else { + std::vector<std::shared_ptr<arrow::Array>> columns; + columns.reserve(schema->num_fields()); + for (const auto& field : schema->fields()) { + auto scalar = MinScalar(field->type()); + Y_VERIFY_DEBUG(scalar); + auto res = arrow::MakeArrayFromScalar(*scalar, 1); + Y_VERIFY_DEBUG(res.ok()); + columns.emplace_back(*res); + } + return NArrow::TReplaceKey::FromBatch(arrow::RecordBatch::Make(schema, 1, columns), 0); + } } } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 1941fe45678..b15c7489155 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -31,19 +31,12 @@ struct TCompactionLimits { ui32 InGranuleCompactSeconds{2 * 60}; // Trigger in-granule comcation to guarantee no PK intersections }; -struct TMark { - /// @note It's possible to share columns in TReplaceKey between multiple marks: - /// read all marks as a batch; create TMark for each row - NArrow::TReplaceKey Border; - +class TMark { +public: explicit TMark(const NArrow::TReplaceKey& key) : Border(key) {} - explicit TMark(const std::shared_ptr<arrow::Schema>& schema) - : Border(MinBorder(schema)) - {} - TMark(const TMark& m) = default; TMark& operator = (const TMark& m) = default; @@ -55,6 +48,10 @@ struct TMark { return Border <=> m.Border; } + const NArrow::TReplaceKey& GetBorder() const noexcept { + return Border; + } + ui64 Hash() const { return Border.Hash(); } @@ -71,11 +68,16 @@ struct TMark { static TString SerializeComposite(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema); static NArrow::TReplaceKey DeserializeComposite(const TString& key, const std::shared_ptr<arrow::Schema>& schema); + static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::Schema>& schema); + std::string ToString() const; private: + /// @note It's possible to share columns in TReplaceKey between multiple marks: + /// read all marks as a batch; create TMark for each row + NArrow::TReplaceKey Border; + static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type); - static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::Schema>& schema); }; struct TCompactionInfo { @@ -337,6 +339,7 @@ struct TColumnEngineStats { class TVersionedIndex { std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots; + std::shared_ptr<arrow::Schema> IndexKey; public: ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const { for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) { @@ -354,7 +357,16 @@ public: return Snapshots.rbegin()->second; } + const std::shared_ptr<arrow::Schema>& GetIndexKey() const noexcept { + return IndexKey; + } + void AddIndex(const TSnapshot& version, TIndexInfo&& indexInfo) { + if (Snapshots.empty()) { + IndexKey = indexInfo.GetIndexKey(); + } else { + Y_VERIFY(IndexKey->Equals(indexInfo.GetIndexKey())); + } Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version)); } }; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 445a2d3bc54..885af522025 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -160,7 +160,7 @@ TColumnEngineForLogs::TMarksGranules::TMarksGranules(const TSelectInfo& selectIn bool TColumnEngineForLogs::TMarksGranules::MakePrecedingMark(const TIndexInfo& indexInfo) { ui64 minGranule = 0; - TMark minMark(indexInfo.GetEffectiveKey()); + TMark minMark(TMark::MinBorder(indexInfo.GetIndexKey())); if (Marks.empty()) { Marks.emplace_back(std::move(minMark), minGranule); return true; @@ -322,7 +322,6 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) { if (!GranulesTable) { - MarkSchema = info.GetEffectiveKey(); ui32 indexId = info.GetId(); GranulesTable = std::make_shared<TGranulesTable>(*this, indexId); ColumnsTable = std::make_shared<TColumnsTable>(indexId); @@ -455,7 +454,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) { Y_VERIFY(dataToIndex.size()); - auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(dataToIndex), Limits); + auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(dataToIndex), Limits); ui32 reserveGranules = 0; changes->InitSnapshot = LastSnapshot; @@ -496,7 +495,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: Y_VERIFY(info); Y_VERIFY(info->Granules.size() == 1); - auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(info), Limits, LastSnapshot); + auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(info), Limits, LastSnapshot); const ui64 granule = *changes->CompactionInfo->Granules.begin(); const auto gi = Granules.find(granule); @@ -538,7 +537,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) { - auto changes = std::make_shared<TChanges>(TMark(MarkSchema), snapshot, Limits); + auto changes = std::make_shared<TChanges>(DefaultMark(), snapshot, Limits); ui32 affectedRecords = 0; // Add all portions from dropped paths @@ -625,7 +624,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash } TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot - auto changes = std::make_shared<TChanges>(TMark(MarkSchema), TColumnEngineChanges::TTL, fakeSnapshot); + auto changes = std::make_shared<TChanges>(DefaultMark(), TColumnEngineChanges::TTL, fakeSnapshot); ui64 evicttionSize = 0; bool allowEviction = true; ui64 dropBlobs = 0; @@ -878,7 +877,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, for (auto& [granule, p] : changes.NewGranules) { ui64 pathId = p.first; TMark mark = p.second; - TGranuleRecord rec(pathId, granule, snapshot, mark.Border); + TGranuleRecord rec(pathId, granule, snapshot, mark.GetBorder()); if (!SetGranule(rec, apply)) { LOG_S_ERROR("Cannot insert granule " << rec << " at tablet " << TabletId); @@ -1021,7 +1020,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, // granule vs portion minPK NArrow::TReplaceKey granuleStart = Granules.contains(granule) ? Granules[granule]->Record.Mark - : changes.NewGranules.find(granule)->second.second.Border; + : changes.NewGranules.find(granule)->second.second.GetBorder(); auto portionStart = portionInfo.EffKeyStart(); if (portionStart < granuleStart) { @@ -1239,7 +1238,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot auto& mark = it->first; ui64 granule = it->second; - if (keyTo && *keyTo < mark.Border) { + if (keyTo && *keyTo < mark.GetBorder()) { break; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index df2ba006f8e..d4f26822848 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -191,18 +191,18 @@ public: bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); } TString SerializeMark(const NArrow::TReplaceKey& key) const override { - if (UseCompositeMarks) { - return TMark::SerializeComposite(key, MarkSchema); + if (UseCompositeMarks()) { + return TMark::SerializeComposite(key, MarkSchema()); } else { - return TMark::SerializeScalar(key, MarkSchema); + return TMark::SerializeScalar(key, MarkSchema()); } } NArrow::TReplaceKey DeserializeMark(const TString& key) const override { - if (UseCompositeMarks) { - return TMark::DeserializeComposite(key, MarkSchema); + if (UseCompositeMarks()) { + return TMark::DeserializeComposite(key, MarkSchema()); } else { - return TMark::DeserializeScalar(key, MarkSchema); + return TMark::DeserializeScalar(key, MarkSchema()); } } @@ -253,7 +253,6 @@ private: TVersionedIndex VersionedIndex; TCompactionLimits Limits; ui64 TabletId; - std::shared_ptr<arrow::Schema> MarkSchema; std::shared_ptr<TGranulesTable> GranulesTable; std::shared_ptr<TColumnsTable> ColumnsTable; std::shared_ptr<TCountersTable> CountersTable; @@ -271,9 +270,24 @@ private: ui64 LastPortion; ui64 LastGranule; TSnapshot LastSnapshot = TSnapshot::Zero(); - bool UseCompositeMarks = false; + mutable std::optional<TMark> CachedDefaultMark; private: + const std::shared_ptr<arrow::Schema>& MarkSchema() const noexcept { + return VersionedIndex.GetIndexKey(); + } + + const TMark& DefaultMark() const { + if (!CachedDefaultMark) { + CachedDefaultMark = TMark(TMark::MinBorder(MarkSchema())); + } + return *CachedDefaultMark; + } + + bool UseCompositeMarks() const { + return MarkSchema()->num_fields() > 1; + } + void ClearIndex() { Granules.clear(); PathGranules.clear(); diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index cbdafb694db..920c7f3b6ab 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -20,10 +20,11 @@ static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns) return out; } -TIndexInfo::TIndexInfo(const TString& name, ui32 id) +TIndexInfo::TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey) : NTable::TScheme::TTableSchema() , Id(id) , Name(name) + , CompositeIndexKey(compositeIndexKey) {} std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) { @@ -277,7 +278,11 @@ void TIndexInfo::SetAllKeys() { SortingKey = ArrowSchema(primaryKeyNames); ReplaceKey = SortingKey; fields = ReplaceKey->fields(); - IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] })); + if (CompositeIndexKey) { + IndexKey = ReplaceKey; + } else { + IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] })); + } } fields.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64())); diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 5b745b4f398..3312a40905a 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -59,7 +59,7 @@ public: return true; } public: - TIndexInfo(const TString& name, ui32 id); + TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false); /// Returns id of the index. ui32 GetId() const noexcept { @@ -98,12 +98,6 @@ public: const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; } const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; } - const std::shared_ptr<arrow::Schema> GetEffectiveKey() const { - // TODO: composite key - Y_VERIFY(IndexKey->num_fields() == 1); - return std::make_shared<arrow::Schema>(arrow::FieldVector{GetIndexKey()->field(0)}); - } - /// Initializes sorting, replace, index and extended keys. void SetAllKeys(); @@ -146,17 +140,21 @@ public: void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } const TCompression& GetDefaultCompression() const { return DefaultCompression; } + static const std::vector<std::string>& GetSpecialColumnNames() { static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; return result; } + static const std::vector<ui32>& GetSpecialColumnIds() { static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; return result; } + private: ui32 Id; TString Name; + const bool CompositeIndexKey; mutable std::shared_ptr<arrow::Schema> Schema; mutable std::shared_ptr<arrow::Schema> SchemaWithSpecials; std::shared_ptr<arrow::Schema> SortingKey; diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index 4bccce87cc3..fc8828637df 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -6,10 +6,10 @@ namespace NKikimr::NOlap { std::shared_ptr<arrow::RecordBatch> TIndexLogicBase::GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch, const TIndexInfo& indexInfo) { - // TODO: composite effective key - auto columnName = indexInfo.GetPrimaryKey()[0].first; - auto resBatch = NArrow::ExtractColumns(batch, {std::string(columnName.data(), columnName.size())}); - Y_VERIFY_S(resBatch, "No column '" << columnName << "' in batch " << batch->schema()->ToString()); + const auto& key = indexInfo.GetIndexKey(); + auto resBatch = NArrow::ExtractColumns(batch, key); + Y_VERIFY_S(resBatch, "Cannot extract effective key " << key->ToString() + << " from batch " << batch->schema()->ToString()); return resBatch; } @@ -209,7 +209,7 @@ THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> TIndexLogicBase::SliceIntoGr // Just take the number of elements in the key column for the last granule. ? effKey->num_rows() // Locate position of the next granule in the key. - : NArrow::LowerBound(keys, granules[i + 1].first.Border, offset); + : NArrow::LowerBound(keys, granules[i + 1].first.GetBorder(), offset); if (const i64 size = end - offset) { Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second); @@ -227,7 +227,7 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange Y_VERIFY(changes->AppendedPortions.empty()); - TSnapshot minSnapshot = changes->ApplySnapshot; + TSnapshot minSnapshot = changes->ApplySnapshot; for (auto& inserted : changes->DataToIndex) { TSnapshot insertSnap = inserted.GetSnapshot(); Y_VERIFY(insertSnap.Valid()); @@ -238,7 +238,7 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange Y_VERIFY(minSnapshot.Valid()); auto& indexInfo = IndexInfo.GetSchema(minSnapshot)->GetIndexInfo(); Y_VERIFY(indexInfo.IsSorted()); - + THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; for (auto& inserted : changes->DataToIndex) { TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize()); @@ -377,7 +377,7 @@ TCompactionLogic::SliceGranuleBatches(const TIndexInfo& indexInfo, Y_VERIFY(uniqKeyCount.size()); auto minTs = uniqKeyCount.begin()->first; auto maxTs = uniqKeyCount.rbegin()->first; - Y_VERIFY(minTs >= ts0.Border); + Y_VERIFY(minTs >= ts0.GetBorder()); // It's an estimation of needed count cause numRows calculated before key replaces ui32 numSplitInto = changes.NumSplitInto(numRows); @@ -493,7 +493,7 @@ TCompactionLogic::SliceGranuleBatches(const TIndexInfo& indexInfo, for (auto& batch : merged) { Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey())); - auto startKey = ts0.Border; + auto startKey = ts0.GetBorder(); if (granuleNo) { startKey = borders[granuleNo - 1]; } |