diff options
author | Innokentii Mokin <innokentii@ydb.tech> | 2024-09-10 16:42:51 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-10 16:42:51 +0300 |
commit | 94f3e4ab8519d5c4b97d6ed9e77f28f3e0b219b7 (patch) | |
tree | f01d690febe9c7a5e5d5da7127eaa5fc22f59f33 | |
parent | 69daba2b78745713aee46f9d4b6cc783ecb2f22f (diff) | |
download | ydb-94f3e4ab8519d5c4b97d6ed9e77f28f3e0b219b7.tar.gz |
Make change sender identity explicit (#8995)
-rw-r--r-- | ydb/core/change_exchange/change_sender_common_ops.h | 22 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 15 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/table_writer_impl.h | 21 |
4 files changed, 50 insertions, 24 deletions
diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index 7d774339f4..a220178d80 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -70,6 +70,12 @@ public: virtual IActor* CreateSender(ui64 partitionId) const = 0; }; +class IChangeSenderIdentity { +public: + virtual ~IChangeSenderIdentity() = default; + virtual TPathId GetChangeSenderIdentity() const = 0; +}; + template <typename TChangeRecord> class TBaseChangeSender { using TIncompleteRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; @@ -470,8 +476,8 @@ protected: void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { for (auto& record : records) { - Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id" - << ": expected# " << PathId + Y_VERIFY_S(Identity->GetChangeSenderIdentity() == record.PathId, "Unexpected record's path id" + << ": expected# " << Identity->GetChangeSenderIdentity() << ", got# " << record.PathId); Enqueued.emplace(record.Order, record.BodySize); } @@ -561,15 +567,15 @@ protected: explicit TBaseChangeSender( IActorOps* const actorOps, + IChangeSenderIdentity* const identity, IChangeSenderResolver* const resolver, ISenderFactory* const senderFactory, - const TActorId changeServer, - const TPathId& pathId) + const TActorId changeServer) : ActorOps(actorOps) + , Identity(identity) , Resolver(resolver) , SenderFactory(senderFactory) , ChangeServer(changeServer) - , PathId(pathId) , MemLimit(192_KB) , MemUsage(0) {} @@ -585,7 +591,7 @@ protected: ctx.Send(ev->Forward(to)); } else { ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder() - << "Change sender '" << PathId << ":" << partitionId << "' is not running")); + << "Change sender '" << Identity->GetChangeSenderIdentity() << ":" << partitionId << "' is not running")); } } else { ActorOps->Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND)); @@ -635,7 +641,7 @@ protected: TABLED() { html << sender.Pending.size(); } TABLED() { html << sender.Prepared.size(); } TABLED() { html << sender.Broadcasting.size(); } - TABLED() { ActorLink(html, tabletId, PathId, partitionId); } + TABLED() { ActorLink(html, tabletId, Identity->GetChangeSenderIdentity(), partitionId); } } } } @@ -763,13 +769,13 @@ protected: private: IActorOps* const ActorOps; + IChangeSenderIdentity* const Identity; IChangeSenderResolver* const Resolver; ISenderFactory* const SenderFactory; THolder<IChangeSenderPartitioner<TChangeRecord>> Partitioner; protected: TActorId ChangeServer; - const TPathId PathId; private: const ui64 MemLimit; diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 536c8bc97d..98bcf91fdc 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -331,6 +331,7 @@ private: class TAsyncIndexChangeSenderMain : public TActorBootstrapped<TAsyncIndexChangeSenderMain> , public NChangeExchange::TBaseChangeSender<TChangeRecord> + , public NChangeExchange::IChangeSenderIdentity , public NChangeExchange::IChangeSenderResolver , public NChangeExchange::ISenderFactory , private NSchemeCache::TSchemeCacheHelpers @@ -502,7 +503,7 @@ class TAsyncIndexChangeSenderMain void ResolveIndex() { auto request = MakeHolder<TNavigate>(); - request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList)); + request->ResultSet.emplace_back(MakeNavigateEntry(IndexPathId, TNavigate::OpList)); Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); Become(&TThis::StateResolveIndex); @@ -533,7 +534,7 @@ class TAsyncIndexChangeSenderMain const auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, PathId)) { + if (!CheckTableId(entry, IndexPathId)) { return; } @@ -746,7 +747,7 @@ class TAsyncIndexChangeSenderMain void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); - Y_ABORT_UNLESS(ev->Get()->PathId == PathId); + Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity()); RemoveRecords(); PassAway(); @@ -773,7 +774,8 @@ public: explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, this, dataShard.ActorId, indexPathId) + , TBaseChangeSender(this, this, this, this, dataShard.ActorId) + , IndexPathId(indexPathId) , DataShard(dataShard) , UserTableId(userTableId) , IndexTableVersion(0) @@ -806,7 +808,12 @@ public: } } + TPathId GetChangeSenderIdentity() const override final { + return IndexPathId; + } + private: + const TPathId IndexPathId; const TDataShardId DataShard; const TTableId UserTableId; mutable TMaybe<TString> LogPrefix; @@ -817,7 +824,6 @@ private: TPathId IndexTablePathId; ui64 IndexTableVersion; THolder<TKeyDesc> KeyDesc; - }; // TAsyncIndexChangeSenderMain IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) { diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 4983bb2785..a6dd8d56e6 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -326,6 +326,7 @@ private: class TCdcChangeSenderMain : public TActorBootstrapped<TCdcChangeSenderMain> , public NChangeExchange::TBaseChangeSender<TChangeRecord> + , public NChangeExchange::IChangeSenderIdentity , public NChangeExchange::IChangeSenderResolver , public NChangeExchange::ISenderFactory , private NSchemeCache::TSchemeCacheHelpers @@ -474,7 +475,7 @@ class TCdcChangeSenderMain void ResolveCdcStream() { auto request = MakeHolder<TNavigate>(); - request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList)); + request->ResultSet.emplace_back(MakeNavigateEntry(StreamPathId, TNavigate::OpList)); Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); Become(&TThis::StateResolveCdcStream); @@ -505,7 +506,7 @@ class TCdcChangeSenderMain const auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, PathId)) { + if (!CheckTableId(entry, StreamPathId)) { return; } @@ -718,7 +719,7 @@ class TCdcChangeSenderMain void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); - Y_ABORT_UNLESS(ev->Get()->PathId == PathId); + Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity()); RemoveRecords(); PassAway(); @@ -745,7 +746,8 @@ public: explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, this, dataShard.ActorId, streamPathId) + , TBaseChangeSender(this, this, this, this, dataShard.ActorId) + , StreamPathId(streamPathId) , DataShard(dataShard) , TopicVersion(0) { @@ -777,7 +779,12 @@ public: } } + TPathId GetChangeSenderIdentity() const override final { + return StreamPathId; + } + private: + const TPathId StreamPathId; const TDataShardId DataShard; mutable TMaybe<TString> LogPrefix; diff --git a/ydb/core/tx/replication/service/table_writer_impl.h b/ydb/core/tx/replication/service/table_writer_impl.h index af87908f22..31783cf35e 100644 --- a/ydb/core/tx/replication/service/table_writer_impl.h +++ b/ydb/core/tx/replication/service/table_writer_impl.h @@ -215,6 +215,7 @@ template <typename TChangeRecord> class TLocalTableWriter : public TActor<TLocalTableWriter<TChangeRecord>> , public NChangeExchange::TBaseChangeSender<TChangeRecord> + , public NChangeExchange::IChangeSenderIdentity , public NChangeExchange::IChangeSenderResolver , public NChangeExchange::ISenderFactory , private NSchemeCache::TSchemeCacheHelpers @@ -227,7 +228,7 @@ class TLocalTableWriter if (!LogPrefix) { LogPrefix = TStringBuilder() << "[LocalTableWriter]" - << this->PathId + << TablePathId << TBase::SelfId() << " "; } @@ -318,7 +319,7 @@ class TLocalTableWriter Resolving = true; auto request = MakeHolder<TNavigate>(); - request->ResultSet.emplace_back(MakeNavigateEntry(this->PathId, TNavigate::OpTable)); + request->ResultSet.emplace_back(MakeNavigateEntry(TablePathId, TNavigate::OpTable)); this->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); } @@ -338,7 +339,7 @@ class TLocalTableWriter const auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, this->PathId)) { + if (!CheckTableId(entry, TablePathId)) { return; } @@ -407,7 +408,7 @@ class TLocalTableWriter auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, this->PathId)) { + if (!CheckTableId(entry, TablePathId)) { return; } @@ -437,7 +438,7 @@ class TLocalTableWriter return new TTablePartitionWriter<TChangeRecord>( this->SelfId(), partitionId, - TTableId(this->PathId, Schema->Version), + TTableId(TablePathId, Schema->Version), BuilderContext); } @@ -448,7 +449,7 @@ class TLocalTableWriter TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size())); for (auto& record : ev->Get()->Records) { - records.emplace_back(record.Offset, this->PathId, record.Data.size()); + records.emplace_back(record.Offset, TablePathId, record.Data.size()); auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilderTrait<TChangeRecord>() .WithSourceId(ev->Get()->Source) .WithOrder(record.Offset) @@ -528,11 +529,16 @@ public: template <class... TArgs> explicit TLocalTableWriter(const TPathId& tablePathId, TArgs&&... args) : TBase(&TThis::StateWork) - , TBaseSender(this, this, this, TActorId(), tablePathId) + , TBaseSender(this, this, this, this, TActorId()) + , TablePathId(tablePathId) , BuilderContext(std::forward<TArgs>(args)...) { } + TPathId GetChangeSenderIdentity() const override final { + return TablePathId; + } + STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, Handle); @@ -550,6 +556,7 @@ public: private: mutable TMaybe<TString> LogPrefix; + const TPathId TablePathId; TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext; TActorId Worker; |