aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorInnokentii Mokin <innokentii@ydb.tech>2024-09-10 16:42:51 +0300
committerGitHub <noreply@github.com>2024-09-10 16:42:51 +0300
commit94f3e4ab8519d5c4b97d6ed9e77f28f3e0b219b7 (patch)
treef01d690febe9c7a5e5d5da7127eaa5fc22f59f33
parent69daba2b78745713aee46f9d4b6cc783ecb2f22f (diff)
downloadydb-94f3e4ab8519d5c4b97d6ed9e77f28f3e0b219b7.tar.gz
Make change sender identity explicit (#8995)
-rw-r--r--ydb/core/change_exchange/change_sender_common_ops.h22
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp16
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp15
-rw-r--r--ydb/core/tx/replication/service/table_writer_impl.h21
4 files changed, 50 insertions, 24 deletions
diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h
index 7d774339f4..a220178d80 100644
--- a/ydb/core/change_exchange/change_sender_common_ops.h
+++ b/ydb/core/change_exchange/change_sender_common_ops.h
@@ -70,6 +70,12 @@ public:
virtual IActor* CreateSender(ui64 partitionId) const = 0;
};
+class IChangeSenderIdentity {
+public:
+ virtual ~IChangeSenderIdentity() = default;
+ virtual TPathId GetChangeSenderIdentity() const = 0;
+};
+
template <typename TChangeRecord>
class TBaseChangeSender {
using TIncompleteRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
@@ -470,8 +476,8 @@ protected:
void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
for (auto& record : records) {
- Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"
- << ": expected# " << PathId
+ Y_VERIFY_S(Identity->GetChangeSenderIdentity() == record.PathId, "Unexpected record's path id"
+ << ": expected# " << Identity->GetChangeSenderIdentity()
<< ", got# " << record.PathId);
Enqueued.emplace(record.Order, record.BodySize);
}
@@ -561,15 +567,15 @@ protected:
explicit TBaseChangeSender(
IActorOps* const actorOps,
+ IChangeSenderIdentity* const identity,
IChangeSenderResolver* const resolver,
ISenderFactory* const senderFactory,
- const TActorId changeServer,
- const TPathId& pathId)
+ const TActorId changeServer)
: ActorOps(actorOps)
+ , Identity(identity)
, Resolver(resolver)
, SenderFactory(senderFactory)
, ChangeServer(changeServer)
- , PathId(pathId)
, MemLimit(192_KB)
, MemUsage(0)
{}
@@ -585,7 +591,7 @@ protected:
ctx.Send(ev->Forward(to));
} else {
ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder()
- << "Change sender '" << PathId << ":" << partitionId << "' is not running"));
+ << "Change sender '" << Identity->GetChangeSenderIdentity() << ":" << partitionId << "' is not running"));
}
} else {
ActorOps->Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
@@ -635,7 +641,7 @@ protected:
TABLED() { html << sender.Pending.size(); }
TABLED() { html << sender.Prepared.size(); }
TABLED() { html << sender.Broadcasting.size(); }
- TABLED() { ActorLink(html, tabletId, PathId, partitionId); }
+ TABLED() { ActorLink(html, tabletId, Identity->GetChangeSenderIdentity(), partitionId); }
}
}
}
@@ -763,13 +769,13 @@ protected:
private:
IActorOps* const ActorOps;
+ IChangeSenderIdentity* const Identity;
IChangeSenderResolver* const Resolver;
ISenderFactory* const SenderFactory;
THolder<IChangeSenderPartitioner<TChangeRecord>> Partitioner;
protected:
TActorId ChangeServer;
- const TPathId PathId;
private:
const ui64 MemLimit;
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 536c8bc97d..98bcf91fdc 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -331,6 +331,7 @@ private:
class TAsyncIndexChangeSenderMain
: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
+ , public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
, private NSchemeCache::TSchemeCacheHelpers
@@ -502,7 +503,7 @@ class TAsyncIndexChangeSenderMain
void ResolveIndex() {
auto request = MakeHolder<TNavigate>();
- request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList));
+ request->ResultSet.emplace_back(MakeNavigateEntry(IndexPathId, TNavigate::OpList));
Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
Become(&TThis::StateResolveIndex);
@@ -533,7 +534,7 @@ class TAsyncIndexChangeSenderMain
const auto& entry = result->ResultSet.at(0);
- if (!CheckTableId(entry, PathId)) {
+ if (!CheckTableId(entry, IndexPathId)) {
return;
}
@@ -746,7 +747,7 @@ class TAsyncIndexChangeSenderMain
void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
- Y_ABORT_UNLESS(ev->Get()->PathId == PathId);
+ Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity());
RemoveRecords();
PassAway();
@@ -773,7 +774,8 @@ public:
explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId)
: TActorBootstrapped()
- , TBaseChangeSender(this, this, this, dataShard.ActorId, indexPathId)
+ , TBaseChangeSender(this, this, this, this, dataShard.ActorId)
+ , IndexPathId(indexPathId)
, DataShard(dataShard)
, UserTableId(userTableId)
, IndexTableVersion(0)
@@ -806,7 +808,12 @@ public:
}
}
+ TPathId GetChangeSenderIdentity() const override final {
+ return IndexPathId;
+ }
+
private:
+ const TPathId IndexPathId;
const TDataShardId DataShard;
const TTableId UserTableId;
mutable TMaybe<TString> LogPrefix;
@@ -817,7 +824,6 @@ private:
TPathId IndexTablePathId;
ui64 IndexTableVersion;
THolder<TKeyDesc> KeyDesc;
-
}; // TAsyncIndexChangeSenderMain
IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) {
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 4983bb2785..a6dd8d56e6 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -326,6 +326,7 @@ private:
class TCdcChangeSenderMain
: public TActorBootstrapped<TCdcChangeSenderMain>
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
+ , public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
, private NSchemeCache::TSchemeCacheHelpers
@@ -474,7 +475,7 @@ class TCdcChangeSenderMain
void ResolveCdcStream() {
auto request = MakeHolder<TNavigate>();
- request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList));
+ request->ResultSet.emplace_back(MakeNavigateEntry(StreamPathId, TNavigate::OpList));
Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
Become(&TThis::StateResolveCdcStream);
@@ -505,7 +506,7 @@ class TCdcChangeSenderMain
const auto& entry = result->ResultSet.at(0);
- if (!CheckTableId(entry, PathId)) {
+ if (!CheckTableId(entry, StreamPathId)) {
return;
}
@@ -718,7 +719,7 @@ class TCdcChangeSenderMain
void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
- Y_ABORT_UNLESS(ev->Get()->PathId == PathId);
+ Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity());
RemoveRecords();
PassAway();
@@ -745,7 +746,8 @@ public:
explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId)
: TActorBootstrapped()
- , TBaseChangeSender(this, this, this, dataShard.ActorId, streamPathId)
+ , TBaseChangeSender(this, this, this, this, dataShard.ActorId)
+ , StreamPathId(streamPathId)
, DataShard(dataShard)
, TopicVersion(0)
{
@@ -777,7 +779,12 @@ public:
}
}
+ TPathId GetChangeSenderIdentity() const override final {
+ return StreamPathId;
+ }
+
private:
+ const TPathId StreamPathId;
const TDataShardId DataShard;
mutable TMaybe<TString> LogPrefix;
diff --git a/ydb/core/tx/replication/service/table_writer_impl.h b/ydb/core/tx/replication/service/table_writer_impl.h
index af87908f22..31783cf35e 100644
--- a/ydb/core/tx/replication/service/table_writer_impl.h
+++ b/ydb/core/tx/replication/service/table_writer_impl.h
@@ -215,6 +215,7 @@ template <typename TChangeRecord>
class TLocalTableWriter
: public TActor<TLocalTableWriter<TChangeRecord>>
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
+ , public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
, private NSchemeCache::TSchemeCacheHelpers
@@ -227,7 +228,7 @@ class TLocalTableWriter
if (!LogPrefix) {
LogPrefix = TStringBuilder()
<< "[LocalTableWriter]"
- << this->PathId
+ << TablePathId
<< TBase::SelfId() << " ";
}
@@ -318,7 +319,7 @@ class TLocalTableWriter
Resolving = true;
auto request = MakeHolder<TNavigate>();
- request->ResultSet.emplace_back(MakeNavigateEntry(this->PathId, TNavigate::OpTable));
+ request->ResultSet.emplace_back(MakeNavigateEntry(TablePathId, TNavigate::OpTable));
this->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
}
@@ -338,7 +339,7 @@ class TLocalTableWriter
const auto& entry = result->ResultSet.at(0);
- if (!CheckTableId(entry, this->PathId)) {
+ if (!CheckTableId(entry, TablePathId)) {
return;
}
@@ -407,7 +408,7 @@ class TLocalTableWriter
auto& entry = result->ResultSet.at(0);
- if (!CheckTableId(entry, this->PathId)) {
+ if (!CheckTableId(entry, TablePathId)) {
return;
}
@@ -437,7 +438,7 @@ class TLocalTableWriter
return new TTablePartitionWriter<TChangeRecord>(
this->SelfId(),
partitionId,
- TTableId(this->PathId, Schema->Version),
+ TTableId(TablePathId, Schema->Version),
BuilderContext);
}
@@ -448,7 +449,7 @@ class TLocalTableWriter
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size()));
for (auto& record : ev->Get()->Records) {
- records.emplace_back(record.Offset, this->PathId, record.Data.size());
+ records.emplace_back(record.Offset, TablePathId, record.Data.size());
auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilderTrait<TChangeRecord>()
.WithSourceId(ev->Get()->Source)
.WithOrder(record.Offset)
@@ -528,11 +529,16 @@ public:
template <class... TArgs>
explicit TLocalTableWriter(const TPathId& tablePathId, TArgs&&... args)
: TBase(&TThis::StateWork)
- , TBaseSender(this, this, this, TActorId(), tablePathId)
+ , TBaseSender(this, this, this, this, TActorId())
+ , TablePathId(tablePathId)
, BuilderContext(std::forward<TArgs>(args)...)
{
}
+ TPathId GetChangeSenderIdentity() const override final {
+ return TablePathId;
+ }
+
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
@@ -550,6 +556,7 @@ public:
private:
mutable TMaybe<TString> LogPrefix;
+ const TPathId TablePathId;
TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext;
TActorId Worker;