diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-01-19 18:18:10 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-19 18:18:10 +0300 |
commit | e7c25d2a09edbdcebda1e8d5248249d3a66dbf0e (patch) | |
tree | 912cd357f29d620ad22e1f4326c4cbb62af21f25 | |
parent | d6ad10e2a2bffcbd01fc40ab70ba47335feca9fe (diff) | |
download | ydb-e7c25d2a09edbdcebda1e8d5248249d3a66dbf0e.tar.gz |
(refactoring) Split TEvChangeExchange into two parts: common & DS KIKIMR-20673 (#1152)
-rw-r--r-- | ydb/core/base/events.h | 3 | ||||
-rw-r--r-- | ydb/core/change_exchange/change_exchange.cpp | 125 | ||||
-rw-r--r-- | ydb/core/change_exchange/change_exchange.h | 101 | ||||
-rw-r--r-- | ydb/core/change_exchange/ya.make | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_exchange.cpp | 124 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_exchange.h | 91 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_change_sending.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 52 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp | 4 |
16 files changed, 317 insertions, 286 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index f9bf6ad31f..97f89dc978 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -135,7 +135,7 @@ struct TKikimrEvents : TEvents { ES_HEALTH_CHECK, ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212 ES_YQ, // 4213 - ES_CHANGE_EXCHANGE, + ES_CHANGE_EXCHANGE_DATASHARD, ES_DATABASE_SERVICE, //4215 ES_SEQUENCESHARD, // 4216 ES_SEQUENCEPROXY, // 4217 @@ -172,6 +172,7 @@ struct TKikimrEvents : TEvents { ES_PQ_PARTITION_CHOOSER, ES_GRAPH, ES_REPLICATION_SERVICE, + ES_CHANGE_EXCHANGE, }; }; diff --git a/ydb/core/change_exchange/change_exchange.cpp b/ydb/core/change_exchange/change_exchange.cpp new file mode 100644 index 0000000000..92df9b3cea --- /dev/null +++ b/ydb/core/change_exchange/change_exchange.cpp @@ -0,0 +1,125 @@ +#include "change_exchange.h" + +#include <util/string/builder.h> +#include <util/string/join.h> + +namespace NKikimr::NChangeExchange { + +/// TEvEnqueueRecords +TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvEnqueueRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize) + : Order(order) + , PathId(pathId) + , BodySize(bodySize) +{ +} + +void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const { + out << "{" + << " Order: " << Order + << " PathId: " << PathId + << " BodySize: " << BodySize + << " }"; +} + +/// TEvRequestRecords +TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRequestRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize) + : Order(order) + , BodySize(bodySize) +{ +} + +bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const { + return Order < rhs.Order; +} + +void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const { + out << "{" + << " Order: " << Order + << " BodySize: " << BodySize + << " }"; +} + +/// TEvRemoveRecords +TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRemoveRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +/// TEvRecords +TEvChangeExchange::TEvRecords::TEvRecords(const TVector<IChangeRecord::TPtr>& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvRecords::TEvRecords(TVector<IChangeRecord::TPtr>&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +/// TEvForgetRecords +TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvForgetRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +} diff --git a/ydb/core/change_exchange/change_exchange.h b/ydb/core/change_exchange/change_exchange.h new file mode 100644 index 0000000000..9cd000b4c2 --- /dev/null +++ b/ydb/core/change_exchange/change_exchange.h @@ -0,0 +1,101 @@ +#pragma once + +#include "change_record.h" + +#include <ydb/core/base/defs.h> +#include <ydb/core/base/events.h> +#include <ydb/core/scheme/scheme_pathid.h> + +#include <util/generic/vector.h> + +namespace NKikimr::NChangeExchange { + +struct TEvChangeExchange { + enum EEv { + // Enqueue for sending + EvEnqueueRecords = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE), + // Request change record(s) by id + EvRequestRecords, + // Change record(s) + EvRecords, + // Remove change record(s) from local database + EvRemoveRecords, + // Already removed records that the sender should forget about + EvForgetRecods, + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE)); + + struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> { + struct TRecordInfo { + ui64 Order; + TPathId PathId; + ui64 BodySize; + + TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize); + + void Out(IOutputStream& out) const; + }; + + TVector<TRecordInfo> Records; + + explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records); + explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records); + TString ToString() const override; + }; + + struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> { + struct TRecordInfo { + ui64 Order; + ui64 BodySize; + + TRecordInfo(ui64 order, ui64 bodySize = 0); + + bool operator<(const TRecordInfo& rhs) const; + void Out(IOutputStream& out) const; + }; + + TVector<TRecordInfo> Records; + + explicit TEvRequestRecords(const TVector<TRecordInfo>& records); + explicit TEvRequestRecords(TVector<TRecordInfo>&& records); + TString ToString() const override; + }; + + struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> { + TVector<ui64> Records; + + explicit TEvRemoveRecords(const TVector<ui64>& records); + explicit TEvRemoveRecords(TVector<ui64>&& records); + TString ToString() const override; + }; + + struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> { + TVector<IChangeRecord::TPtr> Records; + + explicit TEvRecords(const TVector<IChangeRecord::TPtr>& records); + explicit TEvRecords(TVector<IChangeRecord::TPtr>&& records); + TString ToString() const override; + }; + + struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> { + TVector<ui64> Records; + + explicit TEvForgetRecords(const TVector<ui64>& records); + explicit TEvForgetRecords(TVector<ui64>&& records); + TString ToString() const override; + }; + +}; // TEvChangeExchange + +} + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) { + return x.Out(o); +} + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) { + return x.Out(o); +} diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make index 1ba88203fe..a00055ba54 100644 --- a/ydb/core/change_exchange/ya.make +++ b/ydb/core/change_exchange/ya.make @@ -1,11 +1,17 @@ LIBRARY() SRCS( + change_exchange.cpp change_record.cpp ) GENERATE_ENUM_SERIALIZATION(change_record.h) +PEERDIR( + ydb/core/base + ydb/core/scheme +) + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/core/tx/datashard/change_exchange.cpp b/ydb/core/tx/datashard/change_exchange.cpp index 99c7f6cec7..752667384f 100644 --- a/ydb/core/tx/datashard/change_exchange.cpp +++ b/ydb/core/tx/datashard/change_exchange.cpp @@ -1,127 +1,8 @@ #include "change_exchange.h" #include <util/string/builder.h> -#include <util/string/join.h> -namespace NKikimr { -namespace NDataShard { - -/// TEvEnqueueRecords -TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvEnqueueRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize) - : Order(order) - , PathId(pathId) - , BodySize(bodySize) -{ -} - -void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const { - out << "{" - << " Order: " << Order - << " PathId: " << PathId - << " BodySize: " << BodySize - << " }"; -} - -/// TEvRequestRecords -TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRequestRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize) - : Order(order) - , BodySize(bodySize) -{ -} - -bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const { - return Order < rhs.Order; -} - -void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const { - out << "{" - << " Order: " << Order - << " BodySize: " << BodySize - << " }"; -} - -/// TEvRemoveRecords -TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRemoveRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -/// TEvRecords -TEvChangeExchange::TEvRecords::TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvRecords::TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -/// TEvForgetRecords -TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvForgetRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} +namespace NKikimr::NDataShard { /// TEvAddSender TEvChangeExchange::TEvAddSender::TEvAddSender(const TTableId& userTableId, TEvChangeExchange::ESenderType type, const TPathId& pathId) @@ -151,5 +32,4 @@ TString TEvChangeExchange::TEvRemoveSender::ToString() const { << " }"; } -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_exchange.h b/ydb/core/tx/datashard/change_exchange.h index 226a5125f5..548fe14e31 100644 --- a/ydb/core/tx/datashard/change_exchange.h +++ b/ydb/core/tx/datashard/change_exchange.h @@ -2,14 +2,13 @@ #include "defs.h" -#include <ydb/core/change_exchange/change_record.h> #include <ydb/core/protos/change_exchange.pb.h> +#include <ydb/core/scheme/scheme_pathid.h> #include <ydb/core/scheme/scheme_tabledefs.h> #include <util/generic/vector.h> -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { class TDataShard; @@ -17,7 +16,7 @@ struct TEvChangeExchange { enum EEv { /// Network exchange protocol // Handshake between sender & receiver - EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE), + EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD), // Apply change record(s) on receiver EvApplyRecords, // Handshake & application status @@ -27,30 +26,17 @@ struct TEvChangeExchange { EvActivateSenderAck, /// Local exchange (mostly using change's id) - // Enqueue for sending - EvEnqueueRecords, - // Request change record(s) by id - EvRequestRecords, - // Change record(s) - EvRecords, - // Remove change record(s) from local database - EvRemoveRecords, - // Add new change sender EvAddSender, // Remove existing change sender EvRemoveSender, - - // Already removed records that the sender should forget about - EvForgetRecods, - // Split/merge EvSplitAck, EvEnd, }; - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE)); + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD)); /// Network events struct TEvHandshake: public TEventPB<TEvHandshake, NKikimrChangeExchange::TEvHandshake, EvHandshake> {}; @@ -60,66 +46,6 @@ struct TEvChangeExchange { struct TEvActivateSenderAck: public TEventPB<TEvActivateSenderAck, NKikimrChangeExchange::TEvActivateSenderAck, EvActivateSenderAck> {}; /// Local events - struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> { - struct TRecordInfo { - ui64 Order; - TPathId PathId; - ui64 BodySize; - - TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize); - - void Out(IOutputStream& out) const; - }; - - TVector<TRecordInfo> Records; - - explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records); - explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records); - TString ToString() const override; - }; - - struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> { - struct TRecordInfo { - ui64 Order; - ui64 BodySize; - - TRecordInfo(ui64 order, ui64 bodySize = 0); - - bool operator<(const TRecordInfo& rhs) const; - void Out(IOutputStream& out) const; - }; - - TVector<TRecordInfo> Records; - - explicit TEvRequestRecords(const TVector<TRecordInfo>& records); - explicit TEvRequestRecords(TVector<TRecordInfo>&& records); - TString ToString() const override; - }; - - struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> { - TVector<ui64> Records; - - explicit TEvRemoveRecords(const TVector<ui64>& records); - explicit TEvRemoveRecords(TVector<ui64>&& records); - TString ToString() const override; - }; - - struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> { - TVector<NChangeExchange::IChangeRecord::TPtr> Records; - - explicit TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records); - explicit TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records); - TString ToString() const override; - }; - - struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> { - TVector<ui64> Records; - - explicit TEvForgetRecords(const TVector<ui64>& records); - explicit TEvForgetRecords(TVector<ui64>&& records); - TString ToString() const override; - }; - enum class ESenderType { AsyncIndex, CdcStream, @@ -149,13 +75,4 @@ struct TEvChangeExchange { IActor* CreateChangeSender(const TDataShard* self); IActor* CreateChangeExchangeSplit(const TDataShard* self, const TVector<ui64>& dstDataShards); -} // NDataShard -} // NKikimr - -Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) { - return x.Out(o); -} - -Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) { - return x.Out(o); } diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index 825ffe9080..1f617d252e 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -3,12 +3,12 @@ #include "change_sender_monitoring.h" #include "datashard_impl.h" -#include <ydb/library/services/services.pb.h> - +#include <ydb/core/change_exchange/change_exchange.h> #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/log.h> #include <ydb/library/actors/core/mon.h> +#include <ydb/library/services/services.pb.h> #include <library/cpp/monlib/service/pages/mon_page.h> #include <library/cpp/monlib/service/pages/templates.h> @@ -19,7 +19,7 @@ namespace NKikimr::NDataShard { class TChangeSender: public TActor<TChangeSender> { using ESenderType = TEvChangeExchange::ESenderType; - using TEnqueuedRecord = TEvChangeExchange::TEvEnqueueRecords::TRecordInfo; + using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo; struct TSender { TTableId UserTableId; @@ -67,7 +67,7 @@ class TChangeSender: public TActor<TChangeSender> { sender.ActorId = RegisterChangeSender(pathId, sender.UserTableId, sender.Type); } - void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); auto& records = ev->Get()->Records; @@ -92,11 +92,11 @@ class TChangeSender: public TActor<TChangeSender> { } for (auto& [to, records] : forward) { - Send(to, new TEvChangeExchange::TEvEnqueueRecords(std::move(records))); + Send(to, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(records))); } if (remove) { - Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } } @@ -289,7 +289,7 @@ public: STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); hFunc(TEvChangeExchange::TEvAddSender, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 9ba3b7c037..d835601345 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -115,13 +115,13 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS STATEFN(StateWaitingRecords) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); default: return StateBase(ev); } } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); auto records = MakeHolder<TEvChangeExchange::TEvApplyRecords>(); @@ -738,17 +738,17 @@ class TAsyncIndexChangeSenderMain return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IndexTablePathId, TagMap); } - void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); EnqueueRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ProcessRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ForgetRecords(std::move(ev->Get()->Records)); } @@ -771,7 +771,7 @@ class TAsyncIndexChangeSenderMain PassAway(); } - void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); RemoveRecords(std::move(ev->Get()->Records)); } @@ -804,9 +804,9 @@ public: STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); - hFunc(TEvChangeExchange::TEvRecords, Handle); - hFunc(TEvChangeExchange::TEvForgetRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); hFunc(TEvChangeExchangePrivate::TEvReady, Handle); hFunc(TEvChangeExchangePrivate::TEvGone, Handle); @@ -817,7 +817,7 @@ public: STFUNC(StatePendingRemove) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 0d82b1eedf..60e107569e 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -83,14 +83,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti STATEFN(StateWaitingRecords) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); sFunc(TEvPartitionWriter::TEvWriteResponse, Lost); default: return StateBase(ev); } } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); NKikimrClient::TPersQueueRequest request; @@ -703,17 +703,17 @@ class TCdcChangeSenderMain return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId, Stream); } - void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); EnqueueRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ProcessRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ForgetRecords(std::move(ev->Get()->Records)); } @@ -736,7 +736,7 @@ class TCdcChangeSenderMain PassAway(); } - void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); RemoveRecords(std::move(ev->Get()->Records)); } @@ -768,9 +768,9 @@ public: STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); - hFunc(TEvChangeExchange::TEvRecords, Handle); - hFunc(TEvChangeExchange::TEvForgetRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); hFunc(TEvChangeExchangePrivate::TEvReady, Handle); hFunc(TEvChangeExchangePrivate::TEvGone, Handle); @@ -781,7 +781,7 @@ public: STFUNC(StatePendingRemove) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index d46d467daa..1c57d3c17a 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -82,7 +82,7 @@ void TBaseChangeSender::KillSenders() { } } -void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { +void TBaseChangeSender::EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { for (auto& record : records) { Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id" << ": expected# " << PathId @@ -117,7 +117,7 @@ bool TBaseChangeSender::RequestRecords() { return false; } - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRequestRecords(std::move(records))); + ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records))); return true; } @@ -290,7 +290,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) { } Y_ABORT_UNLESS(sender.ActorId); - ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); + ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); } void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 45351ee5af..6b418ebf7d 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -4,7 +4,7 @@ #include "change_exchange_helpers.h" #include <ydb/core/base/appdata.h> - +#include <ydb/core/change_exchange/change_exchange.h> #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/mon.h> @@ -61,7 +61,7 @@ public: virtual IActor* CreateSender(ui64 partitionId) = 0; virtual void RemoveRecords() = 0; - virtual void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0; + virtual void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0; virtual void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) = 0; virtual void ForgetRecords(TVector<ui64>&& records) = 0; virtual void OnReady(ui64 partitionId) = 0; @@ -79,8 +79,8 @@ public: }; class TBaseChangeSender: public IChangeSender { - using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; - using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; struct TSender { TActorId ActorId; @@ -124,19 +124,19 @@ protected: remove.push_back(record.Order); } - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } template <> void RemoveRecords(TVector<ui64>&& records) { - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(records))); + ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records))); } void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override; void KillSenders() override; void RemoveRecords() override; - void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override; + void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override; void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) override; void ForgetRecords(TVector<ui64>&& records) override; void OnReady(ui64 partitionId) override; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index c251597f7c..dcc2cec7ab 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -939,7 +939,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange << ", records: " << JoinSeq(", ", records)); const auto now = AppData()->TimeProvider->Now(); - TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size())); + TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size())); for (const auto& record : records) { forward.emplace_back(record.Order, record.PathId, record.BodySize); @@ -966,7 +966,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size()); Y_ABORT_UNLESS(OutChangeSender); - Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward))); + Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward))); } void TDataShard::UpdateChangeExchangeLag(TInstant now) { diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index a49a99c488..d70874dd64 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -233,7 +233,7 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Send " << records.size() << " change records" << ": to# " << to << ", at tablet# " << Self->TabletID()); - ctx.Send(to, new TEvChangeExchange::TEvRecords(std::move(records))); + ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvRecords(std::move(records))); } size_t forgotten = 0; @@ -246,7 +246,7 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Forget " << records.size() << " change records" << ": to# " << to << ", at tablet# " << Self->TabletID()); - ctx.Send(to, new TEvChangeExchange::TEvForgetRecords(std::move(records))); + ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvForgetRecords(std::move(records))); } size_t left = Accumulate(Self->ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) { @@ -408,7 +408,7 @@ private: }; // TTxChangeExchangeSplitAck /// Request -void TDataShard::Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) { +void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) { ChangeRecordsRequested[ev->Sender].insert(ev->Get()->Records.begin(), ev->Get()->Records.end()); SetCounter(COUNTER_CHANGE_QUEUE_SIZE, Accumulate(ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) { return sum + kv.second.size(); @@ -428,7 +428,7 @@ void TDataShard::Handle(TEvPrivate::TEvRequestChangeRecords::TPtr&, const TActor } /// Remove -void TDataShard::Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) { +void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) { ChangeRecordsToRemove.insert(ev->Get()->Records.begin(), ev->Get()->Records.end()); ScheduleRemoveChangeRecords(ctx); } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2d5ff95712..872410ac2c 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -33,6 +33,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/library/ydb_issue/issue_helpers.h> +#include <ydb/core/change_exchange/change_exchange.h> #include <ydb/core/engine/mkql_engine_flat_host.h> #include <ydb/core/tablet/pipe_tracker.h> #include <ydb/core/tablet/tablet_exception.h> @@ -1304,8 +1305,8 @@ class TDataShard void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx); // change sending - void Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx); - void Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx); + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx); + void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx); void ScheduleRequestChangeRecords(const TActorContext& ctx); void ScheduleRemoveChangeRecords(const TActorContext& ctx); void Handle(TEvPrivate::TEvRequestChangeRecords::TPtr& ev, const TActorContext& ctx); @@ -2729,7 +2730,7 @@ private: } }; - using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; // split/merge TChangeSenderActivator ChangeSenderActivator; @@ -2972,8 +2973,8 @@ protected: HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted); IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUnavailable); - HFunc(TEvChangeExchange::TEvRequestRecords, Handle); - HFunc(TEvChangeExchange::TEvRemoveRecords, Handle); + HFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + HFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle); HFunc(TEvPrivate::TEvRequestChangeRecords, Handle); HFunc(TEvPrivate::TEvRemoveChangeRecords, Handle); HFunc(TEvChangeExchange::TEvHandshake, Handle); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index e370a9b9a4..a744dbfeb5 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -218,20 +218,20 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: - for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) { enqueued.insert(record.Order); } break; - case TEvChangeExchange::EvRequestRecords: - for (const auto& record : ev->Get<TEvChangeExchange::TEvRequestRecords>()->Records) { + case NChangeExchange::TEvChangeExchange::EvRequestRecords: + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRequestRecords>()->Records) { requested.insert(record.Order); } break; - case TEvChangeExchange::EvRemoveRecords: - for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { + case NChangeExchange::TEvChangeExchange::EvRemoveRecords: + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) { removed.insert(record); } break; @@ -307,14 +307,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { return TTestActorRuntime::EEventAction::PROCESS; } - case TEvChangeExchange::EvEnqueueRecords: - for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) { enqueued.insert(record.Order); } break; - case TEvChangeExchange::EvRemoveRecords: - for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { + case NChangeExchange::TEvChangeExchange::EvRemoveRecords: + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) { removed.insert(record); } break; @@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { bool inited = false; runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: delayed.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -416,14 +416,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { THashSet<ui64> removed; runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: - for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) { enqueued.insert(record.Order); } break; - case TEvChangeExchange::EvRemoveRecords: - for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { + case NChangeExchange::TEvChangeExchange::EvRemoveRecords: + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) { removed.insert(record); } break; @@ -481,7 +481,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -1886,7 +1886,7 @@ Y_UNIT_TEST_SUITE(Cdc) { env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -2143,7 +2143,7 @@ Y_UNIT_TEST_SUITE(Cdc) { env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -2240,7 +2240,7 @@ Y_UNIT_TEST_SUITE(Cdc) { env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing || (preventEnqueueingOnSpecificSender && *preventEnqueueingOnSpecificSender == ev->Recipient)) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -2432,7 +2432,7 @@ Y_UNIT_TEST_SUITE(Cdc) { TVector<THolder<IEventHandle>> enqueued; auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; } @@ -2474,8 +2474,8 @@ Y_UNIT_TEST_SUITE(Cdc) { THashSet<ui64> enqueued; runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) { - for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) { + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) { enqueued.insert(record.Order); } @@ -2506,8 +2506,8 @@ Y_UNIT_TEST_SUITE(Cdc) { THashSet<ui64> removed; runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchange::EvRemoveRecords) { - for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvRemoveRecords) { + for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) { removed.insert(record); } } @@ -2743,7 +2743,7 @@ Y_UNIT_TEST_SUITE(Cdc) { } break; - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: delayed.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; diff --git a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp index 0d05b12d68..18bd1d0a45 100644 --- a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp +++ b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp @@ -1,6 +1,6 @@ #include <ydb/core/base/path.h> +#include <ydb/core/change_exchange/change_exchange.h> #include <ydb/core/scheme/scheme_tablecell.h> -#include <ydb/core/tx/datashard/change_exchange.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h> #include <ydb/core/testlib/tablet_helpers.h> @@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) { TVector<THolder<IEventHandle>> enqueued; runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; } |