summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author chertus <[email protected]> 2023-05-12 13:32:35 +0300
committer chertus <[email protected]> 2023-05-12 13:32:35 +0300
commit c2bcd70fd53887e9507f247a1cba6ba0f02285b7 (patch)
tree 795e7942b1570578fd8f4cfe39fddd59a9259782
parent 7b6b16cd7c7f6d5549674a9d409d03e1435a0f58 (diff)
composite default mark in ColumnShard engine
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine.cpp 16
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine.h 32
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h 30
-rw-r--r-- ydb/core/tx/columnshard/engines/index_info.cpp 9
-rw-r--r-- ydb/core/tx/columnshard/engines/index_info.h 12
-rw-r--r-- ydb/core/tx/columnshard/engines/index_logic_logs.cpp 18
7 files changed, 87 insertions, 47 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index 7cdc7c9144e..430cbea2aa3 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -53,8 +53,20 @@ std::shared_ptr<arrow::Scalar> TMark::MinScalar(const std::shared_ptr<arrow::Dat
}
NArrow::TReplaceKey TMark::MinBorder(const std::shared_ptr<arrow::Schema>& schema) {
- Y_VERIFY_DEBUG(schema->num_fields() == 1);
- return NArrow::TReplaceKey::FromScalar(MinScalar(schema->field(0)->type()));
+ if (schema->num_fields() == 1) { // single-column key: the border is built from one scalar
+ return NArrow::TReplaceKey::FromScalar(MinScalar(schema->field(0)->type()));
+ } else { // composite key: build a one-row batch with MinScalar() of every key field
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ columns.reserve(schema->num_fields());
+ for (const auto& field : schema->fields()) {
+ auto scalar = MinScalar(field->type());
+ Y_VERIFY(scalar); // must hold in release builds too: the scalar is dereferenced on the next line
+ auto res = arrow::MakeArrayFromScalar(*scalar, 1);
+ Y_VERIFY(res.ok()); // must hold in release builds too: the result is unwrapped on the next line
+ columns.emplace_back(*res);
+ }
+ return NArrow::TReplaceKey::FromBatch(arrow::RecordBatch::Make(schema, 1, columns), 0); // the batch holds exactly one row
+ }
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 1941fe45678..b15c7489155 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -31,19 +31,12 @@ struct TCompactionLimits {
ui32 InGranuleCompactSeconds{2 * 60}; // Trigger in-granule comcation to guarantee no PK intersections
};
-struct TMark {
- /// @note It's possible to share columns in TReplaceKey between multiple marks:
- /// read all marks as a batch; create TMark for each row
- NArrow::TReplaceKey Border;
-
+class TMark {
+public:
explicit TMark(const NArrow::TReplaceKey& key)
: Border(key)
{}
- explicit TMark(const std::shared_ptr<arrow::Schema>& schema)
- : Border(MinBorder(schema))
- {}
-
TMark(const TMark& m) = default;
TMark& operator = (const TMark& m) = default;
@@ -55,6 +48,10 @@ struct TMark {
return Border <=> m.Border;
}
+ const NArrow::TReplaceKey& GetBorder() const noexcept {
+ return Border;
+ }
+
ui64 Hash() const {
return Border.Hash();
}
@@ -71,11 +68,16 @@ struct TMark {
static TString SerializeComposite(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
static NArrow::TReplaceKey DeserializeComposite(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
+ static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::Schema>& schema);
+
std::string ToString() const;
private:
+ /// @note It's possible to share columns in TReplaceKey between multiple marks:
+ /// read all marks as a batch; create TMark for each row
+ NArrow::TReplaceKey Border;
+
static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type);
- static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::Schema>& schema);
};
struct TCompactionInfo {
@@ -337,6 +339,7 @@ struct TColumnEngineStats {
class TVersionedIndex {
std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots;
+ std::shared_ptr<arrow::Schema> IndexKey;
public:
ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const {
for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
@@ -354,7 +357,16 @@ public:
return Snapshots.rbegin()->second;
}
+ const std::shared_ptr<arrow::Schema>& GetIndexKey() const noexcept { // index key schema stored by AddIndex(); default (null) until the first AddIndex() call
+ return IndexKey;
+ }
+
void AddIndex(const TSnapshot& version, TIndexInfo&& indexInfo) {
+ if (Snapshots.empty()) { // the first registered schema version defines the index key
+ IndexKey = indexInfo.GetIndexKey(); // read before indexInfo is moved into the snapshot below
+ } else {
+ Y_VERIFY(IndexKey->Equals(indexInfo.GetIndexKey())); // every later version must keep the same index key
+ }
Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version));
}
};
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 445a2d3bc54..885af522025 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -160,7 +160,7 @@ TColumnEngineForLogs::TMarksGranules::TMarksGranules(const TSelectInfo& selectIn
bool TColumnEngineForLogs::TMarksGranules::MakePrecedingMark(const TIndexInfo& indexInfo) {
ui64 minGranule = 0;
- TMark minMark(indexInfo.GetEffectiveKey());
+ TMark minMark(TMark::MinBorder(indexInfo.GetIndexKey()));
if (Marks.empty()) {
Marks.emplace_back(std::move(minMark), minGranule);
return true;
@@ -322,7 +322,6 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) {
if (!GranulesTable) {
- MarkSchema = info.GetEffectiveKey();
ui32 indexId = info.GetId();
GranulesTable = std::make_shared<TGranulesTable>(*this, indexId);
ColumnsTable = std::make_shared<TColumnsTable>(indexId);
@@ -455,7 +454,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) {
Y_VERIFY(dataToIndex.size());
- auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(dataToIndex), Limits);
+ auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(dataToIndex), Limits);
ui32 reserveGranules = 0;
changes->InitSnapshot = LastSnapshot;
@@ -496,7 +495,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
Y_VERIFY(info);
Y_VERIFY(info->Granules.size() == 1);
- auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(info), Limits, LastSnapshot);
+ auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(info), Limits, LastSnapshot);
const ui64 granule = *changes->CompactionInfo->Granules.begin();
const auto gi = Granules.find(granule);
@@ -538,7 +537,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop,
ui32 maxRecords) {
- auto changes = std::make_shared<TChanges>(TMark(MarkSchema), snapshot, Limits);
+ auto changes = std::make_shared<TChanges>(DefaultMark(), snapshot, Limits);
ui32 affectedRecords = 0;
// Add all portions from dropped paths
@@ -625,7 +624,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
}
TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot
- auto changes = std::make_shared<TChanges>(TMark(MarkSchema), TColumnEngineChanges::TTL, fakeSnapshot);
+ auto changes = std::make_shared<TChanges>(DefaultMark(), TColumnEngineChanges::TTL, fakeSnapshot);
ui64 evicttionSize = 0;
bool allowEviction = true;
ui64 dropBlobs = 0;
@@ -878,7 +877,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
for (auto& [granule, p] : changes.NewGranules) {
ui64 pathId = p.first;
TMark mark = p.second;
- TGranuleRecord rec(pathId, granule, snapshot, mark.Border);
+ TGranuleRecord rec(pathId, granule, snapshot, mark.GetBorder());
if (!SetGranule(rec, apply)) {
LOG_S_ERROR("Cannot insert granule " << rec << " at tablet " << TabletId);
@@ -1021,7 +1020,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
// granule vs portion minPK
NArrow::TReplaceKey granuleStart = Granules.contains(granule)
? Granules[granule]->Record.Mark
- : changes.NewGranules.find(granule)->second.second.Border;
+ : changes.NewGranules.find(granule)->second.second.GetBorder();
auto portionStart = portionInfo.EffKeyStart();
if (portionStart < granuleStart) {
@@ -1239,7 +1238,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
auto& mark = it->first;
ui64 granule = it->second;
- if (keyTo && *keyTo < mark.Border) {
+ if (keyTo && *keyTo < mark.GetBorder()) {
break;
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index df2ba006f8e..d4f26822848 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -191,18 +191,18 @@ public:
bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); }
TString SerializeMark(const NArrow::TReplaceKey& key) const override {
- if (UseCompositeMarks) {
- return TMark::SerializeComposite(key, MarkSchema);
+ if (UseCompositeMarks()) { // true when the mark schema has more than one field
+ return TMark::SerializeComposite(key, MarkSchema());
} else {
- return TMark::SerializeScalar(key, MarkSchema);
+ return TMark::SerializeScalar(key, MarkSchema()); // mark schema has at most one field here
}
}
NArrow::TReplaceKey DeserializeMark(const TString& key) const override {
- if (UseCompositeMarks) {
- return TMark::DeserializeComposite(key, MarkSchema);
+ if (UseCompositeMarks()) { // same format choice as SerializeMark(): composite when the mark schema has more than one field
+ return TMark::DeserializeComposite(key, MarkSchema());
} else {
- return TMark::DeserializeScalar(key, MarkSchema);
+ return TMark::DeserializeScalar(key, MarkSchema()); // mark schema has at most one field here
}
}
@@ -253,7 +253,6 @@ private:
TVersionedIndex VersionedIndex;
TCompactionLimits Limits;
ui64 TabletId;
- std::shared_ptr<arrow::Schema> MarkSchema;
std::shared_ptr<TGranulesTable> GranulesTable;
std::shared_ptr<TColumnsTable> ColumnsTable;
std::shared_ptr<TCountersTable> CountersTable;
@@ -271,9 +270,24 @@ private:
ui64 LastPortion;
ui64 LastGranule;
TSnapshot LastSnapshot = TSnapshot::Zero();
- bool UseCompositeMarks = false;
+ mutable std::optional<TMark> CachedDefaultMark;
private:
+ const std::shared_ptr<arrow::Schema>& MarkSchema() const noexcept { // the mark schema is the index key held by VersionedIndex
+ return VersionedIndex.GetIndexKey();
+ }
+
+ const TMark& DefaultMark() const { // returns the cached TMark built from TMark::MinBorder(MarkSchema())
+ if (!CachedDefaultMark) { // computed on the first call only; CachedDefaultMark is mutable so a const method can fill it
+ CachedDefaultMark = TMark(TMark::MinBorder(MarkSchema()));
+ }
+ return *CachedDefaultMark;
+ }
+
+ bool UseCompositeMarks() const { // composite marks are used when the mark schema has more than one field
+ return MarkSchema()->num_fields() > 1; // NOTE(review): dereferences MarkSchema() unchecked; presumably an index is always added first - confirm
+ }
+
void ClearIndex() {
Granules.clear();
PathGranules.clear();
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index cbdafb694db..920c7f3b6ab 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -20,10 +20,11 @@ static std::vector<TString> NamesOnly(const std::vector<TNameTypeInfo>& columns)
return out;
}
-TIndexInfo::TIndexInfo(const TString& name, ui32 id)
+TIndexInfo::TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey)
: NTable::TScheme::TTableSchema()
, Id(id)
, Name(name)
+ , CompositeIndexKey(compositeIndexKey) // when true, SetAllKeys() uses the whole replace key as the index key instead of its first field
{}
std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) {
@@ -277,7 +278,11 @@ void TIndexInfo::SetAllKeys() {
SortingKey = ArrowSchema(primaryKeyNames);
ReplaceKey = SortingKey;
fields = ReplaceKey->fields();
- IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] }));
+ if (CompositeIndexKey) {
+ IndexKey = ReplaceKey;
+ } else {
+ IndexKey = std::make_shared<arrow::Schema>(arrow::FieldVector({ fields[0] }));
+ }
}
fields.push_back(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()));
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 5b745b4f398..3312a40905a 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -59,7 +59,7 @@ public:
return true;
}
public:
- TIndexInfo(const TString& name, ui32 id);
+ TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false);
/// Returns id of the index.
ui32 GetId() const noexcept {
@@ -98,12 +98,6 @@ public:
const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; }
const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; }
- const std::shared_ptr<arrow::Schema> GetEffectiveKey() const {
- // TODO: composite key
- Y_VERIFY(IndexKey->num_fields() == 1);
- return std::make_shared<arrow::Schema>(arrow::FieldVector{GetIndexKey()->field(0)});
- }
-
/// Initializes sorting, replace, index and extended keys.
void SetAllKeys();
@@ -146,17 +140,21 @@ public:
void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
const TCompression& GetDefaultCompression() const { return DefaultCompression; }
+
static const std::vector<std::string>& GetSpecialColumnNames() {
static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) };
return result;
}
+
static const std::vector<ui32>& GetSpecialColumnIds() {
static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
return result;
}
+
private:
ui32 Id;
TString Name;
+ const bool CompositeIndexKey;
mutable std::shared_ptr<arrow::Schema> Schema;
mutable std::shared_ptr<arrow::Schema> SchemaWithSpecials;
std::shared_ptr<arrow::Schema> SortingKey;
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index 4bccce87cc3..fc8828637df 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -6,10 +6,10 @@ namespace NKikimr::NOlap {
std::shared_ptr<arrow::RecordBatch> TIndexLogicBase::GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
const TIndexInfo& indexInfo) {
- // TODO: composite effective key
- auto columnName = indexInfo.GetPrimaryKey()[0].first;
- auto resBatch = NArrow::ExtractColumns(batch, {std::string(columnName.data(), columnName.size())});
- Y_VERIFY_S(resBatch, "No column '" << columnName << "' in batch " << batch->schema()->ToString());
+ const auto& key = indexInfo.GetIndexKey(); // index key schema; no longer limited to the first primary-key column
+ auto resBatch = NArrow::ExtractColumns(batch, key); // extract the columns named by the index key schema
+ Y_VERIFY_S(resBatch, "Cannot extract effective key " << key->ToString()
+ << " from batch " << batch->schema()->ToString());
return resBatch;
}
@@ -209,7 +209,7 @@ THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> TIndexLogicBase::SliceIntoGr
// Just take the number of elements in the key column for the last granule.
? effKey->num_rows()
// Locate position of the next granule in the key.
- : NArrow::LowerBound(keys, granules[i + 1].first.Border, offset);
+ : NArrow::LowerBound(keys, granules[i + 1].first.GetBorder(), offset);
if (const i64 size = end - offset) {
Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
@@ -227,7 +227,7 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
Y_VERIFY(changes->AppendedPortions.empty());
- TSnapshot minSnapshot = changes->ApplySnapshot;
+ TSnapshot minSnapshot = changes->ApplySnapshot;
for (auto& inserted : changes->DataToIndex) {
TSnapshot insertSnap = inserted.GetSnapshot();
Y_VERIFY(insertSnap.Valid());
@@ -238,7 +238,7 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
Y_VERIFY(minSnapshot.Valid());
auto& indexInfo = IndexInfo.GetSchema(minSnapshot)->GetIndexInfo();
Y_VERIFY(indexInfo.IsSorted());
-
+
THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
for (auto& inserted : changes->DataToIndex) {
TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
@@ -377,7 +377,7 @@ TCompactionLogic::SliceGranuleBatches(const TIndexInfo& indexInfo,
Y_VERIFY(uniqKeyCount.size());
auto minTs = uniqKeyCount.begin()->first;
auto maxTs = uniqKeyCount.rbegin()->first;
- Y_VERIFY(minTs >= ts0.Border);
+ Y_VERIFY(minTs >= ts0.GetBorder());
// It's an estimation of needed count cause numRows calculated before key replaces
ui32 numSplitInto = changes.NumSplitInto(numRows);
@@ -493,7 +493,7 @@ TCompactionLogic::SliceGranuleBatches(const TIndexInfo& indexInfo,
for (auto& batch : merged) {
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
- auto startKey = ts0.Border;
+ auto startKey = ts0.GetBorder();
if (granuleNo) {
startKey = borders[granuleNo - 1];
}