aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorInnokentii Mokin <innokentii@ydb.tech>2024-07-17 17:52:00 +0700
committerGitHub <noreply@github.com>2024-07-17 13:52:00 +0300
commit6302faa47a2c55a855b47becb2182fe49e74bd57 (patch)
tree39a09e81d6f22add7993ff4879833276e6569afc
parentde3d9124fb8ae0226ff15c8a8d78f7f2769d9712 (diff)
downloadydb-6302faa47a2c55a855b47becb2182fe49e74bd57.tar.gz
Add specialized backup writer for repl (#6748)
-rw-r--r--ydb/core/backup/impl/table_writer.cpp152
-rw-r--r--ydb/core/backup/impl/table_writer.h7
-rw-r--r--ydb/core/backup/impl/table_writer_impl.h235
-rw-r--r--ydb/core/backup/impl/table_writer_ut.cpp159
-rw-r--r--ydb/core/backup/impl/ut_table_writer/ya.make20
-rw-r--r--ydb/core/backup/impl/ya.make1
-rw-r--r--ydb/core/tx/replication/service/json_change_record.h5
-rw-r--r--ydb/core/tx/replication/service/table_writer_impl.h21
8 files changed, 445 insertions, 155 deletions
diff --git a/ydb/core/backup/impl/table_writer.cpp b/ydb/core/backup/impl/table_writer.cpp
index 157e30c75e..4449a50a47 100644
--- a/ydb/core/backup/impl/table_writer.cpp
+++ b/ydb/core/backup/impl/table_writer.cpp
@@ -1,150 +1,4 @@
-#include "table_writer.h"
-
-#include <ydb/core/tx/replication/service/table_writer_impl.h>
-
-#include <ydb/core/change_exchange/change_record.h>
-#include <ydb/core/protos/change_exchange.pb.h>
-
-namespace NKikimr::NBackup::NImpl {
-
-class TChangeRecord: public NChangeExchange::TChangeRecordBase {
- friend class TChangeRecordBuilder;
-
-public:
- using TPtr = TIntrusivePtr<TChangeRecord>;
- const static NKikimrSchemeOp::ECdcStreamFormat StreamType = NKikimrSchemeOp::ECdcStreamFormatProto;
-
- ui64 GetGroup() const override {
- return ProtoBody.GetGroup();
- }
- ui64 GetStep() const override {
- return ProtoBody.GetStep();
- }
- ui64 GetTxId() const override {
- return ProtoBody.GetTxId();
- }
- EKind GetKind() const override {
- return EKind::CdcDataChange;
- }
- TString GetSourceId() const {
- return SourceId;
- }
-
- void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const {
- record.SetSourceOffset(GetOrder());
- // TODO: fill WriteTxId
-
- record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());
-
- auto& upsert = *record.MutableUpsert();
- *upsert.MutableTags() = {
- ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
- ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
- upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
- }
-
- void Serialize(
- NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record,
- TChangeRecordBuilderContextTrait<TChangeRecord>&) const
- {
- return Serialize(record);
- }
-
- ui64 ResolvePartitionId(NChangeExchange::IChangeSenderResolver* const resolver) const override {
- const auto& partitions = resolver->GetPartitions();
- Y_ABORT_UNLESS(partitions);
- const auto& schema = resolver->GetSchema();
- const auto streamFormat = resolver->GetStreamFormat();
- Y_ABORT_UNLESS(streamFormat == NKikimrSchemeOp::ECdcStreamFormatProto);
-
- const auto range = TTableRange(GetKey());
- Y_ABORT_UNLESS(range.Point);
-
- const auto it = LowerBound(
- partitions.cbegin(), partitions.cend(), true,
- [&](const auto& partition, bool) {
- const int compares = CompareBorders<true, false>(
- partition.Range->EndKeyPrefix.GetCells(), range.From,
- partition.Range->IsInclusive || partition.Range->IsPoint,
- range.InclusiveFrom || range.Point, schema
- );
-
- return (compares < 0);
- }
- );
-
- Y_ABORT_UNLESS(it != partitions.end());
- return it->ShardId;
- }
-
- TConstArrayRef<TCell> GetKey() const {
- Y_ABORT_UNLESS(ProtoBody.HasCdcDataChange());
- Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().HasKey());
- Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().GetKey().HasData());
- TSerializedCellVec keyCellVec;
- Y_ABORT_UNLESS(TSerializedCellVec::TryParse(ProtoBody.GetCdcDataChange().GetKey().GetData(), keyCellVec));
- Key = keyCellVec;
-
- Y_ABORT_UNLESS(Key);
- return Key->GetCells();
- }
-private:
- TString SourceId;
- NKikimrChangeExchange::TChangeRecord ProtoBody;
- NReplication::NService::TLightweightSchema::TCPtr Schema;
-
- mutable TMaybe<TSerializedCellVec> Key;
-}; // TChangeRecord
-
-class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChangeRecord, TChangeRecordBuilder> {
-public:
- using TBase::TBase;
-
- TSelf& WithSourceId(const TString& sourceId) {
- GetRecord()->SourceId = sourceId;
- return static_cast<TSelf&>(*this);
- }
-
- template <typename T>
- TSelf& WithBody(T&& body) {
- Y_ABORT_UNLESS(GetRecord()->ProtoBody.ParseFromString(body));
- return static_cast<TBase*>(this)->WithBody(std::forward<T>(body));
- }
-
- TSelf& WithSchema(NReplication::NService::TLightweightSchema::TCPtr schema) {
- GetRecord()->Schema = schema;
- return static_cast<TSelf&>(*this);
- }
-
-}; // TChangeRecordBuilder
-
-}
-
-namespace NKikimr {
-
-template <>
-struct TChangeRecordContainer<NBackup::NImpl::TChangeRecord>
- : public TBaseChangeRecordContainer
-{
- TChangeRecordContainer() = default;
-
- explicit TChangeRecordContainer(TVector<NBackup::NImpl::TChangeRecord::TPtr>&& records)
- : Records(std::move(records))
- {}
-
- TVector<NBackup::NImpl::TChangeRecord::TPtr> Records;
-
- TString Out() override {
- return TStringBuilder() << "[" << JoinSeq(",", Records) << "]";
- }
-};
-
-template <>
-struct TChangeRecordBuilderTrait<NBackup::NImpl::TChangeRecord>
- : public NBackup::NImpl::TChangeRecordBuilder
-{};
-
-}
+#include "table_writer_impl.h"
Y_DECLARE_OUT_SPEC(inline, NKikimr::NBackup::NImpl::TChangeRecord, out, value) {
return value.Out(out);
@@ -156,8 +10,8 @@ Y_DECLARE_OUT_SPEC(inline, NKikimr::NBackup::NImpl::TChangeRecord::TPtr, out, va
namespace NKikimr::NBackup::NImpl {
-IActor* CreateLocalTableWriter(const TPathId& tablePathId) {
- return new NReplication::NService::TLocalTableWriter<NBackup::NImpl::TChangeRecord>(tablePathId);
+IActor* CreateLocalTableWriter(const TPathId& tablePathId, EWriterType type) {
+ return new NReplication::NService::TLocalTableWriter<NBackup::NImpl::TChangeRecord>(tablePathId, type);
}
}
diff --git a/ydb/core/backup/impl/table_writer.h b/ydb/core/backup/impl/table_writer.h
index e5ca1dbc60..b1285ae4a8 100644
--- a/ydb/core/backup/impl/table_writer.h
+++ b/ydb/core/backup/impl/table_writer.h
@@ -8,6 +8,11 @@ namespace NKikimr {
namespace NKikimr::NBackup::NImpl {
-IActor* CreateLocalTableWriter(const TPathId& tablePathId);
+enum class EWriterType {
+ Backup,
+ Restore,
+};
+
+IActor* CreateLocalTableWriter(const TPathId& tablePathId, EWriterType type = EWriterType::Backup);
}
diff --git a/ydb/core/backup/impl/table_writer_impl.h b/ydb/core/backup/impl/table_writer_impl.h
new file mode 100644
index 0000000000..dc97073650
--- /dev/null
+++ b/ydb/core/backup/impl/table_writer_impl.h
@@ -0,0 +1,235 @@
+#pragma once
+
+#include "table_writer.h"
+
+#include <ydb/core/tx/replication/service/table_writer_impl.h>
+
+#include <ydb/core/change_exchange/change_record.h>
+#include <ydb/core/protos/change_exchange.pb.h>
+
+namespace NKikimr {
+
+namespace NBackup::NImpl {
+
+class TChangeRecord;
+
+} // namespace NBackup::NImpl
+
+template <>
+struct TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> {
+ NBackup::NImpl::EWriterType Type;
+
+ TChangeRecordBuilderContextTrait(NBackup::NImpl::EWriterType type)
+ : Type(type)
+ {}
+
+ // just copy type
+ TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord>& other) = default;
+};
+
+} // namespace NKikimr
+
+namespace NKikimr::NBackup::NImpl {
+
+class TChangeRecord: public NChangeExchange::TChangeRecordBase {
+ friend class TChangeRecordBuilder;
+
+public:
+ using TPtr = TIntrusivePtr<TChangeRecord>;
+ const static NKikimrSchemeOp::ECdcStreamFormat StreamType = NKikimrSchemeOp::ECdcStreamFormatProto;
+
+ ui64 GetGroup() const override {
+ return ProtoBody.GetGroup();
+ }
+ ui64 GetStep() const override {
+ return ProtoBody.GetStep();
+ }
+ ui64 GetTxId() const override {
+ return ProtoBody.GetTxId();
+ }
+ EKind GetKind() const override {
+ return EKind::CdcDataChange;
+ }
+ TString GetSourceId() const {
+ return SourceId;
+ }
+
+ void Serialize(
+ NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record,
+ TChangeRecordBuilderContextTrait<TChangeRecord>& ctx) const
+ {
+ switch (ctx.Type) {
+ case EWriterType::Backup:
+ return SerializeBackup(record);
+ case EWriterType::Restore:
+ return SerializeRestore(record);
+ }
+ }
+
+ ui64 ResolvePartitionId(NChangeExchange::IChangeSenderResolver* const resolver) const override {
+ const auto& partitions = resolver->GetPartitions();
+ Y_ABORT_UNLESS(partitions);
+ const auto& schema = resolver->GetSchema();
+ const auto streamFormat = resolver->GetStreamFormat();
+ Y_ABORT_UNLESS(streamFormat == NKikimrSchemeOp::ECdcStreamFormatProto);
+
+ const auto range = TTableRange(GetKey());
+ Y_ABORT_UNLESS(range.Point);
+
+ const auto it = LowerBound(
+ partitions.cbegin(), partitions.cend(), true,
+ [&](const auto& partition, bool) {
+ const int compares = CompareBorders<true, false>(
+ partition.Range->EndKeyPrefix.GetCells(), range.From,
+ partition.Range->IsInclusive || partition.Range->IsPoint,
+ range.InclusiveFrom || range.Point, schema
+ );
+
+ return (compares < 0);
+ }
+ );
+
+ Y_ABORT_UNLESS(it != partitions.end());
+ return it->ShardId;
+ }
+
+ TConstArrayRef<TCell> GetKey() const {
+ Y_ABORT_UNLESS(ProtoBody.HasCdcDataChange());
+ Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().HasKey());
+ Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().GetKey().HasData());
+ TSerializedCellVec keyCellVec;
+ Y_ABORT_UNLESS(TSerializedCellVec::TryParse(ProtoBody.GetCdcDataChange().GetKey().GetData(), keyCellVec));
+ Key = keyCellVec;
+
+ Y_ABORT_UNLESS(Key);
+ return Key->GetCells();
+ }
+private:
+ TString SourceId;
+ NKikimrChangeExchange::TChangeRecord ProtoBody;
+ NReplication::NService::TLightweightSchema::TCPtr Schema;
+
+ mutable TMaybe<TSerializedCellVec> Key;
+
+ void SerializeBackup(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const {
+ record.SetSourceOffset(GetOrder());
+ // TODO: fill WriteTxId
+
+ record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());
+
+ auto& upsert = *record.MutableUpsert();
+
+ switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
+ case NKikimrChangeExchange::TDataChange::kUpsert: {
+ *upsert.MutableTags() = {
+ ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
+ ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
+ auto it = Schema->ValueColumns.find("__incrBackupImpl_deleted");
+ Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation");
+ upsert.AddTags(it->second.Tag);
+
+ TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData();
+ Y_ABORT_UNLESS(
+ TSerializedCellVec::UnsafeAppendCells({TCell::Make<bool>(false)}, serializedCellVec),
+ "Invalid cell format, can't append cells");
+
+ upsert.SetData(serializedCellVec);
+ break;
+ }
+ case NKikimrChangeExchange::TDataChange::kErase: {
+ size_t size = Schema->ValueColumns.size();
+ TVector<NTable::TTag> tags;
+ TVector<TCell> cells;
+
+ tags.reserve(size);
+ cells.reserve(size);
+
+ for (const auto& [name, value] : Schema->ValueColumns) {
+ tags.push_back(value.Tag);
+ if (name != "__incrBackupImpl_deleted") {
+ cells.emplace_back();
+ } else {
+ cells.emplace_back(TCell::Make<bool>(true));
+ }
+ }
+
+ *upsert.MutableTags() = {tags.begin(), tags.end()};
+ upsert.SetData(TSerializedCellVec::Serialize(cells));
+
+ break;
+ }
+ case NKikimrChangeExchange::TDataChange::kReset:
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
+ }
+ }
+
+ // just pass through, all conversions are on level above
+ void SerializeRestore(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const {
+ Y_ABORT_UNLESS(
+ ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert,
+ "Invariant violation");
+
+ record.SetSourceOffset(GetOrder());
+ // TODO: fill WriteTxId
+
+ record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());
+
+ auto& upsert = *record.MutableUpsert();
+ *upsert.MutableTags() = {
+ ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
+ ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
+ upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
+ }
+
+}; // TChangeRecord
+
+class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChangeRecord, TChangeRecordBuilder> {
+public:
+ using TBase::TBase;
+
+ TSelf& WithSourceId(const TString& sourceId) {
+ GetRecord()->SourceId = sourceId;
+ return static_cast<TSelf&>(*this);
+ }
+
+ template <typename T>
+ TSelf& WithBody(T&& body) {
+ Y_ABORT_UNLESS(GetRecord()->ProtoBody.ParseFromString(body));
+ return static_cast<TBase*>(this)->WithBody(std::forward<T>(body));
+ }
+
+ TSelf& WithSchema(NReplication::NService::TLightweightSchema::TCPtr schema) {
+ GetRecord()->Schema = schema;
+ return static_cast<TSelf&>(*this);
+ }
+
+}; // TChangeRecordBuilder
+
+}
+
+namespace NKikimr {
+
+template <>
+struct TChangeRecordContainer<NBackup::NImpl::TChangeRecord>
+ : public TBaseChangeRecordContainer
+{
+ TChangeRecordContainer() = default;
+
+ explicit TChangeRecordContainer(TVector<NBackup::NImpl::TChangeRecord::TPtr>&& records)
+ : Records(std::move(records))
+ {}
+
+ TVector<NBackup::NImpl::TChangeRecord::TPtr> Records;
+
+ TString Out() override {
+ return TStringBuilder() << "[" << JoinSeq(",", Records) << "]";
+ }
+};
+
+template <>
+struct TChangeRecordBuilderTrait<NBackup::NImpl::TChangeRecord>
+ : public NBackup::NImpl::TChangeRecordBuilder
+{};
+
+}
diff --git a/ydb/core/backup/impl/table_writer_ut.cpp b/ydb/core/backup/impl/table_writer_ut.cpp
new file mode 100644
index 0000000000..042c4de5d5
--- /dev/null
+++ b/ydb/core/backup/impl/table_writer_ut.cpp
@@ -0,0 +1,159 @@
+#include "table_writer_impl.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NBackup::NImpl {
+
+using TLightweightSchema = NReplication::NService::TLightweightSchema;
+
+Y_UNIT_TEST_SUITE(TableWriter) {
+ Y_UNIT_TEST(Backup) {
+ TLightweightSchema::TPtr schema = MakeIntrusive<TLightweightSchema>();
+ schema->KeyColumns.emplace_back(NScheme::TTypeInfo{NScheme::NTypeIds::Uint64});
+ schema->ValueColumns.emplace("value", TLightweightSchema::TColumn{
+ .Tag = 1,
+ .Type = NScheme::TTypeInfo{NScheme::NTypeIds::Uint64},
+ });
+ schema->ValueColumns.emplace("__incrBackupImpl_deleted", TLightweightSchema::TColumn{
+ .Tag = 123,
+ .Type = NScheme::TTypeInfo{NScheme::NTypeIds::Bool},
+ });
+
+ {
+ NKikimrChangeExchange::TChangeRecord changeRecord;
+ auto& change = *changeRecord.MutableCdcDataChange();
+
+ auto& key = *change.MutableKey();
+ TVector<TCell> keyCells{
+ TCell::Make<ui64>(1234)
+ };
+ key.SetData(TSerializedCellVec::Serialize(keyCells));
+ key.AddTags(0);
+
+ auto& upsert = *change.MutableUpsert();
+ TVector<TCell> cells{
+ TCell::Make<ui64>(4567),
+ };
+ upsert.SetData(TSerializedCellVec::Serialize(cells));
+ upsert.AddTags(1);
+
+ TString data;
+ UNIT_ASSERT(changeRecord.SerializeToString(&data));
+
+ auto record = TChangeRecordBuilder()
+ .WithSourceId("test")
+ .WithOrder(0)
+ .WithBody(data)
+ .WithSchema(schema)
+ .Build();
+
+ NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
+ TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Backup);
+ record->Serialize(result, ctx);
+
+ TVector<TCell> outCells{
+ TCell::Make<ui64>(4567),
+ TCell::Make<bool>(false),
+ };
+
+ TString out = TSerializedCellVec::Serialize(outCells);
+
+ UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());
+ UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData());
+ UNIT_ASSERT(result.GetUpsert().TagsSize() == 2);
+ UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1);
+ UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123);
+ }
+
+ {
+ NKikimrChangeExchange::TChangeRecord changeRecord;
+ auto& change = *changeRecord.MutableCdcDataChange();
+
+ auto& key = *change.MutableKey();
+ TVector<TCell> keyCells{
+ TCell::Make<ui64>(1234)
+ };
+ key.SetData(TSerializedCellVec::Serialize(keyCells));
+ key.AddTags(0);
+
+ change.MutableErase();
+
+ TString data;
+ UNIT_ASSERT(changeRecord.SerializeToString(&data));
+
+ auto record = TChangeRecordBuilder()
+ .WithSourceId("test")
+ .WithOrder(0)
+ .WithBody(data)
+ .WithSchema(schema)
+ .Build();
+
+ NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
+ TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Backup);
+ record->Serialize(result, ctx);
+
+ TVector<TCell> outCells{
+ TCell::Make<bool>(true),
+ TCell(),
+ };
+
+ TString out = TSerializedCellVec::Serialize(outCells);
+
+ UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());
+ UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData());
+ UNIT_ASSERT(result.GetUpsert().TagsSize() == 2);
+ UNIT_ASSERT(result.GetUpsert().GetTags(0) == 123);
+ UNIT_ASSERT(result.GetUpsert().GetTags(1) == 1);
+ }
+ }
+
+ Y_UNIT_TEST(Restore) {
+ TLightweightSchema::TPtr schema = MakeIntrusive<TLightweightSchema>();
+ schema->KeyColumns.emplace_back(NScheme::TTypeInfo{NScheme::NTypeIds::Uint64});
+ schema->ValueColumns.emplace("value", TLightweightSchema::TColumn{
+ .Tag = 1,
+ .Type = NScheme::TTypeInfo{NScheme::NTypeIds::Uint64},
+ });
+
+ {
+ NKikimrChangeExchange::TChangeRecord changeRecord;
+ auto& change = *changeRecord.MutableCdcDataChange();
+
+ auto& key = *change.MutableKey();
+ TVector<TCell> keyCells{
+ TCell::Make<ui64>(1234)
+ };
+ key.SetData(TSerializedCellVec::Serialize(keyCells));
+ key.AddTags(0);
+
+ auto& upsert = *change.MutableUpsert();
+ TVector<TCell> cells{
+ TCell::Make<ui64>(4567),
+ };
+ upsert.SetData(TSerializedCellVec::Serialize(cells));
+ upsert.AddTags(1);
+
+ TString data;
+ UNIT_ASSERT(changeRecord.SerializeToString(&data));
+
+ auto record = TChangeRecordBuilder()
+ .WithSourceId("test")
+ .WithOrder(0)
+ .WithBody(data)
+ .WithSchema(schema)
+ .Build();
+
+ NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
+ TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Restore);
+ record->Serialize(result, ctx);
+
+ UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());
+ UNIT_ASSERT_VALUES_EQUAL(upsert.GetData(), result.GetUpsert().GetData());
+ UNIT_ASSERT(result.GetUpsert().TagsSize() == 1);
+ UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1);
+ }
+
+ }
+}
+
+} // namespace NKikimr::NBackup::NImpl
diff --git a/ydb/core/backup/impl/ut_table_writer/ya.make b/ydb/core/backup/impl/ut_table_writer/ya.make
new file mode 100644
index 0000000000..e9634d6754
--- /dev/null
+++ b/ydb/core/backup/impl/ut_table_writer/ya.make
@@ -0,0 +1,20 @@
+UNITTEST_FOR(ydb/core/backup/impl)
+
+FORK_SUBTESTS()
+
+SIZE(MEDIUM)
+
+TIMEOUT(600)
+
+PEERDIR(
+ library/cpp/testing/unittest
+ ydb/core/tx/replication/ut_helpers
+)
+
+SRCS(
+ table_writer_ut.cpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/backup/impl/ya.make b/ydb/core/backup/impl/ya.make
index 029bdf6ed5..1e3b06c2a3 100644
--- a/ydb/core/backup/impl/ya.make
+++ b/ydb/core/backup/impl/ya.make
@@ -15,4 +15,5 @@ END()
RECURSE_FOR_TESTS(
ut_local_partition_reader
+ ut_table_writer
)
diff --git a/ydb/core/tx/replication/service/json_change_record.h b/ydb/core/tx/replication/service/json_change_record.h
index f94091d02c..d85f262cbb 100644
--- a/ydb/core/tx/replication/service/json_change_record.h
+++ b/ydb/core/tx/replication/service/json_change_record.h
@@ -139,6 +139,11 @@ struct TChangeRecordBuilderContextTrait<NReplication::NService::TChangeRecord> {
TChangeRecordBuilderContextTrait()
: MemoryPool(256)
{}
+
+ // do not preserve any state between writers, just construct new one.
+ TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait<NReplication::NService::TChangeRecord>&)
+ : MemoryPool(256)
+ {}
};
}
diff --git a/ydb/core/tx/replication/service/table_writer_impl.h b/ydb/core/tx/replication/service/table_writer_impl.h
index 1fd77232fb..5642bc0bbc 100644
--- a/ydb/core/tx/replication/service/table_writer_impl.h
+++ b/ydb/core/tx/replication/service/table_writer_impl.h
@@ -178,12 +178,16 @@ public:
return NKikimrServices::TActivity::REPLICATION_TABLE_PARTITION_WRITER;
}
- explicit TTablePartitionWriter(const TActorId& parent, ui64 tabletId, const TTableId& tableId)
+ explicit TTablePartitionWriter(
+ const TActorId& parent,
+ ui64 tabletId,
+ const TTableId& tableId,
+ TChangeRecordBuilderContextTrait<TChangeRecord> builderContext)
: Parent(parent)
, TabletId(tabletId)
, TableId(tableId)
- {
- }
+ , BuilderContext(builderContext)
+ {}
void Bootstrap() {
GetProxyServices();
@@ -430,7 +434,11 @@ class TLocalTableWriter
}
IActor* CreateSender(ui64 partitionId) const override {
- return new TTablePartitionWriter<TChangeRecord>(this->SelfId(), partitionId, TTableId(this->PathId, Schema->Version));
+ return new TTablePartitionWriter<TChangeRecord>(
+ this->SelfId(),
+ partitionId,
+ TTableId(this->PathId, Schema->Version),
+ BuilderContext);
}
const TVector<TKeyDesc::TPartitionInfo>& GetPartitions() const override { return KeyDesc->GetPartitions(); }
@@ -521,9 +529,11 @@ public:
return NKikimrServices::TActivity::REPLICATION_LOCAL_TABLE_WRITER;
}
- explicit TLocalTableWriter(const TPathId& tablePathId)
+ template <class... TArgs>
+ explicit TLocalTableWriter(const TPathId& tablePathId, TArgs&&... args)
: TBase(&TThis::StateWork)
, TBaseSender(this, this, this, TActorId(), tablePathId)
+ , BuilderContext(std::forward<TArgs>(args)...)
{
}
@@ -544,6 +554,7 @@ public:
private:
mutable TMaybe<TString> LogPrefix;
+ TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext;
TActorId Worker;
ui64 TableVersion = 0;