diff options
author | Innokentii Mokin <innokentii@ydb.tech> | 2024-07-17 17:52:00 +0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-17 13:52:00 +0300 |
commit | 6302faa47a2c55a855b47becb2182fe49e74bd57 (patch) | |
tree | 39a09e81d6f22add7993ff4879833276e6569afc | |
parent | de3d9124fb8ae0226ff15c8a8d78f7f2769d9712 (diff) | |
download | ydb-6302faa47a2c55a855b47becb2182fe49e74bd57.tar.gz |
Add specialized backup writer for repl (#6748)
-rw-r--r-- | ydb/core/backup/impl/table_writer.cpp | 152 | ||||
-rw-r--r-- | ydb/core/backup/impl/table_writer.h | 7 | ||||
-rw-r--r-- | ydb/core/backup/impl/table_writer_impl.h | 235 | ||||
-rw-r--r-- | ydb/core/backup/impl/table_writer_ut.cpp | 159 | ||||
-rw-r--r-- | ydb/core/backup/impl/ut_table_writer/ya.make | 20 | ||||
-rw-r--r-- | ydb/core/backup/impl/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/json_change_record.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/table_writer_impl.h | 21 |
8 files changed, 445 insertions, 155 deletions
diff --git a/ydb/core/backup/impl/table_writer.cpp b/ydb/core/backup/impl/table_writer.cpp index 157e30c75e..4449a50a47 100644 --- a/ydb/core/backup/impl/table_writer.cpp +++ b/ydb/core/backup/impl/table_writer.cpp @@ -1,150 +1,4 @@ -#include "table_writer.h" - -#include <ydb/core/tx/replication/service/table_writer_impl.h> - -#include <ydb/core/change_exchange/change_record.h> -#include <ydb/core/protos/change_exchange.pb.h> - -namespace NKikimr::NBackup::NImpl { - -class TChangeRecord: public NChangeExchange::TChangeRecordBase { - friend class TChangeRecordBuilder; - -public: - using TPtr = TIntrusivePtr<TChangeRecord>; - const static NKikimrSchemeOp::ECdcStreamFormat StreamType = NKikimrSchemeOp::ECdcStreamFormatProto; - - ui64 GetGroup() const override { - return ProtoBody.GetGroup(); - } - ui64 GetStep() const override { - return ProtoBody.GetStep(); - } - ui64 GetTxId() const override { - return ProtoBody.GetTxId(); - } - EKind GetKind() const override { - return EKind::CdcDataChange; - } - TString GetSourceId() const { - return SourceId; - } - - void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const { - record.SetSourceOffset(GetOrder()); - // TODO: fill WriteTxId - - record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); - - auto& upsert = *record.MutableUpsert(); - *upsert.MutableTags() = { - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; - upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); - } - - void Serialize( - NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, - TChangeRecordBuilderContextTrait<TChangeRecord>&) const - { - return Serialize(record); - } - - ui64 ResolvePartitionId(NChangeExchange::IChangeSenderResolver* const resolver) const override { - const auto& partitions = resolver->GetPartitions(); - Y_ABORT_UNLESS(partitions); - const auto& schema = resolver->GetSchema(); - const auto streamFormat = resolver->GetStreamFormat(); - Y_ABORT_UNLESS(streamFormat == NKikimrSchemeOp::ECdcStreamFormatProto); - - const auto range = TTableRange(GetKey()); - Y_ABORT_UNLESS(range.Point); - - const auto it = LowerBound( - partitions.cbegin(), partitions.cend(), true, - [&](const auto& partition, bool) { - const int compares = CompareBorders<true, false>( - partition.Range->EndKeyPrefix.GetCells(), range.From, - partition.Range->IsInclusive || partition.Range->IsPoint, - range.InclusiveFrom || range.Point, schema - ); - - return (compares < 0); - } - ); - - Y_ABORT_UNLESS(it != partitions.end()); - return it->ShardId; - } - - TConstArrayRef<TCell> GetKey() const { - Y_ABORT_UNLESS(ProtoBody.HasCdcDataChange()); - Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().HasKey()); - Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().GetKey().HasData()); - TSerializedCellVec keyCellVec; - Y_ABORT_UNLESS(TSerializedCellVec::TryParse(ProtoBody.GetCdcDataChange().GetKey().GetData(), keyCellVec)); - Key = keyCellVec; - - Y_ABORT_UNLESS(Key); - return Key->GetCells(); - } -private: - TString SourceId; - NKikimrChangeExchange::TChangeRecord ProtoBody; - NReplication::NService::TLightweightSchema::TCPtr Schema; - - mutable TMaybe<TSerializedCellVec> Key; -}; // TChangeRecord - -class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChangeRecord, TChangeRecordBuilder> { -public: - using TBase::TBase; - - TSelf& WithSourceId(const TString& sourceId) { - GetRecord()->SourceId = sourceId; - return static_cast<TSelf&>(*this); - } - - template <typename T> - TSelf& WithBody(T&& body) { - Y_ABORT_UNLESS(GetRecord()->ProtoBody.ParseFromString(body)); - return static_cast<TBase*>(this)->WithBody(std::forward<T>(body)); - } - - TSelf& WithSchema(NReplication::NService::TLightweightSchema::TCPtr schema) { - GetRecord()->Schema = schema; - return static_cast<TSelf&>(*this); - } - -}; // TChangeRecordBuilder - -} - -namespace NKikimr { - -template <> -struct TChangeRecordContainer<NBackup::NImpl::TChangeRecord> - : public TBaseChangeRecordContainer -{ - TChangeRecordContainer() = default; - - explicit TChangeRecordContainer(TVector<NBackup::NImpl::TChangeRecord::TPtr>&& records) - : Records(std::move(records)) - {} - - TVector<NBackup::NImpl::TChangeRecord::TPtr> Records; - - TString Out() override { - return TStringBuilder() << "[" << JoinSeq(",", Records) << "]"; - } -}; - -template <> -struct TChangeRecordBuilderTrait<NBackup::NImpl::TChangeRecord> - : public NBackup::NImpl::TChangeRecordBuilder -{}; - -} +#include "table_writer_impl.h" Y_DECLARE_OUT_SPEC(inline, NKikimr::NBackup::NImpl::TChangeRecord, out, value) { return value.Out(out); @@ -156,8 +10,8 @@ Y_DECLARE_OUT_SPEC(inline, NKikimr::NBackup::NImpl::TChangeRecord::TPtr, out, va namespace NKikimr::NBackup::NImpl { -IActor* CreateLocalTableWriter(const TPathId& tablePathId) { - return new NReplication::NService::TLocalTableWriter<NBackup::NImpl::TChangeRecord>(tablePathId); +IActor* CreateLocalTableWriter(const TPathId& tablePathId, EWriterType type) { + return new NReplication::NService::TLocalTableWriter<NBackup::NImpl::TChangeRecord>(tablePathId, type); } } diff --git a/ydb/core/backup/impl/table_writer.h b/ydb/core/backup/impl/table_writer.h index e5ca1dbc60..b1285ae4a8 100644 --- a/ydb/core/backup/impl/table_writer.h +++ b/ydb/core/backup/impl/table_writer.h @@ -8,6 +8,11 @@ namespace NKikimr { namespace NKikimr::NBackup::NImpl { -IActor* CreateLocalTableWriter(const TPathId& tablePathId); +enum class EWriterType { + Backup, + Restore, +}; + +IActor* CreateLocalTableWriter(const TPathId& tablePathId, EWriterType type = EWriterType::Backup); } diff --git a/ydb/core/backup/impl/table_writer_impl.h b/ydb/core/backup/impl/table_writer_impl.h new file mode 100644 index 0000000000..dc97073650 --- /dev/null +++ b/ydb/core/backup/impl/table_writer_impl.h @@ -0,0 +1,235 @@ +#pragma once + +#include "table_writer.h" + +#include <ydb/core/tx/replication/service/table_writer_impl.h> + +#include <ydb/core/change_exchange/change_record.h> +#include <ydb/core/protos/change_exchange.pb.h> + +namespace NKikimr { + +namespace NBackup::NImpl { + +class TChangeRecord; + +} // namespace NBackup::NImpl + +template <> +struct TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> { + NBackup::NImpl::EWriterType Type; + + TChangeRecordBuilderContextTrait(NBackup::NImpl::EWriterType type) + : Type(type) + {} + + // just copy type + TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord>& other) = default; +}; + +} // namespace NKikimr + +namespace NKikimr::NBackup::NImpl { + +class TChangeRecord: public NChangeExchange::TChangeRecordBase { + friend class TChangeRecordBuilder; + +public: + using TPtr = TIntrusivePtr<TChangeRecord>; + const static NKikimrSchemeOp::ECdcStreamFormat StreamType = NKikimrSchemeOp::ECdcStreamFormatProto; + + ui64 GetGroup() const override { + return ProtoBody.GetGroup(); + } + ui64 GetStep() const override { + return ProtoBody.GetStep(); + } + ui64 GetTxId() const override { + return ProtoBody.GetTxId(); + } + EKind GetKind() const override { + return EKind::CdcDataChange; + } + TString GetSourceId() const { + return SourceId; + } + + void Serialize( + NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, + TChangeRecordBuilderContextTrait<TChangeRecord>& ctx) const + { + switch (ctx.Type) { + case EWriterType::Backup: + return SerializeBackup(record); + case EWriterType::Restore: + return SerializeRestore(record); + } + } + + ui64 ResolvePartitionId(NChangeExchange::IChangeSenderResolver* const resolver) const override { + const auto& partitions = resolver->GetPartitions(); + Y_ABORT_UNLESS(partitions); + const auto& schema = resolver->GetSchema(); + const auto streamFormat = resolver->GetStreamFormat(); + Y_ABORT_UNLESS(streamFormat == NKikimrSchemeOp::ECdcStreamFormatProto); + + const auto range = TTableRange(GetKey()); + Y_ABORT_UNLESS(range.Point); + + const auto it = LowerBound( + partitions.cbegin(), partitions.cend(), true, + [&](const auto& partition, bool) { + const int compares = CompareBorders<true, false>( + partition.Range->EndKeyPrefix.GetCells(), range.From, + partition.Range->IsInclusive || partition.Range->IsPoint, + range.InclusiveFrom || range.Point, schema + ); + + return (compares < 0); + } + ); + + Y_ABORT_UNLESS(it != partitions.end()); + return it->ShardId; + } + + TConstArrayRef<TCell> GetKey() const { + Y_ABORT_UNLESS(ProtoBody.HasCdcDataChange()); + Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().HasKey()); + Y_ABORT_UNLESS(ProtoBody.GetCdcDataChange().GetKey().HasData()); + TSerializedCellVec keyCellVec; + Y_ABORT_UNLESS(TSerializedCellVec::TryParse(ProtoBody.GetCdcDataChange().GetKey().GetData(), keyCellVec)); + Key = keyCellVec; + + Y_ABORT_UNLESS(Key); + return Key->GetCells(); + } +private: + TString SourceId; + NKikimrChangeExchange::TChangeRecord ProtoBody; + NReplication::NService::TLightweightSchema::TCPtr Schema; + + mutable TMaybe<TSerializedCellVec> Key; + + void SerializeBackup(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const { + record.SetSourceOffset(GetOrder()); + // TODO: fill WriteTxId + + record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); + + auto& upsert = *record.MutableUpsert(); + + switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: { + *upsert.MutableTags() = { + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; + auto it = Schema->ValueColumns.find("__incrBackupImpl_deleted"); + Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation"); + upsert.AddTags(it->second.Tag); + + TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData(); + Y_ABORT_UNLESS( + TSerializedCellVec::UnsafeAppendCells({TCell::Make<bool>(false)}, serializedCellVec), + "Invalid cell format, can't append cells"); + + upsert.SetData(serializedCellVec); + break; + } + case NKikimrChangeExchange::TDataChange::kErase: { + size_t size = Schema->ValueColumns.size(); + TVector<NTable::TTag> tags; + TVector<TCell> cells; + + tags.reserve(size); + cells.reserve(size); + + for (const auto& [name, value] : Schema->ValueColumns) { + tags.push_back(value.Tag); + if (name != "__incrBackupImpl_deleted") { + cells.emplace_back(); + } else { + cells.emplace_back(TCell::Make<bool>(true)); + } + } + + *upsert.MutableTags() = {tags.begin(), tags.end()}; + upsert.SetData(TSerializedCellVec::Serialize(cells)); + + break; + } + case NKikimrChangeExchange::TDataChange::kReset: + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase())); + } + } + + // just pass through, all conversions are on level above + void SerializeRestore(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const { + Y_ABORT_UNLESS( + ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert, + "Invariant violation"); + + record.SetSourceOffset(GetOrder()); + // TODO: fill WriteTxId + + record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); + + auto& upsert = *record.MutableUpsert(); + *upsert.MutableTags() = { + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; + upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); + } + +}; // TChangeRecord + +class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChangeRecord, TChangeRecordBuilder> { +public: + using TBase::TBase; + + TSelf& WithSourceId(const TString& sourceId) { + GetRecord()->SourceId = sourceId; + return static_cast<TSelf&>(*this); + } + + template <typename T> + TSelf& WithBody(T&& body) { + Y_ABORT_UNLESS(GetRecord()->ProtoBody.ParseFromString(body)); + return static_cast<TBase*>(this)->WithBody(std::forward<T>(body)); + } + + TSelf& WithSchema(NReplication::NService::TLightweightSchema::TCPtr schema) { + GetRecord()->Schema = schema; + return static_cast<TSelf&>(*this); + } + +}; // TChangeRecordBuilder + +} + +namespace NKikimr { + +template <> +struct TChangeRecordContainer<NBackup::NImpl::TChangeRecord> + : public TBaseChangeRecordContainer +{ + TChangeRecordContainer() = default; + + explicit TChangeRecordContainer(TVector<NBackup::NImpl::TChangeRecord::TPtr>&& records) + : Records(std::move(records)) + {} + + TVector<NBackup::NImpl::TChangeRecord::TPtr> Records; + + TString Out() override { + return TStringBuilder() << "[" << JoinSeq(",", Records) << "]"; + } +}; + +template <> +struct TChangeRecordBuilderTrait<NBackup::NImpl::TChangeRecord> + : public NBackup::NImpl::TChangeRecordBuilder +{}; + +} diff --git a/ydb/core/backup/impl/table_writer_ut.cpp b/ydb/core/backup/impl/table_writer_ut.cpp new file mode 100644 index 0000000000..042c4de5d5 --- /dev/null +++ b/ydb/core/backup/impl/table_writer_ut.cpp @@ -0,0 +1,159 @@ +#include "table_writer_impl.h" + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NBackup::NImpl { + +using TLightweightSchema = NReplication::NService::TLightweightSchema; + +Y_UNIT_TEST_SUITE(TableWriter) { + Y_UNIT_TEST(Backup) { + TLightweightSchema::TPtr schema = MakeIntrusive<TLightweightSchema>(); + schema->KeyColumns.emplace_back(NScheme::TTypeInfo{NScheme::NTypeIds::Uint64}); + schema->ValueColumns.emplace("value", TLightweightSchema::TColumn{ + .Tag = 1, + .Type = NScheme::TTypeInfo{NScheme::NTypeIds::Uint64}, + }); + schema->ValueColumns.emplace("__incrBackupImpl_deleted", TLightweightSchema::TColumn{ + .Tag = 123, + .Type = NScheme::TTypeInfo{NScheme::NTypeIds::Bool}, + }); + + { + NKikimrChangeExchange::TChangeRecord changeRecord; + auto& change = *changeRecord.MutableCdcDataChange(); + + auto& key = *change.MutableKey(); + TVector<TCell> keyCells{ + TCell::Make<ui64>(1234) + }; + key.SetData(TSerializedCellVec::Serialize(keyCells)); + key.AddTags(0); + + auto& upsert = *change.MutableUpsert(); + TVector<TCell> cells{ + TCell::Make<ui64>(4567), + }; + upsert.SetData(TSerializedCellVec::Serialize(cells)); + upsert.AddTags(1); + + TString data; + UNIT_ASSERT(changeRecord.SerializeToString(&data)); + + auto record = TChangeRecordBuilder() + .WithSourceId("test") + .WithOrder(0) + .WithBody(data) + .WithSchema(schema) + .Build(); + + NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result; + TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Backup); + record->Serialize(result, ctx); + + TVector<TCell> outCells{ + TCell::Make<ui64>(4567), + TCell::Make<bool>(false), + }; + + TString out = TSerializedCellVec::Serialize(outCells); + + UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey()); + UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData()); + UNIT_ASSERT(result.GetUpsert().TagsSize() == 2); + UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1); + UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123); + } + + { + NKikimrChangeExchange::TChangeRecord changeRecord; + auto& change = *changeRecord.MutableCdcDataChange(); + + auto& key = *change.MutableKey(); + TVector<TCell> keyCells{ + TCell::Make<ui64>(1234) + }; + key.SetData(TSerializedCellVec::Serialize(keyCells)); + key.AddTags(0); + + change.MutableErase(); + + TString data; + UNIT_ASSERT(changeRecord.SerializeToString(&data)); + + auto record = TChangeRecordBuilder() + .WithSourceId("test") + .WithOrder(0) + .WithBody(data) + .WithSchema(schema) + .Build(); + + NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result; + TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Backup); + record->Serialize(result, ctx); + + TVector<TCell> outCells{ + TCell::Make<bool>(true), + TCell(), + }; + + TString out = TSerializedCellVec::Serialize(outCells); + + UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey()); + UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData()); + UNIT_ASSERT(result.GetUpsert().TagsSize() == 2); + UNIT_ASSERT(result.GetUpsert().GetTags(0) == 123); + UNIT_ASSERT(result.GetUpsert().GetTags(1) == 1); + } + } + + Y_UNIT_TEST(Restore) { + TLightweightSchema::TPtr schema = MakeIntrusive<TLightweightSchema>(); + schema->KeyColumns.emplace_back(NScheme::TTypeInfo{NScheme::NTypeIds::Uint64}); + schema->ValueColumns.emplace("value", TLightweightSchema::TColumn{ + .Tag = 1, + .Type = NScheme::TTypeInfo{NScheme::NTypeIds::Uint64}, + }); + + { + NKikimrChangeExchange::TChangeRecord changeRecord; + auto& change = *changeRecord.MutableCdcDataChange(); + + auto& key = *change.MutableKey(); + TVector<TCell> keyCells{ + TCell::Make<ui64>(1234) + }; + key.SetData(TSerializedCellVec::Serialize(keyCells)); + key.AddTags(0); + + auto& upsert = *change.MutableUpsert(); + TVector<TCell> cells{ + TCell::Make<ui64>(4567), + }; + upsert.SetData(TSerializedCellVec::Serialize(cells)); + upsert.AddTags(1); + + TString data; + UNIT_ASSERT(changeRecord.SerializeToString(&data)); + + auto record = TChangeRecordBuilder() + .WithSourceId("test") + .WithOrder(0) + .WithBody(data) + .WithSchema(schema) + .Build(); + + NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result; + TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Restore); + record->Serialize(result, ctx); + + UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey()); + UNIT_ASSERT_VALUES_EQUAL(upsert.GetData(), result.GetUpsert().GetData()); + UNIT_ASSERT(result.GetUpsert().TagsSize() == 1); + UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1); + } + + } +} + +} // namespace NKikimr::NBackup::NImpl diff --git a/ydb/core/backup/impl/ut_table_writer/ya.make b/ydb/core/backup/impl/ut_table_writer/ya.make new file mode 100644 index 0000000000..e9634d6754 --- /dev/null +++ b/ydb/core/backup/impl/ut_table_writer/ya.make @@ -0,0 +1,20 @@ +UNITTEST_FOR(ydb/core/backup/impl) + +FORK_SUBTESTS() + +SIZE(MEDIUM) + +TIMEOUT(600) + +PEERDIR( + library/cpp/testing/unittest + ydb/core/tx/replication/ut_helpers +) + +SRCS( + table_writer_ut.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/backup/impl/ya.make b/ydb/core/backup/impl/ya.make index 029bdf6ed5..1e3b06c2a3 100644 --- a/ydb/core/backup/impl/ya.make +++ b/ydb/core/backup/impl/ya.make @@ -15,4 +15,5 @@ END() RECURSE_FOR_TESTS( ut_local_partition_reader + ut_table_writer ) diff --git a/ydb/core/tx/replication/service/json_change_record.h b/ydb/core/tx/replication/service/json_change_record.h index f94091d02c..d85f262cbb 100644 --- a/ydb/core/tx/replication/service/json_change_record.h +++ b/ydb/core/tx/replication/service/json_change_record.h @@ -139,6 +139,11 @@ struct TChangeRecordBuilderContextTrait<NReplication::NService::TChangeRecord> { TChangeRecordBuilderContextTrait() : MemoryPool(256) {} + + // do not preserve any state between writers, just construct new one. + TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait<NReplication::NService::TChangeRecord>&) + : MemoryPool(256) + {} }; } diff --git a/ydb/core/tx/replication/service/table_writer_impl.h b/ydb/core/tx/replication/service/table_writer_impl.h index 1fd77232fb..5642bc0bbc 100644 --- a/ydb/core/tx/replication/service/table_writer_impl.h +++ b/ydb/core/tx/replication/service/table_writer_impl.h @@ -178,12 +178,16 @@ public: return NKikimrServices::TActivity::REPLICATION_TABLE_PARTITION_WRITER; } - explicit TTablePartitionWriter(const TActorId& parent, ui64 tabletId, const TTableId& tableId) + explicit TTablePartitionWriter( + const TActorId& parent, + ui64 tabletId, + const TTableId& tableId, + TChangeRecordBuilderContextTrait<TChangeRecord> builderContext) : Parent(parent) , TabletId(tabletId) , TableId(tableId) - { - } + , BuilderContext(builderContext) + {} void Bootstrap() { GetProxyServices(); @@ -430,7 +434,11 @@ class TLocalTableWriter } IActor* CreateSender(ui64 partitionId) const override { - return new TTablePartitionWriter<TChangeRecord>(this->SelfId(), partitionId, TTableId(this->PathId, Schema->Version)); + return new TTablePartitionWriter<TChangeRecord>( + this->SelfId(), + partitionId, + TTableId(this->PathId, Schema->Version), + BuilderContext); } const TVector<TKeyDesc::TPartitionInfo>& GetPartitions() const override { return KeyDesc->GetPartitions(); } @@ -521,9 +529,11 @@ public: return NKikimrServices::TActivity::REPLICATION_LOCAL_TABLE_WRITER; } - explicit TLocalTableWriter(const TPathId& tablePathId) + template <class... TArgs> + explicit TLocalTableWriter(const TPathId& tablePathId, TArgs&&... args) : TBase(&TThis::StateWork) , TBaseSender(this, this, this, TActorId(), tablePathId) + , BuilderContext(std::forward<TArgs>(args)...) { } @@ -544,6 +554,7 @@ public: private: mutable TMaybe<TString> LogPrefix; + TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext; TActorId Worker; ui64 TableVersion = 0; |