diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-02-13 19:50:39 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-02-13 19:50:39 +0300 |
commit | d25d2081f180c3ccd714acfc88b87341559d38e5 (patch) | |
tree | ce43ca2d4f3704718c8c4233437f7738b147e654 | |
parent | 243a672a56f96b4fb637f3fe1f749e7b8c576530 (diff) | |
download | ydb-d25d2081f180c3ccd714acfc88b87341559d38e5.tar.gz |
Drop replication
30 files changed, 861 insertions, 56 deletions
diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto index b3c86ad17e5..e115e72867c 100644 --- a/ydb/core/protos/counters_replication.proto +++ b/ydb/core/protos/counters_replication.proto @@ -23,12 +23,14 @@ enum EPercentileCounters { } enum ETxTypes { - TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}]; - TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}]; - TXTYPE_CREATE_REPLICATION = 2 [(TxTypeOpts) = {Name: "TxCreateReplication"}]; - TXTYPE_DROP_REPLICATION = 3 [(TxTypeOpts) = {Name: "TxDropReplication"}]; - TXTYPE_DISCOVERY_RESULT = 4 [(TxTypeOpts) = {Name: "TxDiscoveryResult"}]; - TXTYPE_ASSIGN_STREAM_NAME = 5 [(TxTypeOpts) = {Name: "TxAssignStreamName"}]; + TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}]; + TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}]; + TXTYPE_CREATE_REPLICATION = 2 [(TxTypeOpts) = {Name: "TxCreateReplication"}]; + TXTYPE_DROP_REPLICATION = 3 [(TxTypeOpts) = {Name: "TxDropReplication"}]; + TXTYPE_DISCOVERY_RESULT = 4 [(TxTypeOpts) = {Name: "TxDiscoveryResult"}]; + TXTYPE_ASSIGN_STREAM_NAME = 5 [(TxTypeOpts) = {Name: "TxAssignStreamName"}]; TXTYPE_CREATE_STREAM_RESULT = 6 [(TxTypeOpts) = {Name: "TxCreateStreamResult"}]; - TXTYPE_CREATE_DST_RESULT = 7 [(TxTypeOpts) = {Name: "TxCreateDstResult"}]; + TXTYPE_CREATE_DST_RESULT = 7 [(TxTypeOpts) = {Name: "TxCreateDstResult"}]; + TXTYPE_DROP_STREAM_RESULT = 8 [(TxTypeOpts) = {Name: "TxDropStreamResult"}]; + TXTYPE_DROP_DST_RESULT = 9 [(TxTypeOpts) = {Name: "TxDropDstResult"}]; } diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 837a9d4e6d6..f868ce8bdc6 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -964,5 +964,7 @@ message TActivity { SCHEMESHARD_BORROWED_COMPACTION = 601; CDC_STREAM_SCAN_ACTOR = 602; SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603; + REPLICATION_CONTROLLER_STREAM_REMOVER = 604; + REPLICATION_CONTROLLER_DST_REMOVER = 605; }; }; diff --git a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt index b5e406fb9f2..eb802ebe22c 100644 --- a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt +++ b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt @@ -27,10 +27,12 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/discoverer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/sys_params.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp @@ -40,7 +42,9 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init_schema.cpp ) diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt index 49eef8a42b8..cd653db9c3e 100644 --- a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt @@ -28,10 +28,12 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/discoverer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/sys_params.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp @@ -41,7 +43,9 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init_schema.cpp ) diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux.txt b/ydb/core/tx/replication/controller/CMakeLists.linux.txt index 49eef8a42b8..cd653db9c3e 100644 --- a/ydb/core/tx/replication/controller/CMakeLists.linux.txt +++ b/ydb/core/tx/replication/controller/CMakeLists.linux.txt @@ -28,10 +28,12 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/discoverer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/sys_params.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp @@ -41,7 +43,9 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init_schema.cpp ) diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 89fa8c4b85d..63e1ef07a1c 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -49,10 +49,13 @@ STFUNC(TController::StateWork) { switch (ev->GetTypeRewrite()) { HFunc(TEvController::TEvCreateReplication, Handle); HFunc(TEvController::TEvDropReplication, Handle); + HFunc(TEvPrivate::TEvDropReplication, Handle); HFunc(TEvPrivate::TEvDiscoveryResult, Handle); HFunc(TEvPrivate::TEvAssignStreamName, Handle); HFunc(TEvPrivate::TEvCreateStreamResult, Handle); + HFunc(TEvPrivate::TEvDropStreamResult, Handle); HFunc(TEvPrivate::TEvCreateDstResult, Handle); + HFunc(TEvPrivate::TEvDropDstResult, Handle); HFunc(TEvents::TEvPoison, Handle); } } @@ -84,6 +87,11 @@ void TController::Handle(TEvController::TEvDropReplication::TPtr& ev, const TAct RunTxDropReplication(ev, ctx); } +void TController::Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + RunTxDropReplication(ev, ctx); +} + void TController::Handle(TEvPrivate::TEvDiscoveryResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); RunTxDiscoveryResult(ev, ctx); @@ -99,11 +107,21 @@ void TController::Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TAct RunTxCreateStreamResult(ev, ctx); } +void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + RunTxDropStreamResult(ev, ctx); +} + void TController::Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); RunTxCreateDstResult(ev, ctx); } +void TController::Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + RunTxDropDstResult(ev, ctx); +} + void TController::Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); @@ -133,6 +151,16 @@ TReplication::TPtr TController::Find(const TPathId& pathId) { return it->second; } +void TController::Remove(ui64 id) { + auto it = Replications.find(id); + if (it == Replications.end()) { + return; + } + + ReplicationsByPathId.erase(it->second->GetPathId()); + Replications.erase(it); +} + } // NController IActor* CreateController(const TActorId& tablet, TTabletStorageInfo* info) { diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index f09fdc9e88a..69defe2fdf3 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -62,10 +62,13 @@ private: // handlers void Handle(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx); void Handle(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvDiscoveryResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx); // local transactions @@ -76,17 +79,22 @@ private: class TTxDiscoveryResult; class TTxAssignStreamName; class TTxCreateStreamResult; + class TTxDropStreamResult; class TTxCreateDstResult; + class TTxDropDstResult; // tx runners void RunTxInitSchema(const TActorContext& ctx); void RunTxInit(const TActorContext& ctx); void RunTxCreateReplication(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx); void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx); + void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void RunTxDiscoveryResult(TEvPrivate::TEvDiscoveryResult::TPtr& ev, const TActorContext& ctx); void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx); void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx); + void RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx); void RunTxCreateDstResult(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx); + void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx); // other template <typename T> @@ -106,6 +114,7 @@ private: TReplication::TPtr Find(ui64 id); TReplication::TPtr Find(const TPathId& pathId); + void Remove(ui64 id); private: const TTabletLogPrefix LogPrefix; diff --git a/ydb/core/tx/replication/controller/dst_remover.cpp b/ydb/core/tx/replication/controller/dst_remover.cpp new file mode 100644 index 00000000000..0d4bf9d5eb2 --- /dev/null +++ b/ydb/core/tx/replication/controller/dst_remover.cpp @@ -0,0 +1,185 @@ +#include "dst_remover.h" +#include "logging.h" +#include "private_events.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> + +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tx_proxy/proxy.h> + +namespace NKikimr::NReplication::NController { + +using namespace NSchemeShard; + +class TDstRemover: public TActorBootstrapped<TDstRemover> { + void AllocateTxId() { + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); + Become(&TThis::StateAllocateTxId); + } + + STATEFN(StateAllocateTxId) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle); + default: + return StateBase(ev, TlsActivationContext->AsActorContext()); + } + } + + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + TxId = ev->Get()->TxId; + PipeCache = ev->Get()->Services.LeaderPipeCache; + DropDst(); + } + + void DropDst() { + auto ev = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId); + auto& tx = *ev->Record.AddTransaction(); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable); + tx.MutableDrop()->SetId(DstPathId.LocalPathId); + + Send(PipeCache, new TEvPipeCache::TEvForward(ev.Release(), SchemeShardId, true)); + Become(&TThis::StateDropDst); + } + + STATEFN(StateDropDst) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); + hFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, Handle); + sFunc(TEvents::TEvWakeup, AllocateTxId); + default: + return StateBase(ev, TlsActivationContext->AsActorContext()); + } + } + + void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + const auto& record = ev->Get()->Record; + + switch (record.GetStatus()) { + case NKikimrScheme::StatusAccepted: + Y_VERIFY_DEBUG(TxId == record.GetTxId()); + return SubscribeTx(record.GetTxId()); + case NKikimrScheme::StatusMultipleModifications: + if (record.HasPathDropTxId()) { + return SubscribeTx(record.GetPathDropTxId()); + } else { + return Error(record.GetStatus(), record.GetReason()); + } + break; + case NKikimrScheme::StatusPathDoesNotExist: + return Success(); + default: + return Error(record.GetStatus(), record.GetReason()); + } + } + + void SubscribeTx(ui64 txId) { + LOG_D("Subscribe tx" + << ": txId# " << txId); + Send(PipeCache, new TEvPipeCache::TEvForward(new TEvSchemeShard::TEvNotifyTxCompletion(txId), SchemeShardId)); + } + + void Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + Success(); + } + + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + if (SchemeShardId == ev->Get()->TabletId) { + return; + } + + Retry(); + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + Retry(); + } + + void Success() { + LOG_I("Success"); + + Send(Parent, new TEvPrivate::TEvDropDstResult(ReplicationId, TargetId)); + PassAway(); + } + + void Error(NKikimrScheme::EStatus status, const TString& error) { + LOG_E("Error" + << ": status# " << status + << ", reason# " << error); + + Send(Parent, new TEvPrivate::TEvDropDstResult(ReplicationId, TargetId, status, error)); + PassAway(); + } + + void Retry() { + LOG_D("Retry"); + Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_DST_REMOVER; + } + + explicit TDstRemover( + const TActorId& parent, + ui64 schemeShardId, + const TActorId& proxy, + ui64 rid, + ui64 tid, + const TPathId& dstPathId) + : Parent(parent) + , SchemeShardId(schemeShardId) + , YdbProxy(proxy) + , ReplicationId(rid) + , TargetId(tid) + , DstPathId(dstPathId) + , LogPrefix("DstRemover", ReplicationId, TargetId) + { + } + + void Bootstrap() { + if (!DstPathId) { + Success(); + } else { + AllocateTxId(); + } + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Parent; + const ui64 SchemeShardId; + const TActorId YdbProxy; + const ui64 ReplicationId; + const ui64 TargetId; + const TPathId DstPathId; + const TActorLogPrefix LogPrefix; + + ui64 TxId = 0; + TActorId PipeCache; + +}; // TDstRemover + +IActor* CreateDstRemover(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, + ui64 rid, ui64 tid, const TPathId& dstPathId) +{ + return new TDstRemover(parent, schemeShardId, proxy, rid, tid, dstPathId); +} + +} diff --git a/ydb/core/tx/replication/controller/dst_remover.h b/ydb/core/tx/replication/controller/dst_remover.h new file mode 100644 index 00000000000..687c3607242 --- /dev/null +++ b/ydb/core/tx/replication/controller/dst_remover.h @@ -0,0 +1,12 @@ +#pragma once + +#include "replication.h" + +#include <ydb/core/base/defs.h> + +namespace NKikimr::NReplication::NController { + +IActor* CreateDstRemover(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, + ui64 rid, ui64 tid, const TPathId& dstPathId); + +} diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp index 3a730b4cb7a..c54f4d149a5 100644 --- a/ydb/core/tx/replication/controller/private_events.cpp +++ b/ydb/core/tx/replication/controller/private_events.cpp @@ -93,6 +93,58 @@ bool TEvPrivate::TEvCreateDstResult::IsSuccess() const { return Status == NKikimrScheme::StatusSuccess; } +TEvPrivate::TEvDropStreamResult::TEvDropStreamResult(ui64 rid, ui64 tid, NYdb::TStatus&& status) + : ReplicationId(rid) + , TargetId(tid) + , Status(std::move(status)) +{ +} + +TString TEvPrivate::TEvDropStreamResult::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " ReplicationId: " << ReplicationId + << " TargetId: " << TargetId + << " Status: " << Status.GetStatus() + << " Issues: " << Status.GetIssues().ToOneLineString() + << " }"; +} + +bool TEvPrivate::TEvDropStreamResult::IsSuccess() const { + return Status.IsSuccess(); +} + +TEvPrivate::TEvDropDstResult::TEvDropDstResult(ui64 rid, ui64 tid, NKikimrScheme::EStatus status, const TString& error) + : ReplicationId(rid) + , TargetId(tid) + , Status(status) + , Error(error) +{ +} + +TString TEvPrivate::TEvDropDstResult::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " ReplicationId: " << ReplicationId + << " TargetId: " << TargetId + << " Status: " << NKikimrScheme::EStatus_Name(Status) + << " Error: " << Error + << " }"; +} + +bool TEvPrivate::TEvDropDstResult::IsSuccess() const { + return Status == NKikimrScheme::StatusSuccess; +} + +TEvPrivate::TEvDropReplication::TEvDropReplication(ui64 rid) + : ReplicationId(rid) +{ +} + +TString TEvPrivate::TEvDropReplication::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " ReplicationId: " << ReplicationId + << " }"; +} + } Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryResult::TAddEntry, stream, value) { diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index 35ba09ce11c..475aa59c165 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -15,6 +15,9 @@ struct TEvPrivate { EvAssignStreamName, EvCreateStreamResult, EvCreateDstResult, + EvDropStreamResult, + EvDropDstResult, + EvDropReplication, EvEnd, }; @@ -70,6 +73,37 @@ struct TEvPrivate { bool IsSuccess() const; }; + struct TEvDropStreamResult: public TEventLocal<TEvDropStreamResult, EvDropStreamResult> { + const ui64 ReplicationId; + const ui64 TargetId; + const NYdb::TStatus Status; + + explicit TEvDropStreamResult(ui64 rid, ui64 tid, NYdb::TStatus&& status); + TString ToString() const override; + + bool IsSuccess() const; + }; + + struct TEvDropDstResult: public TEventLocal<TEvDropDstResult, EvDropDstResult> { + const ui64 ReplicationId; + const ui64 TargetId; + const NKikimrScheme::EStatus Status; + const TString Error; + + explicit TEvDropDstResult(ui64 rid, ui64 tid, + NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess, const TString& error = {}); + TString ToString() const override; + + bool IsSuccess() const; + }; + + struct TEvDropReplication: public TEventLocal<TEvDropReplication, EvDropReplication> { + const ui64 ReplicationId; + + explicit TEvDropReplication(ui64 rid); + TString ToString() const override; + }; + }; // TEvPrivate } diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index ffa7fe297fc..467ad27b227 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -1,4 +1,5 @@ #include "discoverer.h" +#include "private_events.h" #include "replication.h" #include "target_table.h" #include "util.h" @@ -79,6 +80,10 @@ public: : nullptr; } + void RemoveTarget(ui64 id) { + Targets.erase(id); + } + void Progress(const TActorContext& ctx) { if (!YdbProxy) { THolder<IActor> ydbProxy; @@ -104,7 +109,11 @@ public: return ProgressTargets(ctx); } case EState::Removing: - return; // TODO + if (!Targets) { + return (void)ctx.Send(ctx.SelfID, new TEvPrivate::TEvDropReplication(ReplicationId)); + } else { + return ProgressTargets(ctx); + } case EState::Error: return; } @@ -183,6 +192,10 @@ TReplication::ITarget* TReplication::FindTarget(ui64 id) { return Impl->FindTarget(id); } +void TReplication::RemoveTarget(ui64 id) { + return Impl->RemoveTarget(id); +} + void TReplication::Progress(const TActorContext& ctx) { Impl->Progress(ctx); } @@ -195,6 +208,10 @@ ui64 TReplication::GetId() const { return Impl->ReplicationId; } +const TPathId& TReplication::GetPathId() const { + return Impl->PathId; +} + void TReplication::SetState(EState state, TString issue) { Impl->SetState(state, issue); } @@ -215,6 +232,14 @@ ui64 TReplication::GetNextTargetId() const { return Impl->NextTargetId; } +void TReplication::SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId) { + DropOp = {sender, opId}; +} + +const std::optional<TReplication::TDropOp>& TReplication::GetDropOp() const { + return DropOp; +} + } Y_DECLARE_OUT_SPEC(, NKikimrReplication::TReplicationConfig::TargetCase, stream, value) { diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index 18c143d8637..db1ab27046e 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -8,6 +8,7 @@ #include <util/generic/ptr.h> #include <memory> +#include <optional> namespace NKikimrReplication { class TReplicationConfig; @@ -41,6 +42,7 @@ public: Creating, Ready, Removing, + Removed, Error = 255 }; @@ -70,6 +72,11 @@ public: virtual void Shutdown(const TActorContext& ctx) = 0; }; + struct TDropOp { + TActorId Sender; + std::pair<ui64, ui32> OperationId; // txId, partId + }; + public: explicit TReplication(ui64 id, const TPathId& pathId, const NKikimrReplication::TReplicationConfig& config); explicit TReplication(ui64 id, const TPathId& pathId, NKikimrReplication::TReplicationConfig&& config); @@ -79,11 +86,13 @@ public: ITarget* AddTarget(ui64 id, ETargetKind kind, const TString& srcPath, const TString& dstPath); const ITarget* FindTarget(ui64 id) const; ITarget* FindTarget(ui64 id); + void RemoveTarget(ui64 id); void Progress(const TActorContext& ctx); void Shutdown(const TActorContext& ctx); ui64 GetId() const; + const TPathId& GetPathId() const; void SetState(EState state, TString issue = {}); EState GetState() const; const TString& GetIssue() const; @@ -91,9 +100,13 @@ public: void SetNextTargetId(ui64 value); ui64 GetNextTargetId() const; + void SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId); + const std::optional<TDropOp>& GetDropOp() const; + private: class TImpl; std::shared_ptr<TImpl> Impl; + std::optional<TDropOp> DropOp; }; // TReplication diff --git a/ydb/core/tx/replication/controller/stream_remover.cpp b/ydb/core/tx/replication/controller/stream_remover.cpp new file mode 100644 index 00000000000..75fcbf31ca9 --- /dev/null +++ b/ydb/core/tx/replication/controller/stream_remover.cpp @@ -0,0 +1,101 @@ +#include "logging.h" +#include "private_events.h" +#include "stream_remover.h" +#include "util.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> + +#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h> + +namespace NKikimr::NReplication::NController { + +class TStreamRemover: public TActorBootstrapped<TStreamRemover> { + void DropStream() { + switch (Kind) { + case TReplication::ETargetKind::Table: + Send(YdbProxy, new TEvYdbProxy::TEvAlterTableRequest(SrcPath, NYdb::NTable::TAlterTableSettings() + .AppendDropChangefeeds(StreamName))); + break; + } + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle); + sFunc(TEvents::TEvWakeup, DropStream); + sFunc(TEvents::TEvPoison, PassAway); + } + } + + void Handle(TEvYdbProxy::TEvAlterTableResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + auto& result = ev->Get()->Result; + + if (!result.IsSuccess()) { + if (IsRetryableError(result)) { + LOG_D("Retry"); + return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup); + } + + LOG_E("Error" + << ": status# " << result.GetStatus() + << ", issues# " << result.GetIssues().ToOneLineString()); + } else { + LOG_I("Success" + << ": issues# " << result.GetIssues().ToOneLineString()); + } + + Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, std::move(result))); + PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_STREAM_REMOVER; + } + + explicit TStreamRemover( + const TActorId& parent, + const TActorId& proxy, + ui64 rid, + ui64 tid, + TReplication::ETargetKind kind, + const TString& srcPath, + const TString& streamName) + : Parent(parent) + , YdbProxy(proxy) + , ReplicationId(rid) + , TargetId(tid) + , Kind(kind) + , SrcPath(srcPath) + , StreamName(streamName) + , LogPrefix("StreamRemover", ReplicationId, TargetId) + { + } + + void Bootstrap() { + DropStream(); + } + +private: + const TActorId Parent; + const TActorId YdbProxy; + const ui64 ReplicationId; + const ui64 TargetId; + const TReplication::ETargetKind Kind; + const TString SrcPath; + const TString StreamName; + const TActorLogPrefix LogPrefix; + +}; // TStreamRemover + +IActor* CreateStreamRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, + TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName) +{ + return new TStreamRemover(parent, proxy, rid, tid, kind, srcPath, streamName); +} + +} diff --git a/ydb/core/tx/replication/controller/stream_remover.h b/ydb/core/tx/replication/controller/stream_remover.h new file mode 100644 index 00000000000..97694e1f19e --- /dev/null +++ b/ydb/core/tx/replication/controller/stream_remover.h @@ -0,0 +1,12 @@ +#pragma once + +#include "replication.h" + +#include <ydb/core/base/defs.h> + +namespace NKikimr::NReplication::NController { + +IActor* CreateStreamRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, + TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName); + +} diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index 0adcb2e17ce..0e942fc5c94 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -1,4 +1,5 @@ #include "dst_creator.h" +#include "dst_remover.h" #include "target_base.h" #include "util.h" @@ -93,7 +94,11 @@ void TTargetBase::Progress(ui64 schemeShardId, const TActorId& proxy, const TAct case EDstState::Ready: break; // TODO case EDstState::Removing: - break; // TODO + if (!DstRemover) { + DstRemover = ctx.Register(CreateDstRemover(ctx.SelfID, schemeShardId, proxy, + ReplicationId, TargetId, DstPathId)); + } + break; case EDstState::Error: break; } diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp index 841e01581c0..63296311f83 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.cpp +++ b/ydb/core/tx/replication/controller/target_with_stream.cpp @@ -1,5 +1,6 @@ #include "private_events.h" #include "stream_creator.h" +#include "stream_remover.h" #include "target_with_stream.h" #include <library/cpp/actors/core/events.h> @@ -18,8 +19,13 @@ void TTargetWithStream::Progress(ui64 schemeShardId, const TActorId& proxy, cons } return; case EStreamState::Removing: - return; // TODO + if (!StreamRemover) { + StreamRemover = ctx.Register(CreateStreamRemover(ctx.SelfID, proxy, + GetReplicationId(), GetTargetId(), GetTargetKind(), GetSrcPath(), GetStreamName())); + } + return; case EStreamState::Ready: + case EStreamState::Removed: case EStreamState::Error: break; } diff --git a/ydb/core/tx/replication/controller/tx_create_dst_result.cpp b/ydb/core/tx/replication/controller/tx_create_dst_result.cpp index bc825f5ff18..cd340495ea0 100644 --- a/ydb/core/tx/replication/controller/tx_create_dst_result.cpp +++ b/ydb/core/tx/replication/controller/tx_create_dst_result.cpp @@ -38,6 +38,14 @@ public: return true; } + if (target->GetDstState() != TReplication::EDstState::Creating) { + CLOG_W(ctx, "Dst state mismatch" + << ": rid# " << rid + << ", tid# " << tid + << ", state# " << target->GetDstState()); + return true; + } + if (Ev->Get()->IsSuccess()) { target->SetDstPathId(Ev->Get()->DstPathId); target->SetDstState(TReplication::EDstState::Ready); diff --git a/ydb/core/tx/replication/controller/tx_create_stream_result.cpp b/ydb/core/tx/replication/controller/tx_create_stream_result.cpp index 37aff25ede1..ca9d0f645e5 100644 --- a/ydb/core/tx/replication/controller/tx_create_stream_result.cpp +++ b/ydb/core/tx/replication/controller/tx_create_stream_result.cpp @@ -38,6 +38,14 @@ public: return true; } + if (target->GetStreamState() != TReplication::EStreamState::Creating) { + CLOG_W(ctx, "Stream state mismatch" + << ": rid# " << rid + << ", tid# " << tid + << ", state# " << target->GetStreamState()); + return true; + } + if (Ev->Get()->IsSuccess()) { target->SetStreamState(TReplication::EStreamState::Ready); diff --git a/ydb/core/tx/replication/controller/tx_discovery_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_result.cpp index 58fba085844..fca07c58eaa 100644 --- a/ydb/core/tx/replication/controller/tx_discovery_result.cpp +++ b/ydb/core/tx/replication/controller/tx_discovery_result.cpp @@ -32,6 +32,13 @@ public: return true; } + if (Replication->GetState() != TReplication::EState::Ready) { + CLOG_W(ctx, "Replication state mismatch" + << ": rid# " << rid + << ", state# " << Replication->GetState()); + return true; + } + NIceDb::TNiceDb db(txc.DB); if (Ev->Get()->IsSuccess()) { diff --git a/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp b/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp new file mode 100644 index 00000000000..3464db6a02c --- /dev/null +++ b/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp @@ -0,0 +1,83 @@ +#include "controller_impl.h" + +namespace NKikimr::NReplication::NController { + +class TController::TTxDropDstResult: public TTxBase { + TEvPrivate::TEvDropDstResult::TPtr Ev; + TReplication::TPtr Replication; + +public: + explicit TTxDropDstResult(TController* self, TEvPrivate::TEvDropDstResult::TPtr& ev) + : TTxBase("TxDropDstResult", self) + , Ev(ev) + { + } + + TTxType GetTxType() const override { + return TXTYPE_DROP_DST_RESULT; + } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); + + const auto rid = Ev->Get()->ReplicationId; + const auto tid = Ev->Get()->TargetId; + + Replication = Self->Find(rid); + if (!Replication) { + CLOG_W(ctx, "Unknown replication" + << ": rid# " << rid); + return true; + } + + auto* target = Replication->FindTarget(tid); + if (!target) { + CLOG_W(ctx, "Unknown target" + << ": rid# " << rid + << ", tid# " << tid); + return true; + } + + if (target->GetDstState() != TReplication::EDstState::Removing) { + CLOG_W(ctx, "Dst state mismatch" + << ": rid# " << rid + << ", tid# " << tid + << ", state# " << target->GetDstState()); + return true; + } + + if (Ev->Get()->IsSuccess()) { + CLOG_N(ctx, "Target dst dropped" + << ": rid# " << rid + << ", tid# " << tid); + } else { + CLOG_E(ctx, "Drop dst error" + << ": rid# " << rid + << ", tid# " << tid + << ", " << NKikimrScheme::EStatus_Name(Ev->Get()->Status) + << ", " << Ev->Get()->Error); + } + + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::Targets>().Key(rid, tid).Delete(); + db.Table<Schema::SrcStreams>().Key(rid, tid).Delete(); + Replication->RemoveTarget(tid); + + return true; + } + + void Complete(const TActorContext& ctx) override { + CLOG_D(ctx, "Complete"); + + if (Replication) { + Replication->Progress(ctx); + } + } + +}; // TTxDropDstResult + +void TController::RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxDropDstResult(this, ev), ctx); +} + +} diff --git a/ydb/core/tx/replication/controller/tx_drop_replication.cpp b/ydb/core/tx/replication/controller/tx_drop_replication.cpp index 8557a4098fc..b59e519b3b9 100644 --- a/ydb/core/tx/replication/controller/tx_drop_replication.cpp +++ b/ydb/core/tx/replication/controller/tx_drop_replication.cpp @@ -3,14 +3,21 @@ namespace NKikimr::NReplication::NController { class TController::TTxDropReplication: public TTxBase { - TEvController::TEvDropReplication::TPtr Ev; - THolder<TEvController::TEvDropReplicationResult> Result; + TEvController::TEvDropReplication::TPtr PubEv; + TEvPrivate::TEvDropReplication::TPtr PrivEv; + THolder<IEventHandle> Result; // TEvController::TEvDropReplicationResult TReplication::TPtr Replication; public: explicit TTxDropReplication(TController* self, TEvController::TEvDropReplication::TPtr& ev) : TTxBase("TxDropReplication", self) - , Ev(ev) + , PubEv(ev) + { + } + + explicit TTxDropReplication(TController* self, TEvPrivate::TEvDropReplication::TPtr& ev) + : TTxBase("TxDropReplication", self) + , PrivEv(ev) { } @@ -19,36 +26,109 @@ public: } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); + if (PubEv) { + return ExecutePub(txc, ctx); + } else if (PrivEv) { + return ExecutePriv(txc, ctx); + } else { + Y_FAIL("unreachable"); + } + } - const auto& record = Ev->Get()->Record; - Result = MakeHolder<TEvController::TEvDropReplicationResult>(); - Result->Record.MutableOperationId()->CopyFrom(record.GetOperationId()); - Result->Record.SetOrigin(Self->TabletID()); + bool ExecutePub(TTransactionContext& txc, const TActorContext& ctx) { + CLOG_D(ctx, "Execute: " << PubEv->Get()->ToString()); + const auto& record = PubEv->Get()->Record; const auto pathId = PathIdFromPathId(record.GetPathId()); + const auto& opId = record.GetOperationId(); Replication = Self->Find(pathId); if (!Replication) { CLOG_W(ctx, "Cannot drop unknown replication" << ": pathId# " << pathId); - Result->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::NOT_FOUND); + auto ev = MakeHolder<TEvController::TEvDropReplicationResult>(); + ev->Record.MutableOperationId()->CopyFrom(record.GetOperationId()); + ev->Record.SetOrigin(Self->TabletID()); + ev->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::NOT_FOUND); + Result = MakeHolder<IEventHandle>(PubEv->Sender, ctx.SelfID, ev.Release()); + + return true; + } + + if (Replication->GetState() == TReplication::EState::Removing) { + Replication->SetDropOp(PubEv->Sender, std::make_pair(opId.GetTxId(), opId.GetPartId())); return true; } NIceDb::TNiceDb db(txc.DB); + Replication->SetState(TReplication::EState::Removing); db.Table<Schema::Replications>().Key(Replication->GetId()).Update( NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState()) ); + for (ui64 tid = 0; tid < Replication->GetNextTargetId(); ++tid) { + auto* target = Replication->FindTarget(tid); + if (!target) { + continue; + } + + target->Shutdown(ctx); + + target->SetStreamState(TReplication::EStreamState::Removing); + db.Table<Schema::SrcStreams>().Key(Replication->GetId(), tid).Update( + NIceDb::TUpdate<Schema::SrcStreams::State>(target->GetStreamState()) + ); + + target->SetDstState(TReplication::EDstState::Removing); // TODO: configurable + db.Table<Schema::Targets>().Key(Replication->GetId(), tid).Update( + NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()) + ); + } + CLOG_N(ctx, "Drop replication" << ": rid# " << Replication->GetId() << ", pathId# " << pathId); - // TODO: delay response - Result->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::SUCCESS); + Replication->SetDropOp(PubEv->Sender, std::make_pair(opId.GetTxId(), opId.GetPartId())); + return true; + } + + bool ExecutePriv(TTransactionContext& txc, const TActorContext& ctx) { + CLOG_D(ctx, "Execute: " << PrivEv->Get()->ToString()); + + const auto rid = PrivEv->Get()->ReplicationId; + Replication = Self->Find(rid); + + if (!Replication) { + CLOG_W(ctx, "Cannot drop unknown replication" + << ": rid# " << rid); + return true; + } + + if (Replication->GetState() != TReplication::EState::Removing) { + CLOG_W(ctx, "Replication state mismatch" + << ": rid# " << rid + << ", state# " << Replication->GetState()); + return true; + } + + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::Replications>().Key(rid).Delete(); + + if (const auto& op = Replication->GetDropOp()) { + auto ev = MakeHolder<TEvController::TEvDropReplicationResult>(); + ev->Record.MutableOperationId()->SetTxId(op->OperationId.first); + ev->Record.MutableOperationId()->SetPartId(op->OperationId.second); + ev->Record.SetOrigin(Self->TabletID()); + ev->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::SUCCESS); + Result = MakeHolder<IEventHandle>(op->Sender, ctx.SelfID, ev.Release()); + } + + Self->Remove(rid); + Replication.Reset(); + return true; } @@ -56,7 +136,7 @@ public: CLOG_D(ctx, "Complete"); if (Result) { - ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + ctx.Send(Result.Release()); } if (Replication) { @@ -70,4 +150,8 @@ void TController::RunTxDropReplication(TEvController::TEvDropReplication::TPtr& Execute(new TTxDropReplication(this, ev), ctx); } +void TController::RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxDropReplication(this, ev), ctx); +} + } diff --git a/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp b/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp new file mode 100644 index 00000000000..71f5a3782c9 --- /dev/null +++ b/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp @@ -0,0 +1,83 @@ +#include "controller_impl.h" + +namespace NKikimr::NReplication::NController { + +class TController::TTxDropStreamResult: public TTxBase { + TEvPrivate::TEvDropStreamResult::TPtr Ev; + TReplication::TPtr Replication; + +public: + explicit TTxDropStreamResult(TController* self, TEvPrivate::TEvDropStreamResult::TPtr& ev) + : TTxBase("TxDropStreamResult", self) + , Ev(ev) + { + } + + TTxType GetTxType() const override { + return TXTYPE_DROP_STREAM_RESULT; + } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); + + const auto rid = Ev->Get()->ReplicationId; + const auto tid = Ev->Get()->TargetId; + + Replication = Self->Find(rid); + if (!Replication) { + CLOG_W(ctx, "Unknown replication" + << ": rid# " << rid); + return true; + } + + auto* target = Replication->FindTarget(tid); + if (!target) { + CLOG_W(ctx, "Unknown target" + << ": rid# " << rid + << ", tid# " << tid); + return true; + } + + if (target->GetStreamState() != TReplication::EStreamState::Removing) { + CLOG_W(ctx, "Stream state mismatch" + << ": rid# " << rid + << ", tid# " << tid + << ", state# " << target->GetStreamState()); + return true; + } + + if (Ev->Get()->IsSuccess()) { + CLOG_N(ctx, "Stream dropped" + << ": rid# " << rid + << ", tid# " << tid); + } else { + const auto& status = Ev->Get()->Status; + CLOG_E(ctx, "Drop stream error" + << ": rid# " << rid + << ", tid# " << tid + << ", status# " << status.GetStatus() + << ", issue# " << status.GetIssues().ToOneLineString()); + } + + NIceDb::TNiceDb db(txc.DB); + target->SetStreamState(TReplication::EStreamState::Removed); + db.Table<Schema::SrcStreams>().Key(rid, tid).Update<Schema::SrcStreams::State>(target->GetStreamState()); + + return true; + } + + void Complete(const TActorContext& ctx) override { + CLOG_D(ctx, "Complete"); + + if (Replication) { + Replication->Progress(ctx); + } + } + +}; // TTxDropStreamResult + +void TController::RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxDropStreamResult(this, ev), ctx); +} + +} diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt index f7e8181651d..e31d395fe60 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt +++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt @@ -32,6 +32,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC core-tx-tx_allocator core-tx-tx_proxy public-lib-scheme_types + cpp-client-ydb_driver ) target_sources(tx-schemeshard-ut_helpers PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt index eba6438ce64..c1c282cef4e 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt @@ -33,6 +33,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC core-tx-tx_allocator core-tx-tx_proxy public-lib-scheme_types + cpp-client-ydb_driver ) target_sources(tx-schemeshard-ut_helpers PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt index eba6438ce64..c1c282cef4e 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt +++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt @@ -33,6 +33,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC core-tx-tx_allocator core-tx-tx_proxy public-lib-scheme_types + cpp-client-ydb_driver ) target_sources(tx-schemeshard-ut_helpers PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 28e5a8285ee..d6cec4e1edb 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -561,6 +561,11 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe } } + if (opts.InitYdbDriver_) { + YdbDriver = MakeHolder<NYdb::TDriver>(NYdb::TDriverConfig()); + runtime.GetAppData().YdbDriver = YdbDriver.Get(); + } + TActorId sender = runtime.AllocateEdgeActor(); //CreateTestBootstrapper(runtime, CreateTestTabletInfo(MakeBSControllerID(TTestTxConfig::DomainUid), TTabletTypes::BSController), &CreateFlatBsController); BootSchemeShard(runtime, schemeRoot); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 782593e48e5..e9ba642e839 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -10,6 +10,8 @@ #include <ydb/core/tx/schemeshard/schemeshard_identificators.h> #include <ydb/core/tx/schemeshard/schemeshard_import.h> +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> + #include <functional> namespace NSchemeShardUT_Private { @@ -33,6 +35,7 @@ namespace NSchemeShardUT_Private { OPTION(ui32, NChannels, 4); OPTION(bool, EnablePipeRetries, true); OPTION(bool, RunFakeConfigDispatcher, false); + OPTION(bool, InitYdbDriver, false); OPTION(std::optional<bool>, EnableSystemViews, std::nullopt); OPTION(std::optional<bool>, EnablePersistentQueryStats, std::nullopt); OPTION(std::optional<bool>, EnablePersistentPartitionStats, std::nullopt); @@ -66,6 +69,7 @@ namespace NSchemeShardUT_Private { TActorId TxReliablePropose; ui32 ChannelsCount; TActorId MeteringFake; + THolder<NYdb::TDriver> YdbDriver; public: TTestEnv(TTestActorRuntime& runtime, ui32 nchannels = 4, bool enablePipeRetries = true, diff --git a/ydb/core/tx/schemeshard/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication.cpp index 2030564642d..cb581c9c1db 100644 --- a/ydb/core/tx/schemeshard/ut_replication.cpp +++ b/ydb/core/tx/schemeshard/ut_replication.cpp @@ -3,6 +3,24 @@ using namespace NSchemeShardUT_Private; Y_UNIT_TEST_SUITE(TReplicationTests) { + static TString DefaultScheme(const TString& name) { + return Sprintf(R"( + Name: "%s" + Config { + StaticCredentials { + User: "user" + Password: "pwd" + } + Specific { + Targets { + SrcPath: "/MyRoot1/Table" + DstPath: "/MyRoot2/Table" + } + } + } + )", name.c_str()); + } + void SetupLogging(TTestActorRuntimeBase& runtime) { runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NActors::NLog::PRI_TRACE); @@ -17,21 +35,19 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { Y_UNIT_TEST(Create) { TTestBasicRuntime runtime; - TTestEnv env(runtime); + TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true)); ui64 txId = 100; SetupLogging(runtime); - TestCreateReplication(runtime, ++txId, "/MyRoot", R"( - Name: "Replication" - )"); + TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme("Replication")); env.TestWaitNotification(runtime, txId); TestLs(runtime, "/MyRoot/Replication", false, NLs::PathExist); } Y_UNIT_TEST(CreateSequential) { TTestBasicRuntime runtime; - TTestEnv env(runtime); + TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true)); ui64 txId = 100; SetupLogging(runtime); @@ -40,9 +56,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { for (int i = 0; i < 2; ++i) { const auto name = Sprintf("Replication%d", i); - TestCreateReplication(runtime, ++txId, "/MyRoot", Sprintf(R"( - Name: "%s" - )", name.c_str())); + TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme(name)); env.TestWaitNotification(runtime, txId); const auto desc = DescribePath(runtime, "/MyRoot/" + name); @@ -59,7 +73,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { Y_UNIT_TEST(CreateInParallel) { TTestBasicRuntime runtime; - TTestEnv env(runtime); + TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true)); ui64 txId = 100; SetupLogging(runtime); @@ -72,9 +86,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { for (int j = 0; j < 2; ++j) { auto name = Sprintf("Replication%d-%d", i, j); - TestCreateReplication(runtime, ++txId, "/MyRoot", Sprintf(R"( - Name: "%s" - )", name.c_str())); + TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme(name)); names.push_back(std::move(name)); txIds.push_back(txId); @@ -97,15 +109,13 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { Y_UNIT_TEST(CreateDropRecreate) { TTestBasicRuntime runtime; - TTestEnv env(runtime); + TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true)); ui64 txId = 100; SetupLogging(runtime); ui64 controllerId = 0; - TestCreateReplication(runtime, ++txId, "/MyRoot", R"( - Name: "Replication" - )"); + TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme("Replication")); env.TestWaitNotification(runtime, txId); { const auto desc = DescribePath(runtime, "/MyRoot/Replication"); @@ -117,9 +127,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { env.TestWaitNotification(runtime, txId); TestDescribeResult(DescribePath(runtime, "/MyRoot/Replication"), {NLs::PathNotExist}); - TestCreateReplication(runtime, ++txId, "/MyRoot", R"( - Name: "Replication" - )"); + TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme("Replication")); env.TestWaitNotification(runtime, txId); { const auto desc = DescribePath(runtime, "/MyRoot/Replication"); diff --git a/ydb/core/tx/schemeshard/ut_replication_reboots.cpp b/ydb/core/tx/schemeshard/ut_replication_reboots.cpp index 45d4e97640a..72043e1db25 100644 --- a/ydb/core/tx/schemeshard/ut_replication_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_replication_reboots.cpp @@ -3,6 +3,24 @@ using namespace NSchemeShardUT_Private; Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) { + static TString DefaultScheme(const TString& name) { + return Sprintf(R"( + Name: "%s" + Config { + StaticCredentials { + User: "user" + Password: "pwd" + } + Specific { + Targets { + SrcPath: "/MyRoot1/Table" + DstPath: "/MyRoot2/Table" + } + } + } + )", name.c_str()); + } + void SetupLogging(TTestActorRuntimeBase& runtime) { runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NActors::NLog::PRI_TRACE); @@ -10,15 +28,15 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) { Y_UNIT_TEST(Create) { TTestWithReboots t(false); + t.GetTestEnvOptions().InitYdbDriver(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); SetupLogging(runtime); } - TestCreateReplication(runtime, ++t.TxId, "/MyRoot", R"( - Name: "Replication" - )"); + TestCreateReplication(runtime, ++t.TxId, "/MyRoot", DefaultScheme("Replication")); t.TestEnv->TestWaitNotification(runtime, t.TxId); TestLs(runtime, "/MyRoot/Replication", false, NLs::PathExist); @@ -27,15 +45,15 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) { void CreateMultipleReplications(bool withInitialController) { TTestWithReboots t(false); + t.GetTestEnvOptions().InitYdbDriver(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); SetupLogging(runtime); if (withInitialController) { - TestCreateReplication(runtime, ++t.TxId, "/MyRoot", R"( - Name: "Replication0" - )"); + TestCreateReplication(runtime, ++t.TxId, "/MyRoot", DefaultScheme("Replication0")); t.TestEnv->TestWaitNotification(runtime, t.TxId); TestLs(runtime, "/MyRoot/Replication0", false, NLs::PathExist); @@ -47,9 +65,7 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) { for (int i = 1; i <= 3; ++i) { auto name = Sprintf("Replication%d", i); - auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", Sprintf(R"( - Name: "%s" - )", name.c_str())); + auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", DefaultScheme(name)); t.TestEnv->ReliablePropose(runtime, request, { NKikimrScheme::StatusAccepted, @@ -82,6 +98,8 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) { Y_UNIT_TEST(CreateDropRecreate) { TTestWithReboots t(false); + t.GetTestEnvOptions().InitYdbDriver(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); @@ -89,9 +107,7 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) { } { - auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", R"( - Name: "Replication" - )"); + auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", DefaultScheme("Replication")); t.TestEnv->ReliablePropose(runtime, request, { NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, @@ -112,9 +128,7 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) { TestLs(runtime, "/MyRoot/Replication", false, NLs::PathNotExist); { - auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", R"( - Name: "Replication" - )"); + auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", DefaultScheme("Replication")); t.TestEnv->ReliablePropose(runtime, request, { NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, |