aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-05-11 18:04:35 +0300
committerchertus <azuikov@ydb.tech>2023-05-11 18:04:35 +0300
commit2936c884291ac3d6a3863cc357c3646ba1698543 (patch)
tree8526e4cc93d776c10beda6126215f1ab3dbdc0d3
parent27c981a547473853468293ef69a23f7a3b766edb (diff)
downloadydb-2936c884291ac3d6a3863cc357c3646ba1698543.tar.gz
composite key serialization in ColumnShard
-rw-r--r--ydb/core/formats/arrow/replace_key.h17
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp52
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h12
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h13
4 files changed, 65 insertions, 29 deletions
diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h
index 5c5a6804e7..1e6ba85778 100644
--- a/ydb/core/formats/arrow/replace_key.h
+++ b/ydb/core/formats/arrow/replace_key.h
@@ -114,6 +114,17 @@ public:
}
template<typename T = TArrayVecPtr> requires IsOwning
+ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Schema>& schema) const {
+ Y_VERIFY(Size() && Size() == schema->num_fields());
+ const auto& columns = *Columns;
+ i64 numRows = columns[0]->length();
+ Y_VERIFY(Position < numRows);
+
+ auto batch = arrow::RecordBatch::Make(schema, numRows, columns);
+ return batch->Slice(Position, 1);
+ }
+
+ template<typename T = TArrayVecPtr> requires IsOwning
static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& key, int row) {
Y_VERIFY(key->num_fields() <= batch->num_columns());
@@ -144,9 +155,9 @@ public:
return TReplaceKeyTemplate<TArrayVecPtr>(std::make_shared<TArrayVec>(1, *res), 0);
}
- static std::shared_ptr<arrow::Scalar> ToScalar(const TReplaceKeyTemplate<TArrayVecPtr>& key) {
- Y_VERIFY_DEBUG(key.Size() == 1);
- auto& column = key.Column(0);
+ static std::shared_ptr<arrow::Scalar> ToScalar(const TReplaceKeyTemplate<TArrayVecPtr>& key, int colNumber = 0) {
+ Y_VERIFY_DEBUG(colNumber < key.Size());
+ auto& column = key.Column(colNumber);
auto res = column.GetScalar(key.GetPosition());
Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
Y_VERIFY_DEBUG(IsGoodScalar(*res));
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index 16a55b847c..7cdc7c9144 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -3,29 +3,45 @@
namespace NKikimr::NOlap {
-TString TMark::Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) {
- Y_VERIFY_DEBUG(key.Size() > 0);
- if (key.Size() == 1) {
- Y_VERIFY_S(key.Column(0).type()->Equals(schema->field(0)->type()),
- key.Column(0).type()->ToString() + ", expected " + schema->ToString());
- return SerializeKeyScalar(NArrow::TReplaceKey::ToScalar(key));
- } else {
- Y_FAIL("not implemented"); // TODO
- }
+TString TMark::SerializeScalar(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) {
+ Y_VERIFY(key.Size() == 1);
+ Y_VERIFY_S(key.Column(0).type()->Equals(schema->field(0)->type()),
+ key.Column(0).type()->ToString() + ", expected " + schema->ToString());
+ return SerializeKeyScalar(NArrow::TReplaceKey::ToScalar(key));
}
-NArrow::TReplaceKey TMark::Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema) {
- Y_VERIFY_DEBUG(schema->num_fields() > 0);
- if (schema->num_fields() == 1) {
- return NArrow::TReplaceKey::FromScalar(DeserializeKeyScalar(key, schema->field(0)->type()));
- } else {
- Y_FAIL("not implemented"); // TODO
- }
+NArrow::TReplaceKey TMark::DeserializeScalar(const TString& key, const std::shared_ptr<arrow::Schema>& schema) {
+ Y_VERIFY(schema->num_fields() == 1);
+ return NArrow::TReplaceKey::FromScalar(DeserializeKeyScalar(key, schema->field(0)->type()));
+}
+
+TString TMark::SerializeComposite(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema) {
+ auto batch = key.ToBatch(schema);
+ Y_VERIFY(batch && batch->num_rows() == 1);
+ return NArrow::SerializeBatchNoCompression(batch);
+}
+
+NArrow::TReplaceKey TMark::DeserializeComposite(const TString& key, const std::shared_ptr<arrow::Schema>& schema) {
+ auto batch = NArrow::DeserializeBatch(key, schema);
+ Y_VERIFY(batch && batch->num_rows() == 1);
+ return NArrow::TReplaceKey::FromBatch(batch, 0);
}
std::string TMark::ToString() const {
- Y_VERIFY_DEBUG(Border.Size() == 1);
- return NArrow::TReplaceKey::ToScalar(Border)->ToString();
+ if (Border.Size() == 1) {
+ return NArrow::TReplaceKey::ToScalar(Border)->ToString();
+ } else {
+ TStringBuilder out;
+ out << "(";
+ for (int i = 0; i < Border.Size(); ++i) {
+ if (i) {
+ out << ", ";
+ }
+ out << NArrow::TReplaceKey::ToScalar(Border, i)->ToString();
+ }
+ out << ")";
+ return out;
+ }
}
std::shared_ptr<arrow::Scalar> TMark::MinScalar(const std::shared_ptr<arrow::DataType>& type) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index faac7b563e..1941fe4567 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -44,10 +44,6 @@ struct TMark {
: Border(MinBorder(schema))
{}
- TMark(const TString& key, const std::shared_ptr<arrow::Schema>& schema)
- : Border(Deserialize(key, schema))
- {}
-
TMark(const TMark& m) = default;
TMark& operator = (const TMark& m) = default;
@@ -69,8 +65,12 @@ struct TMark {
operator bool () const = delete;
- static TString Serialize(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
- static NArrow::TReplaceKey Deserialize(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
+ static TString SerializeScalar(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
+ static NArrow::TReplaceKey DeserializeScalar(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
+
+ static TString SerializeComposite(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
+ static NArrow::TReplaceKey DeserializeComposite(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
+
std::string ToString() const;
private:
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index ff1a042488..df2ba006f8 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -191,11 +191,19 @@ public:
bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); }
TString SerializeMark(const NArrow::TReplaceKey& key) const override {
- return TMark::Serialize(key, MarkSchema);
+ if (UseCompositeMarks) {
+ return TMark::SerializeComposite(key, MarkSchema);
+ } else {
+ return TMark::SerializeScalar(key, MarkSchema);
+ }
}
NArrow::TReplaceKey DeserializeMark(const TString& key) const override {
- return TMark::Deserialize(key, MarkSchema);
+ if (UseCompositeMarks) {
+ return TMark::DeserializeComposite(key, MarkSchema);
+ } else {
+ return TMark::DeserializeScalar(key, MarkSchema);
+ }
}
const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override;
@@ -263,6 +271,7 @@ private:
ui64 LastPortion;
ui64 LastGranule;
TSnapshot LastSnapshot = TSnapshot::Zero();
+ bool UseCompositeMarks = false;
private:
void ClearIndex() {