aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-01-19 18:18:10 +0300
committerGitHub <noreply@github.com>2024-01-19 18:18:10 +0300
commite7c25d2a09edbdcebda1e8d5248249d3a66dbf0e (patch)
tree912cd357f29d620ad22e1f4326c4cbb62af21f25
parentd6ad10e2a2bffcbd01fc40ab70ba47335feca9fe (diff)
downloadydb-e7c25d2a09edbdcebda1e8d5248249d3a66dbf0e.tar.gz
(refactoring) Split TEvChangeExchange into two parts: common & DS KIKIMR-20673 (#1152)
-rw-r--r--ydb/core/base/events.h3
-rw-r--r--ydb/core/change_exchange/change_exchange.cpp125
-rw-r--r--ydb/core/change_exchange/change_exchange.h101
-rw-r--r--ydb/core/change_exchange/ya.make6
-rw-r--r--ydb/core/tx/datashard/change_exchange.cpp124
-rw-r--r--ydb/core/tx/datashard/change_exchange.h91
-rw-r--r--ydb/core/tx/datashard/change_sender.cpp14
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp20
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp20
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp6
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h14
-rw-r--r--ydb/core/tx/datashard/datashard.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_change_sending.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h11
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp52
-rw-r--r--ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp4
16 files changed, 317 insertions, 286 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index f9bf6ad31f..97f89dc978 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -135,7 +135,7 @@ struct TKikimrEvents : TEvents {
ES_HEALTH_CHECK,
ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212
ES_YQ, // 4213
- ES_CHANGE_EXCHANGE,
+ ES_CHANGE_EXCHANGE_DATASHARD,
ES_DATABASE_SERVICE, //4215
ES_SEQUENCESHARD, // 4216
ES_SEQUENCEPROXY, // 4217
@@ -172,6 +172,7 @@ struct TKikimrEvents : TEvents {
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
ES_REPLICATION_SERVICE,
+ ES_CHANGE_EXCHANGE,
};
};
diff --git a/ydb/core/change_exchange/change_exchange.cpp b/ydb/core/change_exchange/change_exchange.cpp
new file mode 100644
index 0000000000..92df9b3cea
--- /dev/null
+++ b/ydb/core/change_exchange/change_exchange.cpp
@@ -0,0 +1,125 @@
+#include "change_exchange.h"
+
+#include <util/string/builder.h>
+#include <util/string/join.h>
+
+namespace NKikimr::NChangeExchange {
+
+/// TEvEnqueueRecords
+TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
+ : Records(records)
+{
+}
+
+TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
+ : Records(std::move(records))
+{
+}
+
+TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Records [" << JoinSeq(",", Records) << "]"
+ << " }";
+}
+
+TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
+ : Order(order)
+ , PathId(pathId)
+ , BodySize(bodySize)
+{
+}
+
+void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
+ out << "{"
+ << " Order: " << Order
+ << " PathId: " << PathId
+ << " BodySize: " << BodySize
+ << " }";
+}
+
+/// TEvRequestRecords
+TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
+ : Records(records)
+{
+}
+
+TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
+ : Records(std::move(records))
+{
+}
+
+TString TEvChangeExchange::TEvRequestRecords::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Records [" << JoinSeq(",", Records) << "]"
+ << " }";
+}
+
+TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
+ : Order(order)
+ , BodySize(bodySize)
+{
+}
+
+bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
+ return Order < rhs.Order;
+}
+
+void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
+ out << "{"
+ << " Order: " << Order
+ << " BodySize: " << BodySize
+ << " }";
+}
+
+/// TEvRemoveRecords
+TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
+ : Records(records)
+{
+}
+
+TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
+ : Records(std::move(records))
+{
+}
+
+TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Records [" << JoinSeq(",", Records) << "]"
+ << " }";
+}
+
+/// TEvRecords
+TEvChangeExchange::TEvRecords::TEvRecords(const TVector<IChangeRecord::TPtr>& records)
+ : Records(records)
+{
+}
+
+TEvChangeExchange::TEvRecords::TEvRecords(TVector<IChangeRecord::TPtr>&& records)
+ : Records(std::move(records))
+{
+}
+
+TString TEvChangeExchange::TEvRecords::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Records [" << JoinSeq(",", Records) << "]"
+ << " }";
+}
+
+/// TEvForgetRecords
+TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
+ : Records(records)
+{
+}
+
+TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
+ : Records(std::move(records))
+{
+}
+
+TString TEvChangeExchange::TEvForgetRecords::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Records [" << JoinSeq(",", Records) << "]"
+ << " }";
+}
+
+}
diff --git a/ydb/core/change_exchange/change_exchange.h b/ydb/core/change_exchange/change_exchange.h
new file mode 100644
index 0000000000..9cd000b4c2
--- /dev/null
+++ b/ydb/core/change_exchange/change_exchange.h
@@ -0,0 +1,101 @@
+#pragma once
+
+#include "change_record.h"
+
+#include <ydb/core/base/defs.h>
+#include <ydb/core/base/events.h>
+#include <ydb/core/scheme/scheme_pathid.h>
+
+#include <util/generic/vector.h>
+
+namespace NKikimr::NChangeExchange {
+
+struct TEvChangeExchange {
+ enum EEv {
+ // Enqueue for sending
+ EvEnqueueRecords = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE),
+ // Request change record(s) by id
+ EvRequestRecords,
+ // Change record(s)
+ EvRecords,
+ // Remove change record(s) from local database
+ EvRemoveRecords,
+ // Already removed records that the sender should forget about
+ EvForgetRecods,
+
+ EvEnd,
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE));
+
+ struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> {
+ struct TRecordInfo {
+ ui64 Order;
+ TPathId PathId;
+ ui64 BodySize;
+
+ TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize);
+
+ void Out(IOutputStream& out) const;
+ };
+
+ TVector<TRecordInfo> Records;
+
+ explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records);
+ explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records);
+ TString ToString() const override;
+ };
+
+ struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> {
+ struct TRecordInfo {
+ ui64 Order;
+ ui64 BodySize;
+
+ TRecordInfo(ui64 order, ui64 bodySize = 0);
+
+ bool operator<(const TRecordInfo& rhs) const;
+ void Out(IOutputStream& out) const;
+ };
+
+ TVector<TRecordInfo> Records;
+
+ explicit TEvRequestRecords(const TVector<TRecordInfo>& records);
+ explicit TEvRequestRecords(TVector<TRecordInfo>&& records);
+ TString ToString() const override;
+ };
+
+ struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> {
+ TVector<ui64> Records;
+
+ explicit TEvRemoveRecords(const TVector<ui64>& records);
+ explicit TEvRemoveRecords(TVector<ui64>&& records);
+ TString ToString() const override;
+ };
+
+ struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> {
+ TVector<IChangeRecord::TPtr> Records;
+
+ explicit TEvRecords(const TVector<IChangeRecord::TPtr>& records);
+ explicit TEvRecords(TVector<IChangeRecord::TPtr>&& records);
+ TString ToString() const override;
+ };
+
+ struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> {
+ TVector<ui64> Records;
+
+ explicit TEvForgetRecords(const TVector<ui64>& records);
+ explicit TEvForgetRecords(TVector<ui64>&& records);
+ TString ToString() const override;
+ };
+
+}; // TEvChangeExchange
+
+}
+
+Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) {
+ return x.Out(o);
+}
+
+Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) {
+ return x.Out(o);
+}
diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make
index 1ba88203fe..a00055ba54 100644
--- a/ydb/core/change_exchange/ya.make
+++ b/ydb/core/change_exchange/ya.make
@@ -1,11 +1,17 @@
LIBRARY()
SRCS(
+ change_exchange.cpp
change_record.cpp
)
GENERATE_ENUM_SERIALIZATION(change_record.h)
+PEERDIR(
+ ydb/core/base
+ ydb/core/scheme
+)
+
YQL_LAST_ABI_VERSION()
END()
diff --git a/ydb/core/tx/datashard/change_exchange.cpp b/ydb/core/tx/datashard/change_exchange.cpp
index 99c7f6cec7..752667384f 100644
--- a/ydb/core/tx/datashard/change_exchange.cpp
+++ b/ydb/core/tx/datashard/change_exchange.cpp
@@ -1,127 +1,8 @@
#include "change_exchange.h"
#include <util/string/builder.h>
-#include <util/string/join.h>
-namespace NKikimr {
-namespace NDataShard {
-
-/// TEvEnqueueRecords
-TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
- : Records(records)
-{
-}
-
-TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
- : Records(std::move(records))
-{
-}
-
-TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
- return TStringBuilder() << ToStringHeader() << " {"
- << " Records [" << JoinSeq(",", Records) << "]"
- << " }";
-}
-
-TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
- : Order(order)
- , PathId(pathId)
- , BodySize(bodySize)
-{
-}
-
-void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
- out << "{"
- << " Order: " << Order
- << " PathId: " << PathId
- << " BodySize: " << BodySize
- << " }";
-}
-
-/// TEvRequestRecords
-TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
- : Records(records)
-{
-}
-
-TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
- : Records(std::move(records))
-{
-}
-
-TString TEvChangeExchange::TEvRequestRecords::ToString() const {
- return TStringBuilder() << ToStringHeader() << " {"
- << " Records [" << JoinSeq(",", Records) << "]"
- << " }";
-}
-
-TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
- : Order(order)
- , BodySize(bodySize)
-{
-}
-
-bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
- return Order < rhs.Order;
-}
-
-void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
- out << "{"
- << " Order: " << Order
- << " BodySize: " << BodySize
- << " }";
-}
-
-/// TEvRemoveRecords
-TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
- : Records(records)
-{
-}
-
-TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
- : Records(std::move(records))
-{
-}
-
-TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
- return TStringBuilder() << ToStringHeader() << " {"
- << " Records [" << JoinSeq(",", Records) << "]"
- << " }";
-}
-
-/// TEvRecords
-TEvChangeExchange::TEvRecords::TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records)
- : Records(records)
-{
-}
-
-TEvChangeExchange::TEvRecords::TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records)
- : Records(std::move(records))
-{
-}
-
-TString TEvChangeExchange::TEvRecords::ToString() const {
- return TStringBuilder() << ToStringHeader() << " {"
- << " Records [" << JoinSeq(",", Records) << "]"
- << " }";
-}
-
-/// TEvForgetRecords
-TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
- : Records(records)
-{
-}
-
-TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
- : Records(std::move(records))
-{
-}
-
-TString TEvChangeExchange::TEvForgetRecords::ToString() const {
- return TStringBuilder() << ToStringHeader() << " {"
- << " Records [" << JoinSeq(",", Records) << "]"
- << " }";
-}
+namespace NKikimr::NDataShard {
/// TEvAddSender
TEvChangeExchange::TEvAddSender::TEvAddSender(const TTableId& userTableId, TEvChangeExchange::ESenderType type, const TPathId& pathId)
@@ -151,5 +32,4 @@ TString TEvChangeExchange::TEvRemoveSender::ToString() const {
<< " }";
}
-} // NDataShard
-} // NKikimr
+}
diff --git a/ydb/core/tx/datashard/change_exchange.h b/ydb/core/tx/datashard/change_exchange.h
index 226a5125f5..548fe14e31 100644
--- a/ydb/core/tx/datashard/change_exchange.h
+++ b/ydb/core/tx/datashard/change_exchange.h
@@ -2,14 +2,13 @@
#include "defs.h"
-#include <ydb/core/change_exchange/change_record.h>
#include <ydb/core/protos/change_exchange.pb.h>
+#include <ydb/core/scheme/scheme_pathid.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <util/generic/vector.h>
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NDataShard {
class TDataShard;
@@ -17,7 +16,7 @@ struct TEvChangeExchange {
enum EEv {
/// Network exchange protocol
// Handshake between sender & receiver
- EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE),
+ EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD),
// Apply change record(s) on receiver
EvApplyRecords,
// Handshake & application status
@@ -27,30 +26,17 @@ struct TEvChangeExchange {
EvActivateSenderAck,
/// Local exchange (mostly using change's id)
- // Enqueue for sending
- EvEnqueueRecords,
- // Request change record(s) by id
- EvRequestRecords,
- // Change record(s)
- EvRecords,
- // Remove change record(s) from local database
- EvRemoveRecords,
-
// Add new change sender
EvAddSender,
// Remove existing change sender
EvRemoveSender,
-
- // Already removed records that the sender should forget about
- EvForgetRecods,
-
// Split/merge
EvSplitAck,
EvEnd,
};
- static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE));
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD));
/// Network events
struct TEvHandshake: public TEventPB<TEvHandshake, NKikimrChangeExchange::TEvHandshake, EvHandshake> {};
@@ -60,66 +46,6 @@ struct TEvChangeExchange {
struct TEvActivateSenderAck: public TEventPB<TEvActivateSenderAck, NKikimrChangeExchange::TEvActivateSenderAck, EvActivateSenderAck> {};
/// Local events
- struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> {
- struct TRecordInfo {
- ui64 Order;
- TPathId PathId;
- ui64 BodySize;
-
- TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize);
-
- void Out(IOutputStream& out) const;
- };
-
- TVector<TRecordInfo> Records;
-
- explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records);
- explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records);
- TString ToString() const override;
- };
-
- struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> {
- struct TRecordInfo {
- ui64 Order;
- ui64 BodySize;
-
- TRecordInfo(ui64 order, ui64 bodySize = 0);
-
- bool operator<(const TRecordInfo& rhs) const;
- void Out(IOutputStream& out) const;
- };
-
- TVector<TRecordInfo> Records;
-
- explicit TEvRequestRecords(const TVector<TRecordInfo>& records);
- explicit TEvRequestRecords(TVector<TRecordInfo>&& records);
- TString ToString() const override;
- };
-
- struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> {
- TVector<ui64> Records;
-
- explicit TEvRemoveRecords(const TVector<ui64>& records);
- explicit TEvRemoveRecords(TVector<ui64>&& records);
- TString ToString() const override;
- };
-
- struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> {
- TVector<NChangeExchange::IChangeRecord::TPtr> Records;
-
- explicit TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records);
- explicit TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records);
- TString ToString() const override;
- };
-
- struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> {
- TVector<ui64> Records;
-
- explicit TEvForgetRecords(const TVector<ui64>& records);
- explicit TEvForgetRecords(TVector<ui64>&& records);
- TString ToString() const override;
- };
-
enum class ESenderType {
AsyncIndex,
CdcStream,
@@ -149,13 +75,4 @@ struct TEvChangeExchange {
IActor* CreateChangeSender(const TDataShard* self);
IActor* CreateChangeExchangeSplit(const TDataShard* self, const TVector<ui64>& dstDataShards);
-} // NDataShard
-} // NKikimr
-
-Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) {
- return x.Out(o);
-}
-
-Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) {
- return x.Out(o);
}
diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp
index 825ffe9080..1f617d252e 100644
--- a/ydb/core/tx/datashard/change_sender.cpp
+++ b/ydb/core/tx/datashard/change_sender.cpp
@@ -3,12 +3,12 @@
#include "change_sender_monitoring.h"
#include "datashard_impl.h"
-#include <ydb/library/services/services.pb.h>
-
+#include <ydb/core/change_exchange/change_exchange.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/actors/core/mon.h>
+#include <ydb/library/services/services.pb.h>
#include <library/cpp/monlib/service/pages/mon_page.h>
#include <library/cpp/monlib/service/pages/templates.h>
@@ -19,7 +19,7 @@ namespace NKikimr::NDataShard {
class TChangeSender: public TActor<TChangeSender> {
using ESenderType = TEvChangeExchange::ESenderType;
- using TEnqueuedRecord = TEvChangeExchange::TEvEnqueueRecords::TRecordInfo;
+ using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo;
struct TSender {
TTableId UserTableId;
@@ -67,7 +67,7 @@ class TChangeSender: public TActor<TChangeSender> {
sender.ActorId = RegisterChangeSender(pathId, sender.UserTableId, sender.Type);
}
- void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
auto& records = ev->Get()->Records;
@@ -92,11 +92,11 @@ class TChangeSender: public TActor<TChangeSender> {
}
for (auto& [to, records] : forward) {
- Send(to, new TEvChangeExchange::TEvEnqueueRecords(std::move(records)));
+ Send(to, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(records)));
}
if (remove) {
- Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
+ Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}
}
@@ -289,7 +289,7 @@ public:
STFUNC(StateBase) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle);
hFunc(TEvChangeExchange::TEvAddSender, Handle);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
HFunc(NMon::TEvRemoteHttpInfo, Handle);
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 9ba3b7c037..d835601345 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -115,13 +115,13 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
STATEFN(StateWaitingRecords) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);
default:
return StateBase(ev);
}
}
- void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
auto records = MakeHolder<TEvChangeExchange::TEvApplyRecords>();
@@ -738,17 +738,17 @@ class TAsyncIndexChangeSenderMain
return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IndexTablePathId, TagMap);
}
- void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
EnqueueRecords(std::move(ev->Get()->Records));
}
- void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
ProcessRecords(std::move(ev->Get()->Records));
}
- void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
ForgetRecords(std::move(ev->Get()->Records));
}
@@ -771,7 +771,7 @@ class TAsyncIndexChangeSenderMain
PassAway();
}
- void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
+ void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
RemoveRecords(std::move(ev->Get()->Records));
}
@@ -804,9 +804,9 @@ public:
STFUNC(StateBase) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle);
- hFunc(TEvChangeExchange::TEvRecords, Handle);
- hFunc(TEvChangeExchange::TEvForgetRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
hFunc(TEvChangeExchangePrivate::TEvReady, Handle);
hFunc(TEvChangeExchangePrivate::TEvGone, Handle);
@@ -817,7 +817,7 @@ public:
STFUNC(StatePendingRemove) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
HFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 0d82b1eedf..60e107569e 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -83,14 +83,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
STATEFN(StateWaitingRecords) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);
sFunc(TEvPartitionWriter::TEvWriteResponse, Lost);
default:
return StateBase(ev);
}
}
- void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
NKikimrClient::TPersQueueRequest request;
@@ -703,17 +703,17 @@ class TCdcChangeSenderMain
return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId, Stream);
}
- void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
EnqueueRecords(std::move(ev->Get()->Records));
}
- void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
ProcessRecords(std::move(ev->Get()->Records));
}
- void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) {
+ void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
ForgetRecords(std::move(ev->Get()->Records));
}
@@ -736,7 +736,7 @@ class TCdcChangeSenderMain
PassAway();
}
- void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
+ void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
RemoveRecords(std::move(ev->Get()->Records));
}
@@ -768,9 +768,9 @@ public:
STFUNC(StateBase) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle);
- hFunc(TEvChangeExchange::TEvRecords, Handle);
- hFunc(TEvChangeExchange::TEvForgetRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
hFunc(TEvChangeExchangePrivate::TEvReady, Handle);
hFunc(TEvChangeExchangePrivate::TEvGone, Handle);
@@ -781,7 +781,7 @@ public:
STFUNC(StatePendingRemove) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
HFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index d46d467daa..1c57d3c17a 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -82,7 +82,7 @@ void TBaseChangeSender::KillSenders() {
}
}
-void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
+void TBaseChangeSender::EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
for (auto& record : records) {
Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"
<< ": expected# " << PathId
@@ -117,7 +117,7 @@ bool TBaseChangeSender::RequestRecords() {
return false;
}
- ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRequestRecords(std::move(records)));
+ ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records)));
return true;
}
@@ -290,7 +290,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
}
Y_ABORT_UNLESS(sender.ActorId);
- ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
+ ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
}
void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index 45351ee5af..6b418ebf7d 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -4,7 +4,7 @@
#include "change_exchange_helpers.h"
#include <ydb/core/base/appdata.h>
-
+#include <ydb/core/change_exchange/change_exchange.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/mon.h>
@@ -61,7 +61,7 @@ public:
virtual IActor* CreateSender(ui64 partitionId) = 0;
virtual void RemoveRecords() = 0;
- virtual void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
+ virtual void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
virtual void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) = 0;
virtual void ForgetRecords(TVector<ui64>&& records) = 0;
virtual void OnReady(ui64 partitionId) = 0;
@@ -79,8 +79,8 @@ public:
};
class TBaseChangeSender: public IChangeSender {
- using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
- using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
+ using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
+ using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
struct TSender {
TActorId ActorId;
@@ -124,19 +124,19 @@ protected:
remove.push_back(record.Order);
}
- ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
+ ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}
template <>
void RemoveRecords(TVector<ui64>&& records) {
- ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
+ ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records)));
}
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
void KillSenders() override;
void RemoveRecords() override;
- void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
+ void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) override;
void ForgetRecords(TVector<ui64>&& records) override;
void OnReady(ui64 partitionId) override;
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index c251597f7c..dcc2cec7ab 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -939,7 +939,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
<< ", records: " << JoinSeq(", ", records));
const auto now = AppData()->TimeProvider->Now();
- TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
+ TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);
@@ -966,7 +966,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
Y_ABORT_UNLESS(OutChangeSender);
- Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
+ Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}
void TDataShard::UpdateChangeExchangeLag(TInstant now) {
diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp
index a49a99c488..d70874dd64 100644
--- a/ydb/core/tx/datashard/datashard_change_sending.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -233,7 +233,7 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Send " << records.size() << " change records"
<< ": to# " << to
<< ", at tablet# " << Self->TabletID());
- ctx.Send(to, new TEvChangeExchange::TEvRecords(std::move(records)));
+ ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvRecords(std::move(records)));
}
size_t forgotten = 0;
@@ -246,7 +246,7 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Forget " << records.size() << " change records"
<< ": to# " << to
<< ", at tablet# " << Self->TabletID());
- ctx.Send(to, new TEvChangeExchange::TEvForgetRecords(std::move(records)));
+ ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvForgetRecords(std::move(records)));
}
size_t left = Accumulate(Self->ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) {
@@ -408,7 +408,7 @@ private:
}; // TTxChangeExchangeSplitAck
/// Request
-void TDataShard::Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) {
+void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) {
ChangeRecordsRequested[ev->Sender].insert(ev->Get()->Records.begin(), ev->Get()->Records.end());
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, Accumulate(ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) {
return sum + kv.second.size();
@@ -428,7 +428,7 @@ void TDataShard::Handle(TEvPrivate::TEvRequestChangeRecords::TPtr&, const TActor
}
/// Remove
-void TDataShard::Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) {
+void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) {
ChangeRecordsToRemove.insert(ev->Get()->Records.begin(), ev->Get()->Records.end());
ScheduleRemoveChangeRecords(ctx);
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2d5ff95712..872410ac2c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -33,6 +33,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
+#include <ydb/core/change_exchange/change_exchange.h>
#include <ydb/core/engine/mkql_engine_flat_host.h>
#include <ydb/core/tablet/pipe_tracker.h>
#include <ydb/core/tablet/tablet_exception.h>
@@ -1304,8 +1305,8 @@ class TDataShard
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
// change sending
- void Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx);
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx);
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx);
void ScheduleRequestChangeRecords(const TActorContext& ctx);
void ScheduleRemoveChangeRecords(const TActorContext& ctx);
void Handle(TEvPrivate::TEvRequestChangeRecords::TPtr& ev, const TActorContext& ctx);
@@ -2729,7 +2730,7 @@ private:
}
};
- using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
+ using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
// split/merge
TChangeSenderActivator ChangeSenderActivator;
@@ -2972,8 +2973,8 @@ protected:
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted);
IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUnavailable);
- HFunc(TEvChangeExchange::TEvRequestRecords, Handle);
- HFunc(TEvChangeExchange::TEvRemoveRecords, Handle);
+ HFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle);
+ HFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle);
HFunc(TEvPrivate::TEvRequestChangeRecords, Handle);
HFunc(TEvPrivate::TEvRemoveChangeRecords, Handle);
HFunc(TEvChangeExchange::TEvHandshake, Handle);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index e370a9b9a4..a744dbfeb5 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -218,20 +218,20 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
- for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) {
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {
enqueued.insert(record.Order);
}
break;
- case TEvChangeExchange::EvRequestRecords:
- for (const auto& record : ev->Get<TEvChangeExchange::TEvRequestRecords>()->Records) {
+ case NChangeExchange::TEvChangeExchange::EvRequestRecords:
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRequestRecords>()->Records) {
requested.insert(record.Order);
}
break;
- case TEvChangeExchange::EvRemoveRecords:
- for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) {
+ case NChangeExchange::TEvChangeExchange::EvRemoveRecords:
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {
removed.insert(record);
}
break;
@@ -307,14 +307,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
return TTestActorRuntime::EEventAction::PROCESS;
}
- case TEvChangeExchange::EvEnqueueRecords:
- for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) {
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {
enqueued.insert(record.Order);
}
break;
- case TEvChangeExchange::EvRemoveRecords:
- for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) {
+ case NChangeExchange::TEvChangeExchange::EvRemoveRecords:
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {
removed.insert(record);
}
break;
@@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
bool inited = false;
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
delayed.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -416,14 +416,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
THashSet<ui64> removed;
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
- for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) {
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {
enqueued.insert(record.Order);
}
break;
- case TEvChangeExchange::EvRemoveRecords:
- for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) {
+ case NChangeExchange::TEvChangeExchange::EvRemoveRecords:
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {
removed.insert(record);
}
break;
@@ -481,7 +481,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
if (preventEnqueueing) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
if (preventEnqueueing) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -1886,7 +1886,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
if (preventEnqueueing) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -2143,7 +2143,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
if (preventEnqueueing) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -2240,7 +2240,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- case TEvChangeExchange::EvEnqueueRecords:
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
if (preventEnqueueing || (preventEnqueueingOnSpecificSender && *preventEnqueueingOnSpecificSender == ev->Recipient)) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -2432,7 +2432,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
TVector<THolder<IEventHandle>> enqueued;
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) {
+ if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
@@ -2474,8 +2474,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
THashSet<ui64> enqueued;
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) {
- for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) {
+ if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) {
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {
enqueued.insert(record.Order);
}
@@ -2506,8 +2506,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
THashSet<ui64> removed;
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == TEvChangeExchange::EvRemoveRecords) {
- for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) {
+ if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvRemoveRecords) {
+ for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {
removed.insert(record);
}
}
@@ -2743,7 +2743,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
break;
- case TEvChangeExchange::EvEnqueueRecords:
+ case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:
delayed.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
diff --git a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp
index 0d05b12d68..18bd1d0a45 100644
--- a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp
+++ b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp
@@ -1,6 +1,6 @@
#include <ydb/core/base/path.h>
+#include <ydb/core/change_exchange/change_exchange.h>
#include <ydb/core/scheme/scheme_tablecell.h>
-#include <ydb/core/tx/datashard/change_exchange.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
#include <ydb/core/testlib/tablet_helpers.h>
@@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) {
TVector<THolder<IEventHandle>> enqueued;
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) {
+ if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}