aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-01-19 22:54:49 +0300
committerilnaz <ilnaz@ydb.tech>2023-01-19 22:54:49 +0300
commit899159c396b4ab1dbc5a20f53a110fe8de786c28 (patch)
tree8917eba40e21f15ac0fb3c1a4d9c014454ba253e
parent8cc1ea5a1a1ed25b81d80addf426175e1a1640be (diff)
downloadydb-899159c396b4ab1dbc5a20f53a110fe8de786c28.tar.gz
(refactoring) Introduce TChangeRecordBodySerializer
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/datashard/change_collector_base.cpp79
-rw-r--r--ydb/core/tx/datashard/change_collector_base.h17
-rw-r--r--ydb/core/tx/datashard/change_record.cpp6
-rw-r--r--ydb/core/tx/datashard/change_record.h6
-rw-r--r--ydb/core/tx/datashard/change_record_body_serializer.cpp94
-rw-r--r--ydb/core/tx/datashard/change_record_body_serializer.h29
9 files changed, 135 insertions, 99 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin.txt b/ydb/core/tx/datashard/CMakeLists.darwin.txt
index 9d58b961f0..22870fc754 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin.txt
@@ -96,6 +96,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 1ff95f10b0..d8d6b37b56 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -97,6 +97,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux.txt b/ydb/core/tx/datashard/CMakeLists.linux.txt
index 1ff95f10b0..d8d6b37b56 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux.txt
@@ -97,6 +97,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp
index 2bbe8daf6e..4af91f658d 100644
--- a/ydb/core/tx/datashard/change_collector_base.cpp
+++ b/ydb/core/tx/datashard/change_collector_base.cpp
@@ -55,85 +55,6 @@ void TBaseChangeCollector::Reset() {
Collected.clear();
}
-void TBaseChangeCollector::SerializeCells(TSerializedCells& out, TArrayRef<const TRawTypeValue> in, TArrayRef<const TTag> tags) {
- Y_VERIFY_S(in.size() == tags.size(), "Count doesn't match"
- << ": in# " << in.size()
- << ", tags# " << tags.size());
-
- TVector<TCell> cells(Reserve(in.size()));
- for (size_t i = 0; i < in.size(); ++i) {
- out.AddTags(tags.at(i));
- cells.emplace_back(in.at(i).AsRef());
- }
-
- out.SetData(TSerializedCellVec::Serialize(cells));
-}
-
-void TBaseChangeCollector::SerializeCells(TSerializedCells& out, TArrayRef<const TUpdateOp> in) {
- if (!in) {
- return;
- }
-
- TVector<TCell> cells(Reserve(in.size()));
- for (const auto& op : in) {
- Y_VERIFY_S(op.Op == ECellOp::Set, "Unexpected cell op: " << op.Op.Raw());
-
- out.AddTags(op.Tag);
- cells.emplace_back(op.AsCell());
- }
-
- out.SetData(TSerializedCellVec::Serialize(cells));
-}
-
-void TBaseChangeCollector::SerializeCells(TSerializedCells& out, const TRowState& state, TArrayRef<const TTag> tags) {
- Y_VERIFY_S(state.Size() == tags.size(), "Count doesn't match"
- << ": state# " << state.Size()
- << ", tags# " << tags.size());
-
- TVector<TCell> cells(Reserve(state.Size()));
- for (TPos pos = 0; pos < state.Size(); ++pos) {
- out.AddTags(tags.at(pos));
- cells.emplace_back(state.Get(pos));
- }
-
- out.SetData(TSerializedCellVec::Serialize(cells));
-}
-
-void TBaseChangeCollector::Serialize(TDataChange& out, ERowOp rop,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates)
-{
- SerializeCells(*out.MutableKey(), key, keyTags);
-
- switch (rop) {
- case ERowOp::Upsert:
- SerializeCells(*out.MutableUpsert(), updates);
- break;
- case ERowOp::Erase:
- out.MutableErase();
- break;
- case ERowOp::Reset:
- SerializeCells(*out.MutableReset(), updates);
- break;
- default:
- Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop));
- }
-}
-
-void TBaseChangeCollector::Serialize(TDataChange& out, ERowOp rop,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags,
- const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags)
-{
- Serialize(out, rop, key, keyTags, {});
-
- if (oldState) {
- SerializeCells(*out.MutableOldImage(), *oldState, valueTags);
- }
-
- if (newState) {
- SerializeCells(*out.MutableNewImage(), *newState, valueTags);
- }
-}
-
void TBaseChangeCollector::Persist(
const TTableId& tableId, // origin table
const TPathId& pathId, // target object (table, stream, etc...)
diff --git a/ydb/core/tx/datashard/change_collector_base.h b/ydb/core/tx/datashard/change_collector_base.h
index 82ba562d5d..361389d6b3 100644
--- a/ydb/core/tx/datashard/change_collector_base.h
+++ b/ydb/core/tx/datashard/change_collector_base.h
@@ -1,6 +1,7 @@
#pragma once
#include "change_record.h"
+#include "change_record_body_serializer.h"
#include <ydb/core/engine/minikql/change_collector_iface.h>
#include <ydb/core/protos/change_exchange.pb.h>
@@ -19,21 +20,13 @@ public:
virtual void SetGroup(ui64 group) = 0;
};
-class TBaseChangeCollector: public IBaseChangeCollector {
+class TBaseChangeCollector
+ : public IBaseChangeCollector
+ , protected TChangeRecordBodySerializer
+{
using TDataChange = NKikimrChangeExchange::TChangeRecord::TDataChange;
- using TSerializedCells = TDataChange::TSerializedCells;
-
- static void SerializeCells(TSerializedCells& out, TArrayRef<const TRawTypeValue> in, TArrayRef<const NTable::TTag> tags);
- static void SerializeCells(TSerializedCells& out, TArrayRef<const NTable::TUpdateOp> in);
- static void SerializeCells(TSerializedCells& out, const NTable::TRowState& state, TArrayRef<const NTable::TTag> tags);
protected:
- static void Serialize(TDataChange& out, NTable::ERowOp rop,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TUpdateOp> updates);
- static void Serialize(TDataChange& out, NTable::ERowOp rop,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags,
- const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags);
-
void Persist(const TTableId& tableId, const TPathId& pathId, TChangeRecord::EKind kind, const TDataChange& body);
public:
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index d70f9f02a0..cba5084194 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -13,8 +13,7 @@
#include <util/stream/str.h>
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NDataShard {
void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const {
record.SetOrder(Order);
@@ -347,5 +346,4 @@ TChangeRecord&& TChangeRecordBuilder::Build() {
return std::move(Record);
}
-} // NDataShard
-} // NKikimr
+}
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index 8ecad30b32..decfa6dce6 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -14,8 +14,7 @@ namespace NKikimrChangeExchange {
class TChangeRecord;
}
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NDataShard {
class TChangeRecordBuilder;
@@ -101,8 +100,7 @@ private:
}; // TChangeRecordBuilder
-} // NDataShard
-} // NKikimr
+}
Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TChangeRecord, out, value) {
return value.Out(out);
diff --git a/ydb/core/tx/datashard/change_record_body_serializer.cpp b/ydb/core/tx/datashard/change_record_body_serializer.cpp
new file mode 100644
index 0000000000..b1227992b0
--- /dev/null
+++ b/ydb/core/tx/datashard/change_record_body_serializer.cpp
@@ -0,0 +1,94 @@
+#include "change_record_body_serializer.h"
+
+#include <ydb/core/util/yverify_stream.h>
+
+namespace NKikimr::NDataShard {
+
+using namespace NTable;
+
+void TChangeRecordBodySerializer::SerializeCells(TSerializedCells& out,
+ TArrayRef<const TRawTypeValue> in, TArrayRef<const TTag> tags)
+{
+ Y_VERIFY_S(in.size() == tags.size(), "Count doesn't match"
+ << ": in# " << in.size()
+ << ", tags# " << tags.size());
+
+ TVector<TCell> cells(Reserve(in.size()));
+ for (size_t i = 0; i < in.size(); ++i) {
+ out.AddTags(tags.at(i));
+ cells.emplace_back(in.at(i).AsRef());
+ }
+
+ out.SetData(TSerializedCellVec::Serialize(cells));
+}
+
+void TChangeRecordBodySerializer::SerializeCells(TSerializedCells& out,
+ TArrayRef<const TUpdateOp> in)
+{
+ if (!in) {
+ return;
+ }
+
+ TVector<TCell> cells(Reserve(in.size()));
+ for (const auto& op : in) {
+ Y_VERIFY_S(op.Op == ECellOp::Set, "Unexpected cell op: " << op.Op.Raw());
+
+ out.AddTags(op.Tag);
+ cells.emplace_back(op.AsCell());
+ }
+
+ out.SetData(TSerializedCellVec::Serialize(cells));
+}
+
+void TChangeRecordBodySerializer::SerializeCells(TSerializedCells& out,
+ const TRowState& state, TArrayRef<const TTag> tags)
+{
+ Y_VERIFY_S(state.Size() == tags.size(), "Count doesn't match"
+ << ": state# " << state.Size()
+ << ", tags# " << tags.size());
+
+ TVector<TCell> cells(Reserve(state.Size()));
+ for (TPos pos = 0; pos < state.Size(); ++pos) {
+ out.AddTags(tags.at(pos));
+ cells.emplace_back(state.Get(pos));
+ }
+
+ out.SetData(TSerializedCellVec::Serialize(cells));
+}
+
+void TChangeRecordBodySerializer::Serialize(TDataChange& out, ERowOp rop,
+ TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates)
+{
+ SerializeCells(*out.MutableKey(), key, keyTags);
+
+ switch (rop) {
+ case ERowOp::Upsert:
+ SerializeCells(*out.MutableUpsert(), updates);
+ break;
+ case ERowOp::Erase:
+ out.MutableErase();
+ break;
+ case ERowOp::Reset:
+ SerializeCells(*out.MutableReset(), updates);
+ break;
+ default:
+ Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop));
+ }
+}
+
+void TChangeRecordBodySerializer::Serialize(TDataChange& out, ERowOp rop,
+ TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags,
+ const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags)
+{
+ Serialize(out, rop, key, keyTags, {});
+
+ if (oldState) {
+ SerializeCells(*out.MutableOldImage(), *oldState, valueTags);
+ }
+
+ if (newState) {
+ SerializeCells(*out.MutableNewImage(), *newState, valueTags);
+ }
+}
+
+}
diff --git a/ydb/core/tx/datashard/change_record_body_serializer.h b/ydb/core/tx/datashard/change_record_body_serializer.h
new file mode 100644
index 0000000000..29f6b53deb
--- /dev/null
+++ b/ydb/core/tx/datashard/change_record_body_serializer.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <ydb/core/protos/change_exchange.pb.h>
+#include <ydb/core/scheme_types/scheme_raw_type_value.h>
+#include <ydb/core/tablet_flat/flat_database.h>
+
+namespace NKikimr::NDataShard {
+
+class TChangeRecordBodySerializer {
+ using TDataChange = NKikimrChangeExchange::TChangeRecord::TDataChange;
+ using TSerializedCells = TDataChange::TSerializedCells;
+
+ static void SerializeCells(TSerializedCells& out,
+ TArrayRef<const TRawTypeValue> in, TArrayRef<const NTable::TTag> tags);
+ static void SerializeCells(TSerializedCells& out,
+ TArrayRef<const NTable::TUpdateOp> in);
+ static void SerializeCells(TSerializedCells& out,
+ const NTable::TRowState& state, TArrayRef<const NTable::TTag> tags);
+
+public:
+ static void Serialize(TDataChange& out, NTable::ERowOp rop,
+ TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TUpdateOp> updates);
+ static void Serialize(TDataChange& out, NTable::ERowOp rop,
+ TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags,
+ const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags);
+
+}; // TChangeRecordBodySerializer
+
+}