diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2024-01-22 21:00:03 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-22 21:00:03 +0300 |
| commit | d856b1bf8ed685b9b388a9ed67326d0769dacb1b (patch) | |
| tree | 87122d2b0fc3be00e449d26a31b3e1f3f1fd569a | |
| parent | 3a256cad7e0aa90a259a0083fb743b9ba30aa5b9 (diff) | |
Move TBaseChangeSender to ydb/core/change_exchange KIKIMR-20673 (#1207)
| -rw-r--r-- | ydb/core/change_exchange/change_sender_common_ops.cpp (renamed from ydb/core/tx/datashard/change_sender_common_ops.cpp) | 26 | ||||
| -rw-r--r-- | ydb/core/change_exchange/change_sender_common_ops.h (renamed from ydb/core/tx/datashard/change_sender_common_ops.h) | 39 | ||||
| -rw-r--r-- | ydb/core/change_exchange/change_sender_monitoring.cpp (renamed from ydb/core/tx/datashard/change_sender_monitoring.cpp) | 2 | ||||
| -rw-r--r-- | ydb/core/change_exchange/change_sender_monitoring.h (renamed from ydb/core/tx/datashard/change_sender_monitoring.h) | 2 | ||||
| -rw-r--r-- | ydb/core/change_exchange/ya.make | 5 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_exchange_split.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 30 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 29 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/ya.make | 2 |
10 files changed, 75 insertions, 65 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index 1c57d3c17a7..c3887dfec62 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -1,13 +1,15 @@ #include "change_sender_common_ops.h" #include "change_sender_monitoring.h" +#include <ydb/library/yverify_stream/yverify_stream.h> + #include <library/cpp/monlib/service/pages/mon_page.h> #include <library/cpp/monlib/service/pages/templates.h> #include <util/generic/algorithm.h> #include <util/generic/size_literals.h> -namespace NKikimr::NDataShard { +namespace NKikimr::NChangeExchange { void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) { auto res = senders.emplace(partitionId, TSender{}); @@ -82,7 +84,7 @@ void TBaseChangeSender::KillSenders() { } } -void TBaseChangeSender::EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { +void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { for (auto& record : records) { Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id" << ": expected# " << PathId @@ -117,11 +119,11 @@ bool TBaseChangeSender::RequestRecords() { return false; } - ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records))); + ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records))); return true; } -void TBaseChangeSender::ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) { +void TBaseChangeSender::ProcessRecords(TVector<IChangeRecord::TPtr>&& records) { for (auto& record : records) { auto it = PendingBody.find(record->GetOrder()); if (it == PendingBody.end()) { @@ -290,7 +292,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) { } Y_ABORT_UNLESS(sender.ActorId); - ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); + ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); } void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { @@ -306,7 +308,7 @@ void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { } } -TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record) { +TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(IChangeRecord::TPtr record) { Y_ABORT_UNLESS(record->IsBroadcast()); auto it = Broadcasting.find(record->GetOrder()); @@ -430,17 +432,17 @@ void TBaseChangeSender::RemoveRecords() { } TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, - const TDataShardId& dataShard, const TPathId& pathId) + const TActorId& changeServer, const TPathId& pathId) : ActorOps(actorOps) , Resolver(resolver) - , DataShard(dataShard) + , ChangeServer(changeServer) , PathId(pathId) , MemLimit(192_KB) , MemUsage(0) { } -void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, +void TBaseChangeSender::RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { const auto& cgi = ev->Get()->Cgi(); @@ -468,7 +470,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TStringStream html; HTML(html) { - Header(html, TStringBuilder() << type << " change sender", DataShard.TabletId); + Header(html, "Change sender", tabletId); SimplePanel(html, "Info", [this](IOutputStream& html) { HTML(html) { @@ -479,7 +481,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon } }); - SimplePanel(html, "Partition senders", [this](IOutputStream& html) { + SimplePanel(html, "Partition senders", [this, tabletId](IOutputStream& html) { HTML(html) { TABLE_CLASS("table table-hover") { TABLEHEAD() { @@ -503,7 +505,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TABLED() { html << sender.Pending.size(); } TABLED() { html << sender.Prepared.size(); } TABLED() { html << sender.Broadcasting.size(); } - TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); } + TABLED() { ActorLink(html, tabletId, PathId, partitionId); } } } } diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index 6b418ebf7d8..fe516176a80 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -1,10 +1,7 @@ #pragma once #include "change_exchange.h" -#include "change_exchange_helpers.h" -#include <ydb/core/base/appdata.h> -#include <ydb/core/change_exchange/change_exchange.h> #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/mon.h> @@ -14,8 +11,7 @@ #include <util/generic/set.h> #include <util/string/builder.h> -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NChangeExchange { struct TEvChangeExchangePrivate { enum EEv { @@ -61,8 +57,8 @@ public: virtual IActor* CreateSender(ui64 partitionId) = 0; virtual void RemoveRecords() = 0; - virtual void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0; - virtual void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) = 0; + virtual void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0; + virtual void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) = 0; virtual void ForgetRecords(TVector<ui64>&& records) = 0; virtual void OnReady(ui64 partitionId) = 0; virtual void OnGone(ui64 partitionId) = 0; @@ -75,18 +71,18 @@ public: virtual void Resolve() = 0; virtual bool IsResolving() const = 0; virtual bool IsResolved() const = 0; - virtual ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const = 0; + virtual ui64 GetPartitionId(IChangeRecord::TPtr record) const = 0; }; class TBaseChangeSender: public IChangeSender { - using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; - using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; struct TSender { TActorId ActorId; bool Ready = false; TVector<TEnqueuedRecord> Pending; - TVector<NChangeExchange::IChangeRecord::TPtr> Prepared; + TVector<IChangeRecord::TPtr> Prepared; TVector<ui64> Broadcasting; }; @@ -108,7 +104,7 @@ class TBaseChangeSender: public IChangeSender { void SendPreparedRecords(ui64 partitionId); void ReEnqueueRecords(const TSender& sender); - TBroadcast& EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record); + TBroadcast& EnsureBroadcast(IChangeRecord::TPtr record); bool AddBroadcastPartition(ui64 order, ui64 partitionId); bool RemoveBroadcastPartition(ui64 order, ui64 partitionId); bool CompleteBroadcastPartition(ui64 order, ui64 partitionId); @@ -124,35 +120,35 @@ protected: remove.push_back(record.Order); } - ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } template <> void RemoveRecords(TVector<ui64>&& records) { - ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records))); + ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records))); } void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override; void KillSenders() override; void RemoveRecords() override; - void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override; - void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) override; + void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override; + void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) override; void ForgetRecords(TVector<ui64>&& records) override; void OnReady(ui64 partitionId) override; void OnGone(ui64 partitionId) override; explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, - const TDataShardId& dataShard, const TPathId& pathId); + const TActorId& changeServer, const TPathId& pathId); - void RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx); + void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx); private: IActorOps* const ActorOps; IChangeSenderResolver* const Resolver; protected: - const TDataShardId DataShard; + const TActorId ChangeServer; const TPathId PathId; private: @@ -162,12 +158,11 @@ private: THashMap<ui64, TSender> Senders; // ui64 is partition id TSet<TEnqueuedRecord> Enqueued; TSet<TRequestedRecord> PendingBody; - TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingSent; // ui64 is order + TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order TVector<ui64> GonePartitions; }; // TBaseChangeSender -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_sender_monitoring.cpp b/ydb/core/change_exchange/change_sender_monitoring.cpp index 9cc4ba1b6bc..9671bb787b5 100644 --- a/ydb/core/tx/datashard/change_sender_monitoring.cpp +++ b/ydb/core/change_exchange/change_sender_monitoring.cpp @@ -5,7 +5,7 @@ #include <util/string/printf.h> #include <util/string/split.h> -namespace NKikimr::NDataShard { +namespace NKikimr::NChangeExchange { void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body) { HTML(str) { diff --git a/ydb/core/tx/datashard/change_sender_monitoring.h b/ydb/core/change_exchange/change_sender_monitoring.h index fe9bf40b9a9..83205bd0e70 100644 --- a/ydb/core/tx/datashard/change_sender_monitoring.h +++ b/ydb/core/change_exchange/change_sender_monitoring.h @@ -6,7 +6,7 @@ #include <util/generic/maybe.h> -namespace NKikimr::NDataShard { +namespace NKikimr::NChangeExchange { template <typename T> static void Link(IOutputStream& str, const TStringBuf path, const T& title) { diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make index a00055ba545..6e82179d832 100644 --- a/ydb/core/change_exchange/ya.make +++ b/ydb/core/change_exchange/ya.make @@ -3,6 +3,8 @@ LIBRARY() SRCS( change_exchange.cpp change_record.cpp + change_sender_common_ops.cpp + change_sender_monitoring.cpp ) GENERATE_ENUM_SERIALIZATION(change_record.h) @@ -10,6 +12,9 @@ GENERATE_ENUM_SERIALIZATION(change_record.h) PEERDIR( ydb/core/base ydb/core/scheme + ydb/library/actors/core + ydb/library/yverify_stream + library/cpp/monlib/service/pages ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index 0bcd44390ed..93e6d68a3b8 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -1,6 +1,5 @@ #include "change_exchange.h" #include "change_exchange_helpers.h" -#include "change_sender_common_ops.h" #include "datashard_impl.h" #include <ydb/core/base/tablet_pipe.h> diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index 1f617d252e4..6150c00d0be 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -1,9 +1,9 @@ #include "change_exchange.h" #include "change_exchange_impl.h" -#include "change_sender_monitoring.h" #include "datashard_impl.h" #include <ydb/core/change_exchange/change_exchange.h> +#include <ydb/core/change_exchange/change_sender_monitoring.h> #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/log.h> @@ -165,6 +165,8 @@ class TChangeSender: public TActor<TChangeSender> { } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { + using namespace NChangeExchange; + const auto& cgi = ev->Get()->Cgi(); if (const auto& str = cgi.Get("pathId")) { if (const auto& pathId = ParsePathId(str)) { diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index d8356013456..98a225f5543 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -1,16 +1,16 @@ #include "change_exchange.h" #include "change_exchange_impl.h" #include "change_record.h" -#include "change_sender_common_ops.h" -#include "change_sender_monitoring.h" #include "datashard_impl.h" #include <ydb/core/base/tablet_pipecache.h> -#include <ydb/library/services/services.pb.h> +#include <ydb/core/change_exchange/change_sender_common_ops.h> +#include <ydb/core/change_exchange/change_sender_monitoring.h> #include <ydb/core/tablet_flat/flat_row_eggs.h> #include <ydb/core/tx/scheme_cache/helpers.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/library/services/services.pb.h> #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/actors/core/actor_bootstrapped.h> @@ -107,7 +107,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS } void Ready() { - Send(Parent, new TEvChangeExchangePrivate::TEvReady(ShardId)); + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvReady(ShardId)); Become(&TThis::StateWaitingRecords); } @@ -240,6 +240,8 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) { + using namespace NChangeExchange; + TStringStream html; HTML(html) { @@ -261,7 +263,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS } void Leave() { - Send(Parent, new TEvChangeExchangePrivate::TEvGone(ShardId)); + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(ShardId)); PassAway(); } @@ -328,8 +330,8 @@ private: class TAsyncIndexChangeSenderMain : public TActorBootstrapped<TAsyncIndexChangeSenderMain> - , public TBaseChangeSender - , public IChangeSenderResolver + , public NChangeExchange::TBaseChangeSender + , public NChangeExchange::IChangeSenderResolver , private NSchemeCache::TSchemeCacheHelpers { TStringBuf GetLogPrefix() const { @@ -753,12 +755,12 @@ class TAsyncIndexChangeSenderMain ForgetRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchangePrivate::TEvReady::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnReady(ev->Get()->PartitionId); } - void Handle(TEvChangeExchangePrivate::TEvGone::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnGone(ev->Get()->PartitionId); } @@ -777,7 +779,7 @@ class TAsyncIndexChangeSenderMain } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { - RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx); + RenderHtmlPage(DataShard.TabletId, ev, ctx); } void PassAway() override { @@ -792,7 +794,8 @@ public: explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, dataShard, indexPathId) + , TBaseChangeSender(this, this, dataShard.ActorId, indexPathId) + , DataShard(dataShard) , UserTableId(userTableId) , IndexTableVersion(0) { @@ -808,8 +811,8 @@ public: hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); - hFunc(TEvChangeExchangePrivate::TEvReady, Handle); - hFunc(TEvChangeExchangePrivate::TEvGone, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } @@ -825,6 +828,7 @@ public: } private: + const TDataShardId DataShard; const TTableId UserTableId; mutable TMaybe<TString> LogPrefix; diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 60e107569e6..111f1c74c69 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -2,10 +2,10 @@ #include "change_exchange_impl.h" #include "change_record.h" #include "change_record_cdc_serializer.h" -#include "change_sender_common_ops.h" -#include "change_sender_monitoring.h" #include "datashard_user_table.h" +#include <ydb/core/change_exchange/change_sender_common_ops.h> +#include <ydb/core/change_exchange/change_sender_monitoring.h> #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> @@ -16,6 +16,7 @@ #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/log.h> + #include <library/cpp/json/json_writer.h> namespace NKikimr::NDataShard { @@ -75,7 +76,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti void Ready() { Pending.clear(); - Send(Parent, new TEvChangeExchangePrivate::TEvReady(PartitionId)); + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvReady(PartitionId)); Become(&TThis::StateWaitingRecords); } @@ -179,6 +180,8 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) { + using namespace NChangeExchange; + TStringStream html; HTML(html) { @@ -219,7 +222,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti } void Leave() { - Send(Parent, new TEvChangeExchangePrivate::TEvGone(PartitionId)); + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(PartitionId)); PassAway(); } @@ -291,8 +294,8 @@ private: class TCdcChangeSenderMain : public TActorBootstrapped<TCdcChangeSenderMain> - , public TBaseChangeSender - , public IChangeSenderResolver + , public NChangeExchange::TBaseChangeSender + , public NChangeExchange::IChangeSenderResolver , private NSchemeCache::TSchemeCacheHelpers { struct TPQPartitionInfo { @@ -718,12 +721,12 @@ class TCdcChangeSenderMain ForgetRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchangePrivate::TEvReady::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnReady(ev->Get()->PartitionId); } - void Handle(TEvChangeExchangePrivate::TEvGone::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnGone(ev->Get()->PartitionId); } @@ -742,7 +745,7 @@ class TCdcChangeSenderMain } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { - RenderHtmlPage(ESenderType::CdcStream, ev, ctx); + RenderHtmlPage(DataShard.TabletId, ev, ctx); } void PassAway() override { @@ -757,7 +760,8 @@ public: explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, dataShard, streamPathId) + , TBaseChangeSender(this, this, dataShard.ActorId, streamPathId) + , DataShard(dataShard) , TopicVersion(0) { } @@ -772,8 +776,8 @@ public: hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); - hFunc(TEvChangeExchangePrivate::TEvReady, Handle); - hFunc(TEvChangeExchangePrivate::TEvGone, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } @@ -789,6 +793,7 @@ public: } private: + const TDataShardId DataShard; mutable TMaybe<TString> LogPrefix; TUserTable::TCdcStream Stream; diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index cefaa714b0f..5cc688f117c 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -26,8 +26,6 @@ SRCS( change_sender.cpp change_sender_async_index.cpp change_sender_cdc_stream.cpp - change_sender_common_ops.cpp - change_sender_monitoring.cpp check_commit_writes_tx_unit.cpp check_data_tx_unit.cpp check_distributed_erase_tx_unit.cpp |
