summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <[email protected]>2024-01-22 21:00:03 +0300
committerGitHub <[email protected]>2024-01-22 21:00:03 +0300
commitd856b1bf8ed685b9b388a9ed67326d0769dacb1b (patch)
tree87122d2b0fc3be00e449d26a31b3e1f3f1fd569a
parent3a256cad7e0aa90a259a0083fb743b9ba30aa5b9 (diff)
Move TBaseChangeSender to ydb/core/change_exchange KIKIMR-20673 (#1207)
-rw-r--r--ydb/core/change_exchange/change_sender_common_ops.cpp (renamed from ydb/core/tx/datashard/change_sender_common_ops.cpp)26
-rw-r--r--ydb/core/change_exchange/change_sender_common_ops.h (renamed from ydb/core/tx/datashard/change_sender_common_ops.h)39
-rw-r--r--ydb/core/change_exchange/change_sender_monitoring.cpp (renamed from ydb/core/tx/datashard/change_sender_monitoring.cpp)2
-rw-r--r--ydb/core/change_exchange/change_sender_monitoring.h (renamed from ydb/core/tx/datashard/change_sender_monitoring.h)2
-rw-r--r--ydb/core/change_exchange/ya.make5
-rw-r--r--ydb/core/tx/datashard/change_exchange_split.cpp1
-rw-r--r--ydb/core/tx/datashard/change_sender.cpp4
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp30
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp29
-rw-r--r--ydb/core/tx/datashard/ya.make2
10 files changed, 75 insertions, 65 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp
index 1c57d3c17a7..c3887dfec62 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/change_exchange/change_sender_common_ops.cpp
@@ -1,13 +1,15 @@
#include "change_sender_common_ops.h"
#include "change_sender_monitoring.h"
+#include <ydb/library/yverify_stream/yverify_stream.h>
+
#include <library/cpp/monlib/service/pages/mon_page.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <util/generic/algorithm.h>
#include <util/generic/size_literals.h>
-namespace NKikimr::NDataShard {
+namespace NKikimr::NChangeExchange {
void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
auto res = senders.emplace(partitionId, TSender{});
@@ -82,7 +84,7 @@ void TBaseChangeSender::KillSenders() {
}
}
-void TBaseChangeSender::EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
+void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
for (auto& record : records) {
Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"
<< ": expected# " << PathId
@@ -117,11 +119,11 @@ bool TBaseChangeSender::RequestRecords() {
return false;
}
- ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records)));
+ ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records)));
return true;
}
-void TBaseChangeSender::ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) {
+void TBaseChangeSender::ProcessRecords(TVector<IChangeRecord::TPtr>&& records) {
for (auto& record : records) {
auto it = PendingBody.find(record->GetOrder());
if (it == PendingBody.end()) {
@@ -290,7 +292,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
}
Y_ABORT_UNLESS(sender.ActorId);
- ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
+ ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
}
void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
@@ -306,7 +308,7 @@ void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
}
}
-TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record) {
+TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(IChangeRecord::TPtr record) {
Y_ABORT_UNLESS(record->IsBroadcast());
auto it = Broadcasting.find(record->GetOrder());
@@ -430,17 +432,17 @@ void TBaseChangeSender::RemoveRecords() {
}
TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
- const TDataShardId& dataShard, const TPathId& pathId)
+ const TActorId& changeServer, const TPathId& pathId)
: ActorOps(actorOps)
, Resolver(resolver)
- , DataShard(dataShard)
+ , ChangeServer(changeServer)
, PathId(pathId)
, MemLimit(192_KB)
, MemUsage(0)
{
}
-void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev,
+void TBaseChangeSender::RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev,
const TActorContext& ctx)
{
const auto& cgi = ev->Get()->Cgi();
@@ -468,7 +470,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TStringStream html;
HTML(html) {
- Header(html, TStringBuilder() << type << " change sender", DataShard.TabletId);
+ Header(html, "Change sender", tabletId);
SimplePanel(html, "Info", [this](IOutputStream& html) {
HTML(html) {
@@ -479,7 +481,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
}
});
- SimplePanel(html, "Partition senders", [this](IOutputStream& html) {
+ SimplePanel(html, "Partition senders", [this, tabletId](IOutputStream& html) {
HTML(html) {
TABLE_CLASS("table table-hover") {
TABLEHEAD() {
@@ -503,7 +505,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TABLED() { html << sender.Pending.size(); }
TABLED() { html << sender.Prepared.size(); }
TABLED() { html << sender.Broadcasting.size(); }
- TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); }
+ TABLED() { ActorLink(html, tabletId, PathId, partitionId); }
}
}
}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h
index 6b418ebf7d8..fe516176a80 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/change_exchange/change_sender_common_ops.h
@@ -1,10 +1,7 @@
#pragma once
#include "change_exchange.h"
-#include "change_exchange_helpers.h"
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/change_exchange/change_exchange.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/mon.h>
@@ -14,8 +11,7 @@
#include <util/generic/set.h>
#include <util/string/builder.h>
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NChangeExchange {
struct TEvChangeExchangePrivate {
enum EEv {
@@ -61,8 +57,8 @@ public:
virtual IActor* CreateSender(ui64 partitionId) = 0;
virtual void RemoveRecords() = 0;
- virtual void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
- virtual void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) = 0;
+ virtual void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
+ virtual void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) = 0;
virtual void ForgetRecords(TVector<ui64>&& records) = 0;
virtual void OnReady(ui64 partitionId) = 0;
virtual void OnGone(ui64 partitionId) = 0;
@@ -75,18 +71,18 @@ public:
virtual void Resolve() = 0;
virtual bool IsResolving() const = 0;
virtual bool IsResolved() const = 0;
- virtual ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const = 0;
+ virtual ui64 GetPartitionId(IChangeRecord::TPtr record) const = 0;
};
class TBaseChangeSender: public IChangeSender {
- using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
- using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
+ using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
+ using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
struct TSender {
TActorId ActorId;
bool Ready = false;
TVector<TEnqueuedRecord> Pending;
- TVector<NChangeExchange::IChangeRecord::TPtr> Prepared;
+ TVector<IChangeRecord::TPtr> Prepared;
TVector<ui64> Broadcasting;
};
@@ -108,7 +104,7 @@ class TBaseChangeSender: public IChangeSender {
void SendPreparedRecords(ui64 partitionId);
void ReEnqueueRecords(const TSender& sender);
- TBroadcast& EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record);
+ TBroadcast& EnsureBroadcast(IChangeRecord::TPtr record);
bool AddBroadcastPartition(ui64 order, ui64 partitionId);
bool RemoveBroadcastPartition(ui64 order, ui64 partitionId);
bool CompleteBroadcastPartition(ui64 order, ui64 partitionId);
@@ -124,35 +120,35 @@ protected:
remove.push_back(record.Order);
}
- ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
+ ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}
template <>
void RemoveRecords(TVector<ui64>&& records) {
- ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records)));
+ ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
}
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
void KillSenders() override;
void RemoveRecords() override;
- void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
- void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) override;
+ void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
+ void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) override;
void ForgetRecords(TVector<ui64>&& records) override;
void OnReady(ui64 partitionId) override;
void OnGone(ui64 partitionId) override;
explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
- const TDataShardId& dataShard, const TPathId& pathId);
+ const TActorId& changeServer, const TPathId& pathId);
- void RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
+ void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
private:
IActorOps* const ActorOps;
IChangeSenderResolver* const Resolver;
protected:
- const TDataShardId DataShard;
+ const TActorId ChangeServer;
const TPathId PathId;
private:
@@ -162,12 +158,11 @@ private:
THashMap<ui64, TSender> Senders; // ui64 is partition id
TSet<TEnqueuedRecord> Enqueued;
TSet<TRequestedRecord> PendingBody;
- TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingSent; // ui64 is order
+ TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order
THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order
TVector<ui64> GonePartitions;
}; // TBaseChangeSender
-} // NDataShard
-} // NKikimr
+}
diff --git a/ydb/core/tx/datashard/change_sender_monitoring.cpp b/ydb/core/change_exchange/change_sender_monitoring.cpp
index 9cc4ba1b6bc..9671bb787b5 100644
--- a/ydb/core/tx/datashard/change_sender_monitoring.cpp
+++ b/ydb/core/change_exchange/change_sender_monitoring.cpp
@@ -5,7 +5,7 @@
#include <util/string/printf.h>
#include <util/string/split.h>
-namespace NKikimr::NDataShard {
+namespace NKikimr::NChangeExchange {
void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body) {
HTML(str) {
diff --git a/ydb/core/tx/datashard/change_sender_monitoring.h b/ydb/core/change_exchange/change_sender_monitoring.h
index fe9bf40b9a9..83205bd0e70 100644
--- a/ydb/core/tx/datashard/change_sender_monitoring.h
+++ b/ydb/core/change_exchange/change_sender_monitoring.h
@@ -6,7 +6,7 @@
#include <util/generic/maybe.h>
-namespace NKikimr::NDataShard {
+namespace NKikimr::NChangeExchange {
template <typename T>
static void Link(IOutputStream& str, const TStringBuf path, const T& title) {
diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make
index a00055ba545..6e82179d832 100644
--- a/ydb/core/change_exchange/ya.make
+++ b/ydb/core/change_exchange/ya.make
@@ -3,6 +3,8 @@ LIBRARY()
SRCS(
change_exchange.cpp
change_record.cpp
+ change_sender_common_ops.cpp
+ change_sender_monitoring.cpp
)
GENERATE_ENUM_SERIALIZATION(change_record.h)
@@ -10,6 +12,9 @@ GENERATE_ENUM_SERIALIZATION(change_record.h)
PEERDIR(
ydb/core/base
ydb/core/scheme
+ ydb/library/actors/core
+ ydb/library/yverify_stream
+ library/cpp/monlib/service/pages
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp
index 0bcd44390ed..93e6d68a3b8 100644
--- a/ydb/core/tx/datashard/change_exchange_split.cpp
+++ b/ydb/core/tx/datashard/change_exchange_split.cpp
@@ -1,6 +1,5 @@
#include "change_exchange.h"
#include "change_exchange_helpers.h"
-#include "change_sender_common_ops.h"
#include "datashard_impl.h"
#include <ydb/core/base/tablet_pipe.h>
diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp
index 1f617d252e4..6150c00d0be 100644
--- a/ydb/core/tx/datashard/change_sender.cpp
+++ b/ydb/core/tx/datashard/change_sender.cpp
@@ -1,9 +1,9 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
-#include "change_sender_monitoring.h"
#include "datashard_impl.h"
#include <ydb/core/change_exchange/change_exchange.h>
+#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
@@ -165,6 +165,8 @@ class TChangeSender: public TActor<TChangeSender> {
}
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
+ using namespace NChangeExchange;
+
const auto& cgi = ev->Get()->Cgi();
if (const auto& str = cgi.Get("pathId")) {
if (const auto& pathId = ParsePathId(str)) {
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index d8356013456..98a225f5543 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -1,16 +1,16 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
#include "change_record.h"
-#include "change_sender_common_ops.h"
-#include "change_sender_monitoring.h"
#include "datashard_impl.h"
#include <ydb/core/base/tablet_pipecache.h>
-#include <ydb/library/services/services.pb.h>
+#include <ydb/core/change_exchange/change_sender_common_ops.h>
+#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/library/services/services.pb.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -107,7 +107,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
}
void Ready() {
- Send(Parent, new TEvChangeExchangePrivate::TEvReady(ShardId));
+ Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvReady(ShardId));
Become(&TThis::StateWaitingRecords);
}
@@ -240,6 +240,8 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
}
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) {
+ using namespace NChangeExchange;
+
TStringStream html;
HTML(html) {
@@ -261,7 +263,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
}
void Leave() {
- Send(Parent, new TEvChangeExchangePrivate::TEvGone(ShardId));
+ Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(ShardId));
PassAway();
}
@@ -328,8 +330,8 @@ private:
class TAsyncIndexChangeSenderMain
: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
- , public TBaseChangeSender
- , public IChangeSenderResolver
+ , public NChangeExchange::TBaseChangeSender
+ , public NChangeExchange::IChangeSenderResolver
, private NSchemeCache::TSchemeCacheHelpers
{
TStringBuf GetLogPrefix() const {
@@ -753,12 +755,12 @@ class TAsyncIndexChangeSenderMain
ForgetRecords(std::move(ev->Get()->Records));
}
- void Handle(TEvChangeExchangePrivate::TEvReady::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
OnReady(ev->Get()->PartitionId);
}
- void Handle(TEvChangeExchangePrivate::TEvGone::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
OnGone(ev->Get()->PartitionId);
}
@@ -777,7 +779,7 @@ class TAsyncIndexChangeSenderMain
}
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
- RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx);
+ RenderHtmlPage(DataShard.TabletId, ev, ctx);
}
void PassAway() override {
@@ -792,7 +794,8 @@ public:
explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId)
: TActorBootstrapped()
- , TBaseChangeSender(this, this, dataShard, indexPathId)
+ , TBaseChangeSender(this, this, dataShard.ActorId, indexPathId)
+ , DataShard(dataShard)
, UserTableId(userTableId)
, IndexTableVersion(0)
{
@@ -808,8 +811,8 @@ public:
hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);
hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
- hFunc(TEvChangeExchangePrivate::TEvReady, Handle);
- hFunc(TEvChangeExchangePrivate::TEvGone, Handle);
+ hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle);
+ hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle);
HFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
@@ -825,6 +828,7 @@ public:
}
private:
+ const TDataShardId DataShard;
const TTableId UserTableId;
mutable TMaybe<TString> LogPrefix;
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 60e107569e6..111f1c74c69 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -2,10 +2,10 @@
#include "change_exchange_impl.h"
#include "change_record.h"
#include "change_record_cdc_serializer.h"
-#include "change_sender_common_ops.h"
-#include "change_sender_monitoring.h"
#include "datashard_user_table.h"
+#include <ydb/core/change_exchange/change_sender_common_ops.h>
+#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
@@ -16,6 +16,7 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
+
#include <library/cpp/json/json_writer.h>
namespace NKikimr::NDataShard {
@@ -75,7 +76,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
void Ready() {
Pending.clear();
- Send(Parent, new TEvChangeExchangePrivate::TEvReady(PartitionId));
+ Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvReady(PartitionId));
Become(&TThis::StateWaitingRecords);
}
@@ -179,6 +180,8 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
}
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) {
+ using namespace NChangeExchange;
+
TStringStream html;
HTML(html) {
@@ -219,7 +222,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
}
void Leave() {
- Send(Parent, new TEvChangeExchangePrivate::TEvGone(PartitionId));
+ Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(PartitionId));
PassAway();
}
@@ -291,8 +294,8 @@ private:
class TCdcChangeSenderMain
: public TActorBootstrapped<TCdcChangeSenderMain>
- , public TBaseChangeSender
- , public IChangeSenderResolver
+ , public NChangeExchange::TBaseChangeSender
+ , public NChangeExchange::IChangeSenderResolver
, private NSchemeCache::TSchemeCacheHelpers
{
struct TPQPartitionInfo {
@@ -718,12 +721,12 @@ class TCdcChangeSenderMain
ForgetRecords(std::move(ev->Get()->Records));
}
- void Handle(TEvChangeExchangePrivate::TEvReady::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
OnReady(ev->Get()->PartitionId);
}
- void Handle(TEvChangeExchangePrivate::TEvGone::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
OnGone(ev->Get()->PartitionId);
}
@@ -742,7 +745,7 @@ class TCdcChangeSenderMain
}
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
- RenderHtmlPage(ESenderType::CdcStream, ev, ctx);
+ RenderHtmlPage(DataShard.TabletId, ev, ctx);
}
void PassAway() override {
@@ -757,7 +760,8 @@ public:
explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId)
: TActorBootstrapped()
- , TBaseChangeSender(this, this, dataShard, streamPathId)
+ , TBaseChangeSender(this, this, dataShard.ActorId, streamPathId)
+ , DataShard(dataShard)
, TopicVersion(0)
{
}
@@ -772,8 +776,8 @@ public:
hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);
hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
- hFunc(TEvChangeExchangePrivate::TEvReady, Handle);
- hFunc(TEvChangeExchangePrivate::TEvGone, Handle);
+ hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle);
+ hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle);
HFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
@@ -789,6 +793,7 @@ public:
}
private:
+ const TDataShardId DataShard;
mutable TMaybe<TString> LogPrefix;
TUserTable::TCdcStream Stream;
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index cefaa714b0f..5cc688f117c 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -26,8 +26,6 @@ SRCS(
change_sender.cpp
change_sender_async_index.cpp
change_sender_cdc_stream.cpp
- change_sender_common_ops.cpp
- change_sender_monitoring.cpp
check_commit_writes_tx_unit.cpp
check_data_tx_unit.cpp
check_distributed_erase_tx_unit.cpp