aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-04-12 19:12:13 +0300
committerchertus <azuikov@ydb.tech>2023-04-12 19:12:13 +0300
commit73b3c1e86835d644f21ae58280ad4974bba75f05 (patch)
treeddfffc6dbc556638a1fe7afab30783e5427a8f02
parenta5e044c5e12f6054fc4ab00c6176ca403f53d546 (diff)
downloadydb-73b3c1e86835d644f21ae58280ad4974bba75f05.tar.gz
further TMark refactoring
-rw-r--r--ydb/core/formats/arrow_helpers.cpp11
-rw-r--r--ydb/core/formats/arrow_helpers.h6
-rw-r--r--ydb/core/formats/replace_key.h61
-rw-r--r--ydb/core/tx/columnshard/columnshard_costs.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp42
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h54
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp161
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h95
-rw-r--r--ydb/core/tx/columnshard/engines/granules_table.h9
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h6
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h23
11 files changed, 282 insertions, 192 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 9e7efe684a5..4f65458fd4c 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -754,6 +754,17 @@ i64 LowerBound(const std::shared_ptr<arrow::Array>& array, const arrow::Scalar&
return pos;
}
+// TODO: implement
+i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset) {
+ Y_VERIFY(batch->num_columns() == 1);
+ Y_VERIFY(key.Size() == 1);
+
+ auto res = key.Column(0).GetScalar(key.GetPosition());
+ Y_VERIFY_OK(res.status());
+ Y_VERIFY(*res);
+ return LowerBound(batch->column(0), *(*res), offset);
+}
+
std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size) {
auto res = arrow::MakeArrayFromScalar(arrow::UInt64Scalar(value), size);
Y_VERIFY(res.ok());
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index b2570a09532..23361c5ef78 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -10,6 +10,11 @@
namespace NKikimr::NArrow {
+using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>;
+template<typename T>
+class TReplaceKeyTemplate;
+using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>;
+
// Arrow inrernally keeps references to Buffer objects with the data
// This helper class implements arrow::Buffer over TString that owns
// the actual memory
@@ -105,6 +110,7 @@ std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2)
std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count);
TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0);
+i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset = 0);
bool ReserveData(arrow::ArrayBuilder& builder, const size_t size);
enum class ECompareType {
LESS = 1,
diff --git a/ydb/core/formats/replace_key.h b/ydb/core/formats/replace_key.h
index 3f27dd1982f..195cc4e5f75 100644
--- a/ydb/core/formats/replace_key.h
+++ b/ydb/core/formats/replace_key.h
@@ -5,9 +5,15 @@
namespace NKikimr::NArrow {
+bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
+
+using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>;
+
template<typename TArrayVecPtr>
class TReplaceKeyTemplate {
public:
+ static constexpr bool IsOwning = std::is_same_v<TArrayVecPtr, std::shared_ptr<TArrayVec>>;
+
TReplaceKeyTemplate(TArrayVecPtr columns, int position)
: Columns(columns)
, Position(position)
@@ -15,6 +21,14 @@ public:
Y_VERIFY_DEBUG(Size() > 0 && Position < Column(0).length());
}
+ template<typename T = TArrayVecPtr> requires IsOwning
+ TReplaceKeyTemplate(TArrayVec&& columns, int position)
+ : Columns(std::make_shared<TArrayVec>(std::move(columns)))
+ , Position(position)
+ {
+ Y_VERIFY_DEBUG(Size() > 0 && Position < Column(0).length());
+ }
+
size_t Hash() const {
return TypedHash(Column(0), Position, Column(0).type_id());
}
@@ -78,6 +92,7 @@ public:
}
int Size() const {
+ Y_VERIFY_DEBUG(Columns);
return Columns->size();
}
@@ -86,12 +101,53 @@ public:
}
const arrow::Array& Column(int i) const {
+ Y_VERIFY_DEBUG(Columns);
return *(*Columns)[i];
}
+ template<typename T = TArrayVecPtr> requires IsOwning
+ static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const std::shared_ptr<arrow::Schema>& key, int row) {
+ Y_VERIFY(key->num_fields() <= batch->num_columns());
+
+ TArrayVec columns;
+ columns.reserve(key->num_fields());
+ for (int i = 0; i < key->num_fields(); ++i) {
+ auto& keyField = key->field(i);
+ auto array = batch->GetColumnByName(keyField->name());
+ Y_VERIFY(array);
+ Y_VERIFY(keyField->type()->Equals(array->type()));
+ columns.push_back(array);
+ }
+
+ return TReplaceKeyTemplate<TArrayVecPtr>(std::move(columns), row);
+ }
+
+ template<typename T = TArrayVecPtr> requires IsOwning
+ static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch, int row) {
+ auto columns = std::make_shared<TArrayVec>(batch->columns());
+ return TReplaceKeyTemplate<TArrayVecPtr>(columns, row);
+ }
+
+ static TReplaceKeyTemplate<TArrayVecPtr> FromScalar(const std::shared_ptr<arrow::Scalar>& s) {
+ Y_VERIFY_DEBUG(IsGoodScalar(s));
+ auto res = MakeArrayFromScalar(*s, 1);
+ Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
+ return TReplaceKeyTemplate<TArrayVecPtr>(std::make_shared<TArrayVec>(1, *res), 0);
+ }
+
+ static std::shared_ptr<arrow::Scalar> ToScalar(const TReplaceKeyTemplate<TArrayVecPtr>& key) {
+ Y_VERIFY_DEBUG(key.Size() == 1);
+ auto& column = key.Column(0);
+ auto res = column.GetScalar(key.GetPosition());
+ Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
+ Y_VERIFY_DEBUG(IsGoodScalar(*res));
+ return *res;
+ }
+
private:
- TArrayVecPtr Columns;
- int Position;
+ TArrayVecPtr Columns = nullptr;
+ int Position = 0;
static size_t TypedHash(const arrow::Array& ar, int pos, arrow::Type::type typeId) {
switch (typeId) {
@@ -269,7 +325,6 @@ private:
}
};
-using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>;
using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>;
using TRawReplaceKey = TReplaceKeyTemplate<const TArrayVec*>;
diff --git a/ydb/core/tx/columnshard/columnshard_costs.cpp b/ydb/core/tx/columnshard/columnshard_costs.cpp
index ff0c0781d58..7e45a37043e 100644
--- a/ydb/core/tx/columnshard/columnshard_costs.cpp
+++ b/ydb/core/tx/columnshard/columnshard_costs.cpp
@@ -9,7 +9,11 @@
namespace NKikimr::NOlap::NCosts {
void TKeyRangesBuilder::AddMarkFromGranule(const TGranuleRecord& record) {
- Constructor.StartRecord(true).AddRecordValue(record.Mark);
+ auto& key = record.Mark;
+ Y_VERIFY(key.Size() == 1);
+ auto res = key.Column(0).GetScalar(key.GetPosition());
+ Y_VERIFY(res.status().ok());
+ Constructor.StartRecord(true).AddRecordValue(*res);
Features.emplace_back(TMarkRangeFeatures());
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index d24839efc8a..16a55b847c5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -1,6 +1,48 @@
#include "column_engine.h"
#include <util/stream/output.h>
+namespace NKikimr::NOlap {
+
+TString TMark::Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) {
+ Y_VERIFY_DEBUG(key.Size() > 0);
+ if (key.Size() == 1) {
+ Y_VERIFY_S(key.Column(0).type()->Equals(schema->field(0)->type()),
+ key.Column(0).type()->ToString() + ", expected " + schema->ToString());
+ return SerializeKeyScalar(NArrow::TReplaceKey::ToScalar(key));
+ } else {
+ Y_FAIL("not implemented"); // TODO
+ }
+}
+
+NArrow::TReplaceKey TMark::Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema) {
+ Y_VERIFY_DEBUG(schema->num_fields() > 0);
+ if (schema->num_fields() == 1) {
+ return NArrow::TReplaceKey::FromScalar(DeserializeKeyScalar(key, schema->field(0)->type()));
+ } else {
+ Y_FAIL("not implemented"); // TODO
+ }
+}
+
+std::string TMark::ToString() const {
+ Y_VERIFY_DEBUG(Border.Size() == 1);
+ return NArrow::TReplaceKey::ToScalar(Border)->ToString();
+}
+
+std::shared_ptr<arrow::Scalar> TMark::MinScalar(const std::shared_ptr<arrow::DataType>& type) {
+ if (type->id() == arrow::Type::TIMESTAMP) {
+ // TODO: support negative timestamps in index
+ return std::make_shared<arrow::TimestampScalar>(0, type);
+ }
+ return NArrow::MinScalar(type);
+}
+
+NArrow::TReplaceKey TMark::MinBorder(const std::shared_ptr<arrow::Schema>& schema) {
+ Y_VERIFY_DEBUG(schema->num_fields() == 1);
+ return NArrow::TReplaceKey::FromScalar(MinScalar(schema->field(0)->type()));
+}
+
+}
+
template <>
void Out<NKikimr::NOlap::TColumnEngineChanges>(IOutputStream& out, TTypeTraits<NKikimr::NOlap::TColumnEngineChanges>::TFuncParam changes) {
if (ui32 switched = changes.SwitchedPortions.size()) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index bb0ef9d7c88..58cbf391acf 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -8,6 +8,7 @@
#include "columns_table.h"
#include "granules_table.h"
+#include <ydb/core/formats/replace_key.h>
#include <ydb/core/tx/columnshard/blob.h>
namespace NKikimr::NOlap {
@@ -29,6 +30,55 @@ struct TCompactionLimits {
ui32 InGranuleCompactSeconds{2 * 60}; // Trigger in-granule comcation to guarantee no PK intersections
};
+struct TMark {
+ /// @note It's possible to share columns in TReplaceKey between multiple marks:
+ /// read all marks as a batch; create TMark for each row
+ NArrow::TReplaceKey Border;
+
+ explicit TMark(const NArrow::TReplaceKey& key)
+ : Border(key)
+ {}
+
+ explicit TMark(const std::shared_ptr<arrow::Schema>& schema)
+ : Border(MinBorder(schema))
+ {}
+
+ TMark(const TString& key, const std::shared_ptr<arrow::Schema>& schema)
+ : Border(Deserialize(key, schema))
+ {}
+
+ TMark(const TMark& m) = default;
+ TMark& operator = (const TMark& m) = default;
+
+ bool operator == (const TMark& m) const {
+ return Border == m.Border;
+ }
+
+ std::partial_ordering operator <=> (const TMark& m) const {
+ return Border <=> m.Border;
+ }
+
+ ui64 Hash() const {
+ return Border.Hash();
+ }
+
+ operator size_t () const {
+ return Hash();
+ }
+
+ operator bool () const {
+ Y_FAIL("unexpected call");
+ }
+
+ static TString Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
+ static NArrow::TReplaceKey Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
+ std::string ToString() const;
+
+private:
+ static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type);
+ static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::Schema>& schema);
+};
+
struct TCompactionInfo {
TSet<ui64> Granules;
bool InGranule{false};
@@ -295,8 +345,8 @@ public:
virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; }
virtual bool HasOverloadedGranules() const { return false; }
- virtual TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const = 0;
- virtual std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const = 0;
+ virtual TString SerializeMark(const NArrow::TReplaceKey& key) const = 0;
+ virtual NArrow::TReplaceKey DeserializeMark(const TString& key) const = 0;
virtual bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index f8f1008d5b6..668e74a0385 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -12,12 +12,13 @@ namespace NKikimr::NOlap {
namespace {
-using TMark = TColumnEngineForLogs::TMark;
-
-std::shared_ptr<arrow::Array> GetFirstPKColumn(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::RecordBatch>& batch) {
- TString columnName = indexInfo.GetPrimaryKey()[0].first;
- return batch->GetColumnByName(std::string(columnName.data(), columnName.size()));
+std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TIndexInfo& indexInfo) {
+ // TODO: composite effective key
+ auto columnName = indexInfo.GetPrimaryKey()[0].first;
+ auto resBatch = NArrow::ExtractColumns(batch, {std::string(columnName.data(), columnName.size())});
+ Y_VERIFY_S(resBatch, "No column '" << columnName << "' in batch " << batch->schema()->ToString());
+ return resBatch;
}
arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
@@ -41,16 +42,11 @@ arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
return options;
}
-std::shared_ptr<arrow::Scalar> ExtractFirstKey(const std::shared_ptr<TPredicate>& pkPredicate,
- const std::shared_ptr<arrow::Schema>& key) {
+std::optional<NArrow::TReplaceKey> ExtractKey(const std::shared_ptr<TPredicate>& pkPredicate,
+ const std::shared_ptr<arrow::Schema>& key) {
if (pkPredicate) {
Y_VERIFY(pkPredicate->Good());
- Y_VERIFY(key->num_fields() == 1);
- Y_VERIFY(key->field(0)->Equals(pkPredicate->Batch->schema()->field(0)));
-
- auto array = pkPredicate->Batch->column(0);
- Y_VERIFY(array && array->length() == 1);
- return *array->GetScalar(0);
+ return NArrow::TReplaceKey::FromBatch(pkPredicate->Batch, key, 0);
}
return {};
}
@@ -213,7 +209,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion
THashSet<ui64> goodCompacted;
THashSet<ui64> nextToGood;
{
- TMap<TMark, TVector<const TPortionInfo*>> points;
+ TMap<NArrow::TReplaceKey, TVector<const TPortionInfo*>> points;
for (auto& portionInfo : portions) {
if (portionInfo.IsInserted()) {
@@ -225,12 +221,11 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion
goodCompacted.insert(portionInfo.Portion());
}
- auto start = portionInfo.PkStart();
- auto end = portionInfo.PkEnd();
- Y_VERIFY(start && end);
+ NArrow::TReplaceKey start = portionInfo.EffKeyStart();
+ NArrow::TReplaceKey end = portionInfo.EffKeyEnd();
- points[TMark(start)].push_back(&portionInfo);
- points[TMark(end)].push_back(nullptr);
+ points[start].push_back(&portionInfo);
+ points[end].push_back(nullptr);
}
ui32 countInBucket = 0;
@@ -295,15 +290,13 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion
// Prevent merge of compacted portions with no intersections
if (filtered.contains(curPortion)) {
- auto start = portionInfo.PkStart();
- Y_VERIFY(start);
+ auto start = portionInfo.EffKeyStart();
borders.emplace_back(TMark(start));
} else {
// nextToGood borders potentially split good compacted portions into 2 parts:
// the first one without intersections and the second with them
if (goodCompacted.contains(curPortion) || nextToGood.contains(curPortion)) {
- auto start = portionInfo.PkStart();
- Y_VERIFY(start);
+ auto start = portionInfo.EffKeyStart();
borders.emplace_back(TMark(start));
}
@@ -337,21 +330,26 @@ SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::vector<std::pair<TMark, ui64>>& granules,
const TIndexInfo& indexInfo)
{
+ Y_VERIFY(batch);
+ if (batch->num_rows() == 0) {
+ return {};
+ }
+
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out;
if (granules.size() == 1) {
out.emplace(granules[0].second, batch);
} else {
- const auto keyColumn = GetFirstPKColumn(indexInfo, batch);
- Y_VERIFY(keyColumn && keyColumn->length() > 0);
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
i64 offset = 0;
for (size_t i = 0; i < granules.size(); ++i) {
const i64 end = (i + 1 == granules.size())
// Just take the number of elements in the key column for the last granule.
- ? keyColumn->length()
+ ? effKey->num_rows()
// Locate position of the next granule in the key.
- : NArrow::LowerBound(keyColumn, *granules[i + 1].first.ToScalar(), offset); // TODO: avoid ToScalar()
+ : NArrow::LowerBound(effKey, granules[i + 1].first.Border, offset);
if (const i64 size = end - offset) {
Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
@@ -396,7 +394,7 @@ TColumnEngineForLogs::TMarksGranules::TMarksGranules(const TSelectInfo& selectIn
bool TColumnEngineForLogs::TMarksGranules::MakePrecedingMark(const TIndexInfo& indexInfo) {
ui64 minGranule = 0;
- TMark minMark(indexInfo.GetIndexKey()->field(0)->type());
+ TMark minMark(indexInfo.GetEffectiveKey());
if (Marks.empty()) {
Marks.emplace_back(std::move(minMark), minGranule);
return true;
@@ -434,10 +432,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, con
/// * apply REPLACE by MergeSort
/// * apply PK predicate before REPLACE
IndexInfo.SetAllKeys();
-
- auto& indexKey = IndexInfo.GetIndexKey();
- Y_VERIFY(indexKey->num_fields() == 1);
- MarkType = indexKey->field(0)->type();
+ MarkSchema = IndexInfo.GetEffectiveKey();
ui32 indexId = IndexInfo.GetId();
GranulesTable = std::make_shared<TGranulesTable>(*this, indexId);
@@ -1108,7 +1103,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
for (auto& [granule, p] : changes.NewGranules) {
ui64 pathId = p.first;
TMark mark = p.second;
- TGranuleRecord rec(pathId, granule, snapshot, mark.ToScalar());
+ TGranuleRecord rec(pathId, granule, snapshot, mark.Border);
if (!SetGranule(rec, apply)) {
LOG_S_ERROR("Cannot insert granule " << rec << " at tablet " << TabletId);
@@ -1135,12 +1130,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
auto& granuleStart = Granules[granule]->Record.Mark;
if (!apply) { // granule vs portion minPK
- auto portionStart = portionInfo.PkStart();
- Y_VERIFY(portionStart);
- if (TMark(portionStart) < TMark(granuleStart)) {
+ auto portionStart = portionInfo.EffKeyStart();
+ if (portionStart < granuleStart) {
LOG_S_ERROR("Cannot update invalid portion " << portionInfo
- << " start: " << portionStart->ToString()
- << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId);
+ << " start: " << TMark(portionStart).ToString()
+ << " granule start: " << TMark(granuleStart).ToString() << " at tablet " << TabletId);
return false;
}
}
@@ -1249,17 +1243,15 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
// granule vs portion minPK
- std::shared_ptr<arrow::Scalar> granuleStart;
- if (Granules.contains(granule)) {
- granuleStart = Granules[granule]->Record.Mark;
- } else {
- granuleStart = changes.NewGranules.find(granule)->second.second.ToScalar();
- }
- auto portionStart = portionInfo.PkStart();
- Y_VERIFY(portionStart);
- if (TMark(portionStart) < TMark(granuleStart)) {
- LOG_S_ERROR("Cannot insert invalid portion " << portionInfo << " start: " << portionStart->ToString()
- << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId);
+ NArrow::TReplaceKey granuleStart = Granules.contains(granule)
+ ? Granules[granule]->Record.Mark
+ : changes.NewGranules.find(granule)->second.second.Border;
+
+ auto portionStart = portionInfo.EffKeyStart();
+ if (portionStart < granuleStart) {
+ LOG_S_ERROR("Cannot insert invalid portion " << portionInfo
+ << " start: " << TMark(portionStart).ToString()
+ << " granule start: " << TMark(granuleStart).ToString() << " at tablet " << TabletId);
return false;
}
}
@@ -1469,13 +1461,13 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
out->Granules.reserve(pathGranules.size());
// TODO: out.Portions.reserve()
- auto keyFrom = ExtractFirstKey(from, GetIndexKey());
- auto keyTo = ExtractFirstKey(to, GetIndexKey());
+ std::optional<NArrow::TReplaceKey> keyFrom = ExtractKey(from, GetIndexKey());
+ std::optional<NArrow::TReplaceKey> keyTo = ExtractKey(to, GetIndexKey());
// Apply FROM
auto it = pathGranules.begin();
if (keyFrom) {
- it = pathGranules.upper_bound(TMark(keyFrom));
+ it = pathGranules.upper_bound(TMark(*keyFrom));
--it;
}
for (; it != pathGranules.end(); ++it) {
@@ -1483,7 +1475,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
ui64 granule = it->second;
// Apply TO
- if (keyTo && TMark(keyTo) < mark) {
+ if (keyTo && *keyTo < mark.Border) {
break;
}
@@ -1728,25 +1720,30 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
const TMark& ts0) {
TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out;
- // Extract unique effective key (timestamp) and their counts
+ // Extract unique effective keys and their counts
i64 numRows = 0;
- TMap<TMark, ui32> uniqKeyCount;
+ TMap<NArrow::TReplaceKey, ui32> uniqKeyCount;
for (auto& batch : batches) {
+ Y_VERIFY(batch);
+ if (batch->num_rows() == 0) {
+ continue;
+ }
+
numRows += batch->num_rows();
- auto keyColumn = GetFirstPKColumn(indexInfo, batch);
- Y_VERIFY(keyColumn && keyColumn->length() > 0);
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
- for (int pos = 0; pos < keyColumn->length(); ++pos) {
- TMark ts(*keyColumn->GetScalar(pos));
- ++uniqKeyCount[ts];
+ auto effColumns = std::make_shared<NArrow::TArrayVec>(effKey->columns());
+ for (int row = 0; row < effKey->num_rows(); ++row) {
+ ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)];
}
}
Y_VERIFY(uniqKeyCount.size());
auto minTs = uniqKeyCount.begin()->first;
auto maxTs = uniqKeyCount.rbegin()->first;
- Y_VERIFY(minTs >= ts0);
+ Y_VERIFY(minTs >= ts0.Border);
// It's an estimation of needed count cause numRows calculated before key replaces
ui32 numSplitInto = changes.NumSplitInto(numRows);
@@ -1765,7 +1762,7 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
}
// Make split borders from uniq keys
- TVector<TMark> borders;
+ TVector<NArrow::TReplaceKey> borders;
borders.reserve(numRows / rowsInGranule);
{
ui32 sumRows = 0;
@@ -1789,12 +1786,12 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
auto& batchOffsets = offsets[i];
batchOffsets.reserve(borders.size() + 1);
- auto keyColumn = GetFirstPKColumn(indexInfo, batch);
- Y_VERIFY(keyColumn && keyColumn->length() > 0);
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
batchOffsets.push_back(0);
- for (auto& border : borders) {
- int offset = NArrow::LowerBound(keyColumn, *border.ToScalar(), batchOffsets.back());
+ for (const auto& border : borders) {
+ int offset = NArrow::LowerBound(effKey, border, batchOffsets.back());
Y_VERIFY(offset >= batchOffsets.back());
batchOffsets.push_back(offset);
}
@@ -1826,17 +1823,18 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
Y_VERIFY(slice->num_rows());
granuleNumRows += slice->num_rows();
#if 1 // Check correctness
- auto keyColumn = GetFirstPKColumn(indexInfo, slice);
- Y_VERIFY(keyColumn && keyColumn->length() > 0);
+ const auto effKey = GetEffectiveKey(slice, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
auto startKey = granuleNo ? borders[granuleNo - 1] : minTs;
- Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey);
+ Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
+ NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1);
if (granuleNo < borders.size() - 1) {
- auto endKey = borders[granuleNo];
- Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) < endKey);
+ const auto& endKey = borders[granuleNo];
+ Y_VERIFY(lastSliceKey < endKey);
} else {
- Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) <= maxTs);
+ Y_VERIFY(lastSliceKey <= maxTs);
}
#endif
Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey()));
@@ -1852,16 +1850,17 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
for (auto& batch : merged) {
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
- auto startKey = ts0;
+ auto startKey = ts0.Border;
if (granuleNo) {
startKey = borders[granuleNo - 1];
}
#if 1 // Check correctness
- auto keyColumn = GetFirstPKColumn(indexInfo, batch);
- Y_VERIFY(keyColumn && keyColumn->length() > 0);
- Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey);
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
#endif
- out.emplace_back(startKey, batch);
+ out.emplace_back(TMark(startKey), batch);
}
}
@@ -1897,11 +1896,11 @@ static ui64 TryMovePortions(const TMark& ts0,
}
// Order compacted portions by primary key.
std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
- return NArrow::ScalarLess(*a->PkStart(), *b->PkStart());
+ return a->EffKeyStart() < b->EffKeyStart();
});
// Check that there are no gaps between two adjacent portions in term of primary key range.
for (size_t i = 0; i < compacted.size() - 1; ++i) {
- if (!NArrow::ScalarLess(*compacted[i]->PkEnd(), *compacted[i + 1]->PkStart())) {
+ if (compacted[i]->EffKeyEnd() >= compacted[i + 1]->EffKeyStart()) {
return 0;
}
}
@@ -1913,7 +1912,7 @@ static ui64 TryMovePortions(const TMark& ts0,
ui32 rows = portionInfo->NumRows();
Y_VERIFY(rows);
numRows += rows;
- tsIds.emplace_back((counter ? TMark(portionInfo->PkStart()) : ts0), counter + 1);
+ tsIds.emplace_back((counter ? TMark(portionInfo->EffKeyStart()) : ts0), counter + 1);
toMove.emplace_back(std::move(*portionInfo), counter);
++counter;
// Ensure that std::move will take an effect.
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 04bb5e04f40..396daf9cab8 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -1,6 +1,5 @@
#pragma once
#include "defs.h"
-#include <ydb/core/formats/replace_key.h>
#include "column_engine.h"
#include "scalars.h"
@@ -20,87 +19,6 @@ class TCountersTable;
/// - Columns: granule -> blobs
class TColumnEngineForLogs : public IColumnEngine {
public:
- struct TMark {
- // TODO: Grouped marks. Share columns in TReplaceKey between multiple marks.
- NArrow::TReplaceKey Border;
-
- explicit TMark(const std::shared_ptr<arrow::Scalar>& s)
- : Border(FromScalar(s))
- {}
-
- explicit TMark(const std::shared_ptr<arrow::DataType>& type)
- : Border(MinBorder(type))
- {}
-
- TMark(const TString& key, const std::shared_ptr<arrow::DataType>& type)
- : Border(FromScalar(DeserializeKeyScalar(key, type)))
- {}
-
- TMark(const TMark& m) = default;
- TMark& operator = (const TMark& m) = default;
-
- bool operator == (const TMark& m) const {
- return Border == m.Border;
- }
-
- std::partial_ordering operator <=> (const TMark& m) const {
- return Border <=> m.Border;
- }
-
- ui64 Hash() const {
- return Border.Hash();
- }
-
- operator size_t () const {
- return Hash();
- }
-
- operator bool () const {
- Y_VERIFY(false);
- }
-
- TString Serialize() const {
- return SerializeKeyScalar(ToScalar(Border));
- }
-
- void Deserialize(const TString& key, const std::shared_ptr<arrow::DataType>& type) {
- Border = FromScalar(DeserializeKeyScalar(key, type));
- }
-
- std::shared_ptr<arrow::Scalar> ToScalar() const {
- return ToScalar(Border);
- }
-
- private:
- static NArrow::TReplaceKey FromScalar(const std::shared_ptr<arrow::Scalar>& s) {
- Y_VERIFY_DEBUG(NArrow::IsGoodScalar(s));
- auto res = MakeArrayFromScalar(*s, 1);
- Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
- return NArrow::TReplaceKey(std::make_shared<NArrow::TArrayVec>(1, *res), 0);
- }
-
- static std::shared_ptr<arrow::Scalar> ToScalar(const NArrow::TReplaceKey& key) {
- Y_VERIFY_DEBUG(key.Size() == 1);
- auto& column = key.Column(0);
- auto res = column.GetScalar(key.GetPosition());
- Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
- Y_VERIFY_DEBUG(NArrow::IsGoodScalar(*res));
- return *res;
- }
-
- static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type) {
- if (type->id() == arrow::Type::TIMESTAMP) {
- // TODO: support negative timestamps in index
- return std::make_shared<arrow::TimestampScalar>(0, type);
- }
- return NArrow::MinScalar(type);
- }
-
- static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::DataType>& type) {
- return FromScalar(MinScalar(type));
- }
- };
-
class TMarksGranules {
public:
using TPair = std::pair<TMark, ui64>;
@@ -265,17 +183,16 @@ public:
bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); }
- TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const override {
- Y_VERIFY_S(scalar->type->Equals(MarkType), scalar->type->ToString() + ", expected " + MarkType->ToString());
- return TMark(scalar).Serialize();
+ TString SerializeMark(const NArrow::TReplaceKey& key) const override {
+ return TMark::Serialize(key, MarkSchema);
}
- std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const override {
- return TMark(key, MarkType).ToScalar();
+ NArrow::TReplaceKey DeserializeMark(const TString& key) const override {
+ return TMark::Deserialize(key, MarkSchema);
}
TMark GetDefaultMark() const {
- return TMark(MarkType);
+ return TMark(MarkSchema);
}
bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override;
@@ -329,7 +246,7 @@ private:
TIndexInfo IndexInfo;
TCompactionLimits Limits;
ui64 TabletId;
- std::shared_ptr<arrow::DataType> MarkType;
+ std::shared_ptr<arrow::Schema> MarkSchema;
std::shared_ptr<TGranulesTable> GranulesTable;
std::shared_ptr<TColumnsTable> ColumnsTable;
std::shared_ptr<TCountersTable> CountersTable;
diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h
index 8940a7d3ce5..e2ce21dbc74 100644
--- a/ydb/core/tx/columnshard/engines/granules_table.h
+++ b/ydb/core/tx/columnshard/engines/granules_table.h
@@ -1,5 +1,6 @@
#pragma once
#include "db_wrapper.h"
+#include <ydb/core/formats/replace_key.h>
namespace NKikimr::NOlap {
@@ -7,19 +8,19 @@ struct TGranuleRecord {
ui64 PathId;
ui64 Granule;
TSnapshot CreatedAt;
- std::shared_ptr<arrow::Scalar> Mark;
+ NArrow::TReplaceKey Mark;
- TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const std::shared_ptr<arrow::Scalar>& mark)
+ TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const NArrow::TReplaceKey& mark)
: PathId(pathId)
, Granule(granule)
, CreatedAt(createdAt)
, Mark(mark)
{
- Y_VERIFY(Mark);
+ Y_VERIFY(Mark.Size());
}
bool operator == (const TGranuleRecord& rec) const {
- return (PathId == rec.PathId) && (Mark->Equals(*rec.Mark));
+ return (PathId == rec.PathId) && (Mark == rec.Mark);
}
friend IOutputStream& operator << (IOutputStream& out, const TGranuleRecord& rec) {
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 8e67436a8da..293cb0cbaac 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -88,6 +88,12 @@ public:
const std::shared_ptr<arrow::Schema>& GetExtendedKey() const { return ExtendedKey; }
const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return IndexKey; }
+ const std::shared_ptr<arrow::Schema> GetEffectiveKey() const {
+ // TODO: composite key
+ Y_VERIFY(IndexKey->num_fields() == 1);
+ return std::make_shared<arrow::Schema>(arrow::FieldVector{GetIndexKey()->field(0)});
+ }
+
/// Initializes sorting, replace, index and extended keys.
void SetAllKeys();
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index d3908ead497..1493ab4988c 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -4,6 +4,9 @@
#include "columns_table.h"
#include "index_info.h"
+#include <ydb/core/formats/replace_key.h>
+
+
namespace NKikimr::NOlap {
struct TPortionMeta {
@@ -156,20 +159,16 @@ struct TPortionInfo {
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
- std::shared_ptr<arrow::Scalar> PkStart() const {
- if (FirstPkColumn) {
- Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
- return MinValue(FirstPkColumn);
- }
- return {};
+ NArrow::TReplaceKey EffKeyStart() const {
+ Y_VERIFY(FirstPkColumn);
+ Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
+ return NArrow::TReplaceKey::FromScalar(MinValue(FirstPkColumn));
}
- std::shared_ptr<arrow::Scalar> PkEnd() const {
- if (FirstPkColumn) {
- Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
- return MaxValue(FirstPkColumn);
- }
- return {};
+ NArrow::TReplaceKey EffKeyEnd() const {
+ Y_VERIFY(FirstPkColumn);
+ Y_VERIFY(Meta.ColumnMeta.contains(FirstPkColumn));
+ return NArrow::TReplaceKey::FromScalar(MaxValue(FirstPkColumn));
}
ui32 NumRows() const {