diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2024-01-19 18:18:10 +0300 | 
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-19 18:18:10 +0300 | 
| commit | e7c25d2a09edbdcebda1e8d5248249d3a66dbf0e (patch) | |
| tree | 912cd357f29d620ad22e1f4326c4cbb62af21f25 | |
| parent | d6ad10e2a2bffcbd01fc40ab70ba47335feca9fe (diff) | |
(refactoring) Split TEvChangeExchange into two parts: common & DS KIKIMR-20673 (#1152)
| -rw-r--r-- | ydb/core/base/events.h | 3 | ||||
| -rw-r--r-- | ydb/core/change_exchange/change_exchange.cpp | 125 | ||||
| -rw-r--r-- | ydb/core/change_exchange/change_exchange.h | 101 | ||||
| -rw-r--r-- | ydb/core/change_exchange/ya.make | 6 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_exchange.cpp | 124 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_exchange.h | 91 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 20 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 20 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 14 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_change_sending.cpp | 8 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 11 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 52 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp | 4 | 
16 files changed, 317 insertions, 286 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index f9bf6ad31f6..97f89dc978a 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -135,7 +135,7 @@ struct TKikimrEvents : TEvents {          ES_HEALTH_CHECK,          ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212          ES_YQ, // 4213 -        ES_CHANGE_EXCHANGE, +        ES_CHANGE_EXCHANGE_DATASHARD,          ES_DATABASE_SERVICE, //4215          ES_SEQUENCESHARD, // 4216          ES_SEQUENCEPROXY, // 4217 @@ -172,6 +172,7 @@ struct TKikimrEvents : TEvents {          ES_PQ_PARTITION_CHOOSER,          ES_GRAPH,          ES_REPLICATION_SERVICE, +        ES_CHANGE_EXCHANGE,      };  }; diff --git a/ydb/core/change_exchange/change_exchange.cpp b/ydb/core/change_exchange/change_exchange.cpp new file mode 100644 index 00000000000..92df9b3cea7 --- /dev/null +++ b/ydb/core/change_exchange/change_exchange.cpp @@ -0,0 +1,125 @@ +#include "change_exchange.h" + +#include <util/string/builder.h> +#include <util/string/join.h> + +namespace NKikimr::NChangeExchange { + +/// TEvEnqueueRecords +TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records) +    : Records(records) +{ +} + +TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records) +    : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvEnqueueRecords::ToString() const { +    return TStringBuilder() << ToStringHeader() << " {" +        << " Records [" << JoinSeq(",", Records) << "]" +    << " }"; +} + +TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize) +    : Order(order) +    , PathId(pathId) +    , BodySize(bodySize) +{ +} + +void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const { +    out << "{" +        << " Order: " << Order +        << " PathId: " << PathId +        << " BodySize: " << BodySize +    << " }"; +} + +/// TEvRequestRecords +TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records) +    : Records(records) +{ +} + +TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records) +    : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRequestRecords::ToString() const { +    return TStringBuilder() << ToStringHeader() << " {" +        << " Records [" << JoinSeq(",", Records) << "]" +    << " }"; +} + +TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize) +    : Order(order) +    , BodySize(bodySize) +{ +} + +bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const { +    return Order < rhs.Order; +} + +void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const { +    out << "{" +        << " Order: " << Order +        << " BodySize: " << BodySize +    << " }"; +} + +/// TEvRemoveRecords +TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records) +    : Records(records) +{ +} + +TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records) +    : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRemoveRecords::ToString() const { +    return TStringBuilder() << ToStringHeader() << " {" +        << " Records [" << JoinSeq(",", Records) << "]" +    << " }"; +} + +/// TEvRecords +TEvChangeExchange::TEvRecords::TEvRecords(const TVector<IChangeRecord::TPtr>& records) +    : Records(records) +{ +} + +TEvChangeExchange::TEvRecords::TEvRecords(TVector<IChangeRecord::TPtr>&& records) +    : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRecords::ToString() const { +    return TStringBuilder() << ToStringHeader() << " {" +        << " Records [" << JoinSeq(",", Records) << "]" +    << " }"; +} + +/// TEvForgetRecords +TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records) +    : Records(records) +{ +} + +TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records) +    : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvForgetRecords::ToString() const { +    return TStringBuilder() << ToStringHeader() << " {" +        << " Records [" << JoinSeq(",", Records) << "]" +    << " }"; +} + +} diff --git a/ydb/core/change_exchange/change_exchange.h b/ydb/core/change_exchange/change_exchange.h new file mode 100644 index 00000000000..9cd000b4c23 --- /dev/null +++ b/ydb/core/change_exchange/change_exchange.h @@ -0,0 +1,101 @@ +#pragma once + +#include "change_record.h" + +#include <ydb/core/base/defs.h> +#include <ydb/core/base/events.h> +#include <ydb/core/scheme/scheme_pathid.h> + +#include <util/generic/vector.h> + +namespace NKikimr::NChangeExchange { + +struct TEvChangeExchange { +    enum EEv { +        // Enqueue for sending +        EvEnqueueRecords = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE), +        // Request change record(s) by id +        EvRequestRecords, +        // Change record(s) +        EvRecords, +        // Remove change record(s) from local database +        EvRemoveRecords, +        // Already removed records that the sender should forget about +        EvForgetRecods, + +        EvEnd, +    }; + +    static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE)); + +    struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> { +        struct TRecordInfo { +            ui64 Order; +            TPathId PathId; +            ui64 BodySize; + +            TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize); + +            void Out(IOutputStream& out) const; +        }; + +        TVector<TRecordInfo> Records; + +        explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records); +        explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records); +        TString ToString() const override; +    }; + +    struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> { +        struct TRecordInfo { +            ui64 Order; +            ui64 BodySize; + +            TRecordInfo(ui64 order, ui64 bodySize = 0); + +            bool operator<(const TRecordInfo& rhs) const; +            void Out(IOutputStream& out) const; +        }; + +        TVector<TRecordInfo> Records; + +        explicit TEvRequestRecords(const TVector<TRecordInfo>& records); +        explicit TEvRequestRecords(TVector<TRecordInfo>&& records); +        TString ToString() const override; +    }; + +    struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> { +        TVector<ui64> Records; + +        explicit TEvRemoveRecords(const TVector<ui64>& records); +        explicit TEvRemoveRecords(TVector<ui64>&& records); +        TString ToString() const override; +    }; + +    struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> { +        TVector<IChangeRecord::TPtr> Records; + +        explicit TEvRecords(const TVector<IChangeRecord::TPtr>& records); +        explicit TEvRecords(TVector<IChangeRecord::TPtr>&& records); +        TString ToString() const override; +    }; + +    struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> { +        TVector<ui64> Records; + +        explicit TEvForgetRecords(const TVector<ui64>& records); +        explicit TEvForgetRecords(TVector<ui64>&& records); +        TString ToString() const override; +    }; + +}; // TEvChangeExchange + +} + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) { +    return x.Out(o); +} + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) { +    return x.Out(o); +} diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make index 1ba88203fef..a00055ba545 100644 --- a/ydb/core/change_exchange/ya.make +++ b/ydb/core/change_exchange/ya.make @@ -1,11 +1,17 @@  LIBRARY()  SRCS( +    change_exchange.cpp      change_record.cpp  )  GENERATE_ENUM_SERIALIZATION(change_record.h) +PEERDIR( +    ydb/core/base +    ydb/core/scheme +) +  YQL_LAST_ABI_VERSION()  END() diff --git a/ydb/core/tx/datashard/change_exchange.cpp b/ydb/core/tx/datashard/change_exchange.cpp index 99c7f6cec7c..752667384f3 100644 --- a/ydb/core/tx/datashard/change_exchange.cpp +++ b/ydb/core/tx/datashard/change_exchange.cpp @@ -1,127 +1,8 @@  #include "change_exchange.h"  #include <util/string/builder.h> -#include <util/string/join.h> -namespace NKikimr { -namespace NDataShard { - -/// TEvEnqueueRecords -TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records) -    : Records(records) -{ -} - -TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records) -    : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvEnqueueRecords::ToString() const { -    return TStringBuilder() << ToStringHeader() << " {" -        << " Records [" << JoinSeq(",", Records) << "]" -    << " }"; -} - -TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize) -    : Order(order) -    , PathId(pathId) -    , BodySize(bodySize) -{ -} - -void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const { -    out << "{" -        << " Order: " << Order -        << " PathId: " << PathId -        << " BodySize: " << BodySize -    << " }"; -} - -/// TEvRequestRecords -TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records) -    : Records(records) -{ -} - -TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records) -    : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRequestRecords::ToString() const { -    return TStringBuilder() << ToStringHeader() << " {" -        << " Records [" << JoinSeq(",", Records) << "]" -    << " }"; -} - -TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize) -    : Order(order) -    , BodySize(bodySize) -{ -} - -bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const { -    return Order < rhs.Order; -} - -void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const { -    out << "{" -        << " Order: " << Order -        << " BodySize: " << BodySize -    << " }"; -} - -/// TEvRemoveRecords -TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records) -    : Records(records) -{ -} - -TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records) -    : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRemoveRecords::ToString() const { -    return TStringBuilder() << ToStringHeader() << " {" -        << " Records [" << JoinSeq(",", Records) << "]" -    << " }"; -} - -/// TEvRecords -TEvChangeExchange::TEvRecords::TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records) -    : Records(records) -{ -} - -TEvChangeExchange::TEvRecords::TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) -    : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRecords::ToString() const { -    return TStringBuilder() << ToStringHeader() << " {" -        << " Records [" << JoinSeq(",", Records) << "]" -    << " }"; -} - -/// TEvForgetRecords -TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records) -    : Records(records) -{ -} - -TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records) -    : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvForgetRecords::ToString() const { -    return TStringBuilder() << ToStringHeader() << " {" -        << " Records [" << JoinSeq(",", Records) << "]" -    << " }"; -} +namespace NKikimr::NDataShard {  /// TEvAddSender  TEvChangeExchange::TEvAddSender::TEvAddSender(const TTableId& userTableId, TEvChangeExchange::ESenderType type, const TPathId& pathId) @@ -151,5 +32,4 @@ TString TEvChangeExchange::TEvRemoveSender::ToString() const {      << " }";  } -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_exchange.h b/ydb/core/tx/datashard/change_exchange.h index 226a5125f5d..548fe14e316 100644 --- a/ydb/core/tx/datashard/change_exchange.h +++ b/ydb/core/tx/datashard/change_exchange.h @@ -2,14 +2,13 @@  #include "defs.h" -#include <ydb/core/change_exchange/change_record.h>  #include <ydb/core/protos/change_exchange.pb.h> +#include <ydb/core/scheme/scheme_pathid.h>  #include <ydb/core/scheme/scheme_tabledefs.h>  #include <util/generic/vector.h> -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard {  class TDataShard; @@ -17,7 +16,7 @@ struct TEvChangeExchange {      enum EEv {          /// Network exchange protocol          // Handshake between sender & receiver -        EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE), +        EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD),          // Apply change record(s) on receiver          EvApplyRecords,          // Handshake & application status @@ -27,30 +26,17 @@ struct TEvChangeExchange {          EvActivateSenderAck,          /// Local exchange (mostly using change's id) -        // Enqueue for sending -        EvEnqueueRecords, -        // Request change record(s) by id -        EvRequestRecords, -        // Change record(s) -        EvRecords, -        // Remove change record(s) from local database -        EvRemoveRecords, -          // Add new change sender          EvAddSender,          // Remove existing change sender          EvRemoveSender, - -        // Already removed records that the sender should forget about -        EvForgetRecods, -          // Split/merge          EvSplitAck,          EvEnd,      }; -    static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE)); +    static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD));      /// Network events      struct TEvHandshake: public TEventPB<TEvHandshake, NKikimrChangeExchange::TEvHandshake, EvHandshake> {}; @@ -60,66 +46,6 @@ struct TEvChangeExchange {      struct TEvActivateSenderAck: public TEventPB<TEvActivateSenderAck, NKikimrChangeExchange::TEvActivateSenderAck, EvActivateSenderAck> {};      /// Local events -    struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> { -        struct TRecordInfo { -            ui64 Order; -            TPathId PathId; -            ui64 BodySize; - -            TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize); - -            void Out(IOutputStream& out) const; -        }; - -        TVector<TRecordInfo> Records; - -        explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records); -        explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records); -        TString ToString() const override; -    }; - -    struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> { -        struct TRecordInfo { -            ui64 Order; -            ui64 BodySize; - -            TRecordInfo(ui64 order, ui64 bodySize = 0); - -            bool operator<(const TRecordInfo& rhs) const; -            void Out(IOutputStream& out) const; -        }; - -        TVector<TRecordInfo> Records; - -        explicit TEvRequestRecords(const TVector<TRecordInfo>& records); -        explicit TEvRequestRecords(TVector<TRecordInfo>&& records); -        TString ToString() const override; -    }; - -    struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> { -        TVector<ui64> Records; - -        explicit TEvRemoveRecords(const TVector<ui64>& records); -        explicit TEvRemoveRecords(TVector<ui64>&& records); -        TString ToString() const override; -    }; - -    struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> { -        TVector<NChangeExchange::IChangeRecord::TPtr> Records; - -        explicit TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records); -        explicit TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records); -        TString ToString() const override; -    }; - -    struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> { -        TVector<ui64> Records; - -        explicit TEvForgetRecords(const TVector<ui64>& records); -        explicit TEvForgetRecords(TVector<ui64>&& records); -        TString ToString() const override; -    }; -      enum class ESenderType {          AsyncIndex,          CdcStream, @@ -149,13 +75,4 @@ struct TEvChangeExchange {  IActor* CreateChangeSender(const TDataShard* self);  IActor* CreateChangeExchangeSplit(const TDataShard* self, const TVector<ui64>& dstDataShards); -} // NDataShard -} // NKikimr - -Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) { -    return x.Out(o); -} - -Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) { -    return x.Out(o);  } diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index 825ffe90806..1f617d252e4 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -3,12 +3,12 @@  #include "change_sender_monitoring.h"  #include "datashard_impl.h" -#include <ydb/library/services/services.pb.h> - +#include <ydb/core/change_exchange/change_exchange.h>  #include <ydb/library/actors/core/actor.h>  #include <ydb/library/actors/core/hfunc.h>  #include <ydb/library/actors/core/log.h>  #include <ydb/library/actors/core/mon.h> +#include <ydb/library/services/services.pb.h>  #include <library/cpp/monlib/service/pages/mon_page.h>  #include <library/cpp/monlib/service/pages/templates.h> @@ -19,7 +19,7 @@ namespace NKikimr::NDataShard {  class TChangeSender: public TActor<TChangeSender> {      using ESenderType = TEvChangeExchange::ESenderType; -    using TEnqueuedRecord = TEvChangeExchange::TEvEnqueueRecords::TRecordInfo; +    using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo;      struct TSender {          TTableId UserTableId; @@ -67,7 +67,7 @@ class TChangeSender: public TActor<TChangeSender> {          sender.ActorId = RegisterChangeSender(pathId, sender.UserTableId, sender.Type);      } -    void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          auto& records = ev->Get()->Records; @@ -92,11 +92,11 @@ class TChangeSender: public TActor<TChangeSender> {          }          for (auto& [to, records] : forward) { -            Send(to, new TEvChangeExchange::TEvEnqueueRecords(std::move(records))); +            Send(to, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(records)));          }          if (remove) { -            Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); +            Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove)));          }      } @@ -289,7 +289,7 @@ public:      STFUNC(StateBase) {          switch (ev->GetTypeRewrite()) { -            hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle);              hFunc(TEvChangeExchange::TEvAddSender, Handle);              hFunc(TEvChangeExchange::TEvRemoveSender, Handle);              HFunc(NMon::TEvRemoteHttpInfo, Handle); diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 9ba3b7c0378..d8356013456 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -115,13 +115,13 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS      STATEFN(StateWaitingRecords) {          switch (ev->GetTypeRewrite()) { -            hFunc(TEvChangeExchange::TEvRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);          default:              return StateBase(ev);          }      } -    void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          auto records = MakeHolder<TEvChangeExchange::TEvApplyRecords>(); @@ -738,17 +738,17 @@ class TAsyncIndexChangeSenderMain          return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IndexTablePathId, TagMap);      } -    void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          EnqueueRecords(std::move(ev->Get()->Records));      } -    void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          ProcessRecords(std::move(ev->Get()->Records));      } -    void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          ForgetRecords(std::move(ev->Get()->Records));      } @@ -771,7 +771,7 @@ class TAsyncIndexChangeSenderMain          PassAway();      } -    void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { +    void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          RemoveRecords(std::move(ev->Get()->Records));      } @@ -804,9 +804,9 @@ public:      STFUNC(StateBase) {          switch (ev->GetTypeRewrite()) { -            hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); -            hFunc(TEvChangeExchange::TEvRecords, Handle); -            hFunc(TEvChangeExchange::TEvForgetRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle);              hFunc(TEvChangeExchange::TEvRemoveSender, Handle);              hFunc(TEvChangeExchangePrivate::TEvReady, Handle);              hFunc(TEvChangeExchangePrivate::TEvGone, Handle); @@ -817,7 +817,7 @@ public:      STFUNC(StatePendingRemove) {          switch (ev->GetTypeRewrite()) { -            hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); +            hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove);              hFunc(TEvChangeExchange::TEvRemoveSender, Handle);              HFunc(NMon::TEvRemoteHttpInfo, Handle);              sFunc(TEvents::TEvPoison, PassAway); diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 0d82b1eedf0..60e107569e6 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -83,14 +83,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti      STATEFN(StateWaitingRecords) {          switch (ev->GetTypeRewrite()) { -            hFunc(TEvChangeExchange::TEvRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);              sFunc(TEvPartitionWriter::TEvWriteResponse, Lost);          default:              return StateBase(ev);          }      } -    void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          NKikimrClient::TPersQueueRequest request; @@ -703,17 +703,17 @@ class TCdcChangeSenderMain          return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId, Stream);      } -    void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          EnqueueRecords(std::move(ev->Get()->Records));      } -    void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          ProcessRecords(std::move(ev->Get()->Records));      } -    void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) { +    void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          ForgetRecords(std::move(ev->Get()->Records));      } @@ -736,7 +736,7 @@ class TCdcChangeSenderMain          PassAway();      } -    void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { +    void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {          LOG_D("Handle " << ev->Get()->ToString());          RemoveRecords(std::move(ev->Get()->Records));      } @@ -768,9 +768,9 @@ public:      STFUNC(StateBase) {          switch (ev->GetTypeRewrite()) { -            hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); -            hFunc(TEvChangeExchange::TEvRecords, Handle); -            hFunc(TEvChangeExchange::TEvForgetRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); +            hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle);              hFunc(TEvChangeExchange::TEvRemoveSender, Handle);              hFunc(TEvChangeExchangePrivate::TEvReady, Handle);              hFunc(TEvChangeExchangePrivate::TEvGone, Handle); @@ -781,7 +781,7 @@ public:      STFUNC(StatePendingRemove) {          switch (ev->GetTypeRewrite()) { -            hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); +            hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove);              hFunc(TEvChangeExchange::TEvRemoveSender, Handle);              HFunc(NMon::TEvRemoteHttpInfo, Handle);              sFunc(TEvents::TEvPoison, PassAway); diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index d46d467daaa..1c57d3c17a7 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -82,7 +82,7 @@ void TBaseChangeSender::KillSenders() {      }  } -void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { +void TBaseChangeSender::EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {      for (auto& record : records) {          Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"              << ": expected# " << PathId @@ -117,7 +117,7 @@ bool TBaseChangeSender::RequestRecords() {          return false;      } -    ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRequestRecords(std::move(records))); +    ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records)));      return true;  } @@ -290,7 +290,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {      }      Y_ABORT_UNLESS(sender.ActorId); -    ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); +    ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));  }  void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 45351ee5af3..6b418ebf7d8 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -4,7 +4,7 @@  #include "change_exchange_helpers.h"  #include <ydb/core/base/appdata.h> - +#include <ydb/core/change_exchange/change_exchange.h>  #include <ydb/library/actors/core/actor.h>  #include <ydb/library/actors/core/hfunc.h>  #include <ydb/library/actors/core/mon.h> @@ -61,7 +61,7 @@ public:      virtual IActor* CreateSender(ui64 partitionId) = 0;      virtual void RemoveRecords() = 0; -    virtual void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0; +    virtual void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;      virtual void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) = 0;      virtual void ForgetRecords(TVector<ui64>&& records) = 0;      virtual void OnReady(ui64 partitionId) = 0; @@ -79,8 +79,8 @@ public:  };  class TBaseChangeSender: public IChangeSender { -    using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; -    using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; +    using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; +    using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;      struct TSender {          TActorId ActorId; @@ -124,19 +124,19 @@ protected:              remove.push_back(record.Order);          } -        ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); +        ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove)));      }      template <>      void RemoveRecords(TVector<ui64>&& records) { -        ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(records))); +        ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records)));      }      void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;      void KillSenders() override;      void RemoveRecords() override; -    void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override; +    void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;      void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) override;      void ForgetRecords(TVector<ui64>&& records) override;      void OnReady(ui64 partitionId) override; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index c251597f7ce..dcc2cec7abb 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -939,7 +939,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange          << ", records: " << JoinSeq(", ", records));      const auto now = AppData()->TimeProvider->Now(); -    TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size())); +    TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));      for (const auto& record : records) {          forward.emplace_back(record.Order, record.PathId, record.BodySize); @@ -966,7 +966,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange      SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());      Y_ABORT_UNLESS(OutChangeSender); -    Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward))); +    Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));  }  void TDataShard::UpdateChangeExchangeLag(TInstant now) { diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index a49a99c488e..d70874dd64a 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -233,7 +233,7 @@ public:              LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Send " << records.size() << " change records"                  << ": to# " << to                  << ", at tablet# " << Self->TabletID()); -            ctx.Send(to, new TEvChangeExchange::TEvRecords(std::move(records))); +            ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvRecords(std::move(records)));          }          size_t forgotten = 0; @@ -246,7 +246,7 @@ public:              LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Forget " << records.size() << " change records"                  << ": to# " << to                  << ", at tablet# " << Self->TabletID()); -            ctx.Send(to, new TEvChangeExchange::TEvForgetRecords(std::move(records))); +            ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvForgetRecords(std::move(records)));          }          size_t left = Accumulate(Self->ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) { @@ -408,7 +408,7 @@ private:  }; // TTxChangeExchangeSplitAck  /// Request -void TDataShard::Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) { +void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) {      ChangeRecordsRequested[ev->Sender].insert(ev->Get()->Records.begin(), ev->Get()->Records.end());      SetCounter(COUNTER_CHANGE_QUEUE_SIZE, Accumulate(ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) {          return sum + kv.second.size(); @@ -428,7 +428,7 @@ void TDataShard::Handle(TEvPrivate::TEvRequestChangeRecords::TPtr&, const TActor  }  /// Remove -void TDataShard::Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) { +void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) {      ChangeRecordsToRemove.insert(ev->Get()->Records.begin(), ev->Get()->Records.end());      ScheduleRemoveChangeRecords(ctx);  } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2d5ff957122..872410ac2c1 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -33,6 +33,7 @@  #include <ydb/core/base/appdata.h>  #include <ydb/core/base/tablet_pipe.h>  #include <ydb/library/ydb_issue/issue_helpers.h> +#include <ydb/core/change_exchange/change_exchange.h>  #include <ydb/core/engine/mkql_engine_flat_host.h>  #include <ydb/core/tablet/pipe_tracker.h>  #include <ydb/core/tablet/tablet_exception.h> @@ -1304,8 +1305,8 @@ class TDataShard      void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);      // change sending -    void Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx); -    void Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx); +    void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx); +    void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx);      void ScheduleRequestChangeRecords(const TActorContext& ctx);      void ScheduleRemoveChangeRecords(const TActorContext& ctx);      void Handle(TEvPrivate::TEvRequestChangeRecords::TPtr& ev, const TActorContext& ctx); @@ -2729,7 +2730,7 @@ private:          }      }; -    using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; +    using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;      // split/merge      TChangeSenderActivator ChangeSenderActivator; @@ -2972,8 +2973,8 @@ protected:              HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);              IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted);              IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUnavailable); -            HFunc(TEvChangeExchange::TEvRequestRecords, Handle); -            HFunc(TEvChangeExchange::TEvRemoveRecords, Handle); +            HFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); +            HFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle);              HFunc(TEvPrivate::TEvRequestChangeRecords, Handle);              HFunc(TEvPrivate::TEvRemoveChangeRecords, Handle);              HFunc(TEvChangeExchange::TEvHandshake, Handle); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index e370a9b9a4d..a744dbfeb5f 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -218,20 +218,20 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {          runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: -                for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {                      enqueued.insert(record.Order);                  }                  break; -            case TEvChangeExchange::EvRequestRecords: -                for (const auto& record : ev->Get<TEvChangeExchange::TEvRequestRecords>()->Records) { +            case NChangeExchange::TEvChangeExchange::EvRequestRecords: +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRequestRecords>()->Records) {                      requested.insert(record.Order);                  }                  break; -            case TEvChangeExchange::EvRemoveRecords: -                for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { +            case NChangeExchange::TEvChangeExchange::EvRemoveRecords: +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {                      removed.insert(record);                  }                  break; @@ -307,14 +307,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {                      return TTestActorRuntime::EEventAction::PROCESS;                  } -            case TEvChangeExchange::EvEnqueueRecords: -                for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {                      enqueued.insert(record.Order);                  }                  break; -            case TEvChangeExchange::EvRemoveRecords: -                for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { +            case NChangeExchange::TEvChangeExchange::EvRemoveRecords: +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {                      removed.insert(record);                  }                  break; @@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {          bool inited = false;          runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:                  delayed.emplace_back(ev.Release());                  return TTestActorRuntime::EEventAction::DROP; @@ -416,14 +416,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {          THashSet<ui64> removed;          runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: -                for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {                      enqueued.insert(record.Order);                  }                  break; -            case TEvChangeExchange::EvRemoveRecords: -                for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { +            case NChangeExchange::TEvChangeExchange::EvRemoveRecords: +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {                      removed.insert(record);                  }                  break; @@ -481,7 +481,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {          runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:                  if (preventEnqueueing) {                      enqueued.emplace_back(ev.Release());                      return TTestActorRuntime::EEventAction::DROP; @@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {          runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:                  if (preventEnqueueing) {                      enqueued.emplace_back(ev.Release());                      return TTestActorRuntime::EEventAction::DROP; @@ -1886,7 +1886,7 @@ Y_UNIT_TEST_SUITE(Cdc) {          env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:                  if (preventEnqueueing) {                      enqueued.emplace_back(ev.Release());                      return TTestActorRuntime::EEventAction::DROP; @@ -2143,7 +2143,7 @@ Y_UNIT_TEST_SUITE(Cdc) {          env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:                  if (preventEnqueueing) {                      enqueued.emplace_back(ev.Release());                      return TTestActorRuntime::EEventAction::DROP; @@ -2240,7 +2240,7 @@ Y_UNIT_TEST_SUITE(Cdc) {          env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {              switch (ev->GetTypeRewrite()) { -            case TEvChangeExchange::EvEnqueueRecords: +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:                  if (preventEnqueueing || (preventEnqueueingOnSpecificSender && *preventEnqueueingOnSpecificSender == ev->Recipient)) {                      enqueued.emplace_back(ev.Release());                      return TTestActorRuntime::EEventAction::DROP; @@ -2432,7 +2432,7 @@ Y_UNIT_TEST_SUITE(Cdc) {          TVector<THolder<IEventHandle>> enqueued;          auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { -            if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) { +            if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) {                  enqueued.emplace_back(ev.Release());                  return TTestActorRuntime::EEventAction::DROP;              } @@ -2474,8 +2474,8 @@ Y_UNIT_TEST_SUITE(Cdc) {          THashSet<ui64> enqueued;          runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { -            if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) { -                for (const auto& record : ev->Get<TEvChangeExchange::TEvEnqueueRecords>()->Records) { +            if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) { +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords>()->Records) {                      enqueued.insert(record.Order);                  } @@ -2506,8 +2506,8 @@ Y_UNIT_TEST_SUITE(Cdc) {          THashSet<ui64> removed;          runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { -            if (ev->GetTypeRewrite() == TEvChangeExchange::EvRemoveRecords) { -                for (const auto& record : ev->Get<TEvChangeExchange::TEvRemoveRecords>()->Records) { +            if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvRemoveRecords) { +                for (const auto& record : ev->Get<NChangeExchange::TEvChangeExchange::TEvRemoveRecords>()->Records) {                      removed.insert(record);                  }              } @@ -2743,7 +2743,7 @@ Y_UNIT_TEST_SUITE(Cdc) {                  }                  break; -            case TEvChangeExchange::EvEnqueueRecords: +            case NChangeExchange::TEvChangeExchange::EvEnqueueRecords:                  delayed.emplace_back(ev.Release());                  return TTestActorRuntime::EEventAction::DROP; diff --git a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp index 0d05b12d68c..18bd1d0a45d 100644 --- a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp +++ b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp @@ -1,6 +1,6 @@  #include <ydb/core/base/path.h> +#include <ydb/core/change_exchange/change_exchange.h>  #include <ydb/core/scheme/scheme_tablecell.h> -#include <ydb/core/tx/datashard/change_exchange.h>  #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>  #include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>  #include <ydb/core/testlib/tablet_helpers.h> @@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) {              TVector<THolder<IEventHandle>> enqueued;              runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { -                if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) { +                if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) {                      enqueued.emplace_back(ev.Release());                      return TTestActorRuntime::EEventAction::DROP;                  }  | 
