diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-01-19 22:54:49 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-01-19 22:54:49 +0300 |
commit | 899159c396b4ab1dbc5a20f53a110fe8de786c28 (patch) | |
tree | 8917eba40e21f15ac0fb3c1a4d9c014454ba253e | |
parent | 8cc1ea5a1a1ed25b81d80addf426175e1a1640be (diff) | |
download | ydb-899159c396b4ab1dbc5a20f53a110fe8de786c28.tar.gz |
(refactoring) Introduce TChangeRecordBodySerializer
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_collector_base.cpp | 79 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_collector_base.h | 17 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record_body_serializer.cpp | 94 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record_body_serializer.h | 29 |
9 files changed, 135 insertions, 99 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin.txt b/ydb/core/tx/datashard/CMakeLists.darwin.txt index 9d58b961f0..22870fc754 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin.txt @@ -96,6 +96,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 1ff95f10b0..d8d6b37b56 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -97,6 +97,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux.txt b/ydb/core/tx/datashard/CMakeLists.linux.txt index 1ff95f10b0..d8d6b37b56 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux.txt @@ -97,6 +97,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp index 2bbe8daf6e..4af91f658d 100644 --- a/ydb/core/tx/datashard/change_collector_base.cpp +++ b/ydb/core/tx/datashard/change_collector_base.cpp @@ -55,85 +55,6 @@ void TBaseChangeCollector::Reset() { Collected.clear(); } -void TBaseChangeCollector::SerializeCells(TSerializedCells& out, TArrayRef<const TRawTypeValue> in, TArrayRef<const TTag> tags) { - Y_VERIFY_S(in.size() == tags.size(), "Count doesn't match" - << ": in# " << in.size() - << ", tags# " << tags.size()); - - TVector<TCell> cells(Reserve(in.size())); - for (size_t i = 0; i < in.size(); ++i) { - out.AddTags(tags.at(i)); - cells.emplace_back(in.at(i).AsRef()); - } - - out.SetData(TSerializedCellVec::Serialize(cells)); -} - -void TBaseChangeCollector::SerializeCells(TSerializedCells& out, TArrayRef<const TUpdateOp> in) { - if (!in) { - return; - } - - TVector<TCell> cells(Reserve(in.size())); - for (const auto& op : in) { - Y_VERIFY_S(op.Op == ECellOp::Set, "Unexpected cell op: " << op.Op.Raw()); - - out.AddTags(op.Tag); - cells.emplace_back(op.AsCell()); - } - - out.SetData(TSerializedCellVec::Serialize(cells)); -} - -void TBaseChangeCollector::SerializeCells(TSerializedCells& out, const TRowState& state, TArrayRef<const TTag> tags) { - Y_VERIFY_S(state.Size() == tags.size(), "Count doesn't match" - << ": state# " << state.Size() - << ", tags# " << tags.size()); - - TVector<TCell> cells(Reserve(state.Size())); - for (TPos pos = 0; pos < state.Size(); ++pos) { - out.AddTags(tags.at(pos)); - cells.emplace_back(state.Get(pos)); - } - - out.SetData(TSerializedCellVec::Serialize(cells)); -} - -void TBaseChangeCollector::Serialize(TDataChange& out, ERowOp rop, - TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates) -{ - SerializeCells(*out.MutableKey(), key, keyTags); - - switch (rop) { - case ERowOp::Upsert: - SerializeCells(*out.MutableUpsert(), updates); - break; - case ERowOp::Erase: - out.MutableErase(); - break; - case ERowOp::Reset: - SerializeCells(*out.MutableReset(), updates); - break; - default: - Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop)); - } -} - -void TBaseChangeCollector::Serialize(TDataChange& out, ERowOp rop, - TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, - const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags) -{ - Serialize(out, rop, key, keyTags, {}); - - if (oldState) { - SerializeCells(*out.MutableOldImage(), *oldState, valueTags); - } - - if (newState) { - SerializeCells(*out.MutableNewImage(), *newState, valueTags); - } -} - void TBaseChangeCollector::Persist( const TTableId& tableId, // origin table const TPathId& pathId, // target object (table, stream, etc...) diff --git a/ydb/core/tx/datashard/change_collector_base.h b/ydb/core/tx/datashard/change_collector_base.h index 82ba562d5d..361389d6b3 100644 --- a/ydb/core/tx/datashard/change_collector_base.h +++ b/ydb/core/tx/datashard/change_collector_base.h @@ -1,6 +1,7 @@ #pragma once #include "change_record.h" +#include "change_record_body_serializer.h" #include <ydb/core/engine/minikql/change_collector_iface.h> #include <ydb/core/protos/change_exchange.pb.h> @@ -19,21 +20,13 @@ public: virtual void SetGroup(ui64 group) = 0; }; -class TBaseChangeCollector: public IBaseChangeCollector { +class TBaseChangeCollector + : public IBaseChangeCollector + , protected TChangeRecordBodySerializer +{ using TDataChange = NKikimrChangeExchange::TChangeRecord::TDataChange; - using TSerializedCells = TDataChange::TSerializedCells; - - static void SerializeCells(TSerializedCells& out, TArrayRef<const TRawTypeValue> in, TArrayRef<const NTable::TTag> tags); - static void SerializeCells(TSerializedCells& out, TArrayRef<const NTable::TUpdateOp> in); - static void SerializeCells(TSerializedCells& out, const NTable::TRowState& state, TArrayRef<const NTable::TTag> tags); protected: - static void Serialize(TDataChange& out, NTable::ERowOp rop, - TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TUpdateOp> updates); - static void Serialize(TDataChange& out, NTable::ERowOp rop, - TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, - const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags); - void Persist(const TTableId& tableId, const TPathId& pathId, TChangeRecord::EKind kind, const TDataChange& body); public: diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index d70f9f02a0..cba5084194 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -13,8 +13,7 @@ #include <util/stream/str.h> -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const { record.SetOrder(Order); @@ -347,5 +346,4 @@ TChangeRecord&& TChangeRecordBuilder::Build() { return std::move(Record); } -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 8ecad30b32..decfa6dce6 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -14,8 +14,7 @@ namespace NKikimrChangeExchange { class TChangeRecord; } -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { class TChangeRecordBuilder; @@ -101,8 +100,7 @@ private: }; // TChangeRecordBuilder -} // NDataShard -} // NKikimr +} Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TChangeRecord, out, value) { return value.Out(out); diff --git a/ydb/core/tx/datashard/change_record_body_serializer.cpp b/ydb/core/tx/datashard/change_record_body_serializer.cpp new file mode 100644 index 0000000000..b1227992b0 --- /dev/null +++ b/ydb/core/tx/datashard/change_record_body_serializer.cpp @@ -0,0 +1,94 @@ +#include "change_record_body_serializer.h" + +#include <ydb/core/util/yverify_stream.h> + +namespace NKikimr::NDataShard { + +using namespace NTable; + +void TChangeRecordBodySerializer::SerializeCells(TSerializedCells& out, + TArrayRef<const TRawTypeValue> in, TArrayRef<const TTag> tags) +{ + Y_VERIFY_S(in.size() == tags.size(), "Count doesn't match" + << ": in# " << in.size() + << ", tags# " << tags.size()); + + TVector<TCell> cells(Reserve(in.size())); + for (size_t i = 0; i < in.size(); ++i) { + out.AddTags(tags.at(i)); + cells.emplace_back(in.at(i).AsRef()); + } + + out.SetData(TSerializedCellVec::Serialize(cells)); +} + +void TChangeRecordBodySerializer::SerializeCells(TSerializedCells& out, + TArrayRef<const TUpdateOp> in) +{ + if (!in) { + return; + } + + TVector<TCell> cells(Reserve(in.size())); + for (const auto& op : in) { + Y_VERIFY_S(op.Op == ECellOp::Set, "Unexpected cell op: " << op.Op.Raw()); + + out.AddTags(op.Tag); + cells.emplace_back(op.AsCell()); + } + + out.SetData(TSerializedCellVec::Serialize(cells)); +} + +void TChangeRecordBodySerializer::SerializeCells(TSerializedCells& out, + const TRowState& state, TArrayRef<const TTag> tags) +{ + Y_VERIFY_S(state.Size() == tags.size(), "Count doesn't match" + << ": state# " << state.Size() + << ", tags# " << tags.size()); + + TVector<TCell> cells(Reserve(state.Size())); + for (TPos pos = 0; pos < state.Size(); ++pos) { + out.AddTags(tags.at(pos)); + cells.emplace_back(state.Get(pos)); + } + + out.SetData(TSerializedCellVec::Serialize(cells)); +} + +void TChangeRecordBodySerializer::Serialize(TDataChange& out, ERowOp rop, + TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates) +{ + SerializeCells(*out.MutableKey(), key, keyTags); + + switch (rop) { + case ERowOp::Upsert: + SerializeCells(*out.MutableUpsert(), updates); + break; + case ERowOp::Erase: + out.MutableErase(); + break; + case ERowOp::Reset: + SerializeCells(*out.MutableReset(), updates); + break; + default: + Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop)); + } +} + +void TChangeRecordBodySerializer::Serialize(TDataChange& out, ERowOp rop, + TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, + const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags) +{ + Serialize(out, rop, key, keyTags, {}); + + if (oldState) { + SerializeCells(*out.MutableOldImage(), *oldState, valueTags); + } + + if (newState) { + SerializeCells(*out.MutableNewImage(), *newState, valueTags); + } +} + +} diff --git a/ydb/core/tx/datashard/change_record_body_serializer.h b/ydb/core/tx/datashard/change_record_body_serializer.h new file mode 100644 index 0000000000..29f6b53deb --- /dev/null +++ b/ydb/core/tx/datashard/change_record_body_serializer.h @@ -0,0 +1,29 @@ +#pragma once + +#include <ydb/core/protos/change_exchange.pb.h> +#include <ydb/core/scheme_types/scheme_raw_type_value.h> +#include <ydb/core/tablet_flat/flat_database.h> + +namespace NKikimr::NDataShard { + +class TChangeRecordBodySerializer { + using TDataChange = NKikimrChangeExchange::TChangeRecord::TDataChange; + using TSerializedCells = TDataChange::TSerializedCells; + + static void SerializeCells(TSerializedCells& out, + TArrayRef<const TRawTypeValue> in, TArrayRef<const NTable::TTag> tags); + static void SerializeCells(TSerializedCells& out, + TArrayRef<const NTable::TUpdateOp> in); + static void SerializeCells(TSerializedCells& out, + const NTable::TRowState& state, TArrayRef<const NTable::TTag> tags); + +public: + static void Serialize(TDataChange& out, NTable::ERowOp rop, + TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TUpdateOp> updates); + static void Serialize(TDataChange& out, NTable::ERowOp rop, + TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, + const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags); + +}; // TChangeRecordBodySerializer + +} |