diff options
author | chertus <azuikov@ydb.tech> | 2023-05-11 18:04:35 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-05-11 18:04:35 +0300 |
commit | 2936c884291ac3d6a3863cc357c3646ba1698543 (patch) | |
tree | 8526e4cc93d776c10beda6126215f1ab3dbdc0d3 | |
parent | 27c981a547473853468293ef69a23f7a3b766edb (diff) | |
download | ydb-2936c884291ac3d6a3863cc357c3646ba1698543.tar.gz |
composite key serialization in ColumnShard
-rw-r--r-- | ydb/core/formats/arrow/replace_key.h | 17 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.cpp | 52 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 13 |
4 files changed, 65 insertions, 29 deletions
diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h index 5c5a6804e7..1e6ba85778 100644 --- a/ydb/core/formats/arrow/replace_key.h +++ b/ydb/core/formats/arrow/replace_key.h @@ -114,6 +114,17 @@ public: } template<typename T = TArrayVecPtr> requires IsOwning + std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Schema>& schema) const { + Y_VERIFY(Size() && Size() == schema->num_fields()); + const auto& columns = *Columns; + i64 numRows = columns[0]->length(); + Y_VERIFY(Position < numRows); + + auto batch = arrow::RecordBatch::Make(schema, numRows, columns); + return batch->Slice(Position, 1); + } + + template<typename T = TArrayVecPtr> requires IsOwning static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& key, int row) { Y_VERIFY(key->num_fields() <= batch->num_columns()); @@ -144,9 +155,9 @@ public: return TReplaceKeyTemplate<TArrayVecPtr>(std::make_shared<TArrayVec>(1, *res), 0); } - static std::shared_ptr<arrow::Scalar> ToScalar(const TReplaceKeyTemplate<TArrayVecPtr>& key) { - Y_VERIFY_DEBUG(key.Size() == 1); - auto& column = key.Column(0); + static std::shared_ptr<arrow::Scalar> ToScalar(const TReplaceKeyTemplate<TArrayVecPtr>& key, int colNumber = 0) { + Y_VERIFY_DEBUG(colNumber < key.Size()); + auto& column = key.Column(colNumber); auto res = column.GetScalar(key.GetPosition()); Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str()); Y_VERIFY_DEBUG(IsGoodScalar(*res)); diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index 16a55b847c..7cdc7c9144 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -3,29 +3,45 @@ namespace NKikimr::NOlap { -TString TMark::Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) { - Y_VERIFY_DEBUG(key.Size() > 0); - if (key.Size() == 1) { - Y_VERIFY_S(key.Column(0).type()->Equals(schema->field(0)->type()), - key.Column(0).type()->ToString() + ", expected " + schema->ToString()); - return SerializeKeyScalar(NArrow::TReplaceKey::ToScalar(key)); - } else { - Y_FAIL("not implemented"); // TODO - } +TString TMark::SerializeScalar(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) { + Y_VERIFY(key.Size() == 1); + Y_VERIFY_S(key.Column(0).type()->Equals(schema->field(0)->type()), + key.Column(0).type()->ToString() + ", expected " + schema->ToString()); + return SerializeKeyScalar(NArrow::TReplaceKey::ToScalar(key)); } -NArrow::TReplaceKey TMark::Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema) { - Y_VERIFY_DEBUG(schema->num_fields() > 0); - if (schema->num_fields() == 1) { - return NArrow::TReplaceKey::FromScalar(DeserializeKeyScalar(key, schema->field(0)->type())); - } else { - Y_FAIL("not implemented"); // TODO - } +NArrow::TReplaceKey TMark::DeserializeScalar(const TString& key, const std::shared_ptr<arrow::Schema>& schema) { + Y_VERIFY(schema->num_fields() == 1); + return NArrow::TReplaceKey::FromScalar(DeserializeKeyScalar(key, schema->field(0)->type())); +} + +TString TMark::SerializeComposite(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) { + auto batch = key.ToBatch(schema); + Y_VERIFY(batch && batch->num_rows() == 1); + return NArrow::SerializeBatchNoCompression(batch); +} + +NArrow::TReplaceKey TMark::DeserializeComposite(const TString& key, const std::shared_ptr<arrow::Schema>& schema) { + auto batch = NArrow::DeserializeBatch(key, schema); + Y_VERIFY(batch && batch->num_rows() == 1); + return NArrow::TReplaceKey::FromBatch(batch, 0); } std::string TMark::ToString() const { - Y_VERIFY_DEBUG(Border.Size() == 1); - return NArrow::TReplaceKey::ToScalar(Border)->ToString(); + if (Border.Size() == 1) { + return NArrow::TReplaceKey::ToScalar(Border)->ToString(); + } else { + TStringBuilder out; + out << "("; + for (int i = 0; i < Border.Size(); ++i) { + if (i) { + out << ", "; + } + out << NArrow::TReplaceKey::ToScalar(Border, i)->ToString(); + } + out << ")"; + return out; + } } std::shared_ptr<arrow::Scalar> TMark::MinScalar(const std::shared_ptr<arrow::DataType>& type) { diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index faac7b563e..1941fe4567 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -44,10 +44,6 @@ struct TMark { : Border(MinBorder(schema)) {} - TMark(const TString& key, const std::shared_ptr<arrow::Schema>& schema) - : Border(Deserialize(key, schema)) - {} - TMark(const TMark& m) = default; TMark& operator = (const TMark& m) = default; @@ -69,8 +65,12 @@ struct TMark { operator bool () const = delete; - static TString Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema); - static NArrow::TReplaceKey Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema); + static TString SerializeScalar(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema); + static NArrow::TReplaceKey DeserializeScalar(const TString& key, const std::shared_ptr<arrow::Schema>& schema); + + static TString SerializeComposite(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema); + static NArrow::TReplaceKey DeserializeComposite(const TString& key, const std::shared_ptr<arrow::Schema>& schema); + std::string ToString() const; private: diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index ff1a042488..df2ba006f8 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -191,11 +191,19 @@ public: bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); } TString SerializeMark(const NArrow::TReplaceKey& key) const override { - return TMark::Serialize(key, MarkSchema); + if (UseCompositeMarks) { + return TMark::SerializeComposite(key, MarkSchema); + } else { + return TMark::SerializeScalar(key, MarkSchema); + } } NArrow::TReplaceKey DeserializeMark(const TString& key) const override { - return TMark::Deserialize(key, MarkSchema); + if (UseCompositeMarks) { + return TMark::DeserializeComposite(key, MarkSchema); + } else { + return TMark::DeserializeScalar(key, MarkSchema); + } } const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override; @@ -263,6 +271,7 @@ private: ui64 LastPortion; ui64 LastGranule; TSnapshot LastSnapshot = TSnapshot::Zero(); + bool UseCompositeMarks = false; private: void ClearIndex() { |