aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-02-13 19:50:39 +0300
committerilnaz <ilnaz@ydb.tech>2023-02-13 19:50:39 +0300
commitd25d2081f180c3ccd714acfc88b87341559d38e5 (patch)
treece43ca2d4f3704718c8c4233437f7738b147e654
parent243a672a56f96b4fb637f3fe1f749e7b8c576530 (diff)
downloadydb-d25d2081f180c3ccd714acfc88b87341559d38e5.tar.gz
Drop replication
-rw-r--r--ydb/core/protos/counters_replication.proto16
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/tx/replication/controller/CMakeLists.darwin.txt4
-rw-r--r--ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/tx/replication/controller/CMakeLists.linux.txt4
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp28
-rw-r--r--ydb/core/tx/replication/controller/controller_impl.h9
-rw-r--r--ydb/core/tx/replication/controller/dst_remover.cpp185
-rw-r--r--ydb/core/tx/replication/controller/dst_remover.h12
-rw-r--r--ydb/core/tx/replication/controller/private_events.cpp52
-rw-r--r--ydb/core/tx/replication/controller/private_events.h34
-rw-r--r--ydb/core/tx/replication/controller/replication.cpp27
-rw-r--r--ydb/core/tx/replication/controller/replication.h13
-rw-r--r--ydb/core/tx/replication/controller/stream_remover.cpp101
-rw-r--r--ydb/core/tx/replication/controller/stream_remover.h12
-rw-r--r--ydb/core/tx/replication/controller/target_base.cpp7
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.cpp8
-rw-r--r--ydb/core/tx/replication/controller/tx_create_dst_result.cpp8
-rw-r--r--ydb/core/tx/replication/controller/tx_create_stream_result.cpp8
-rw-r--r--ydb/core/tx/replication/controller/tx_discovery_result.cpp7
-rw-r--r--ydb/core/tx/replication/controller/tx_drop_dst_result.cpp83
-rw-r--r--ydb/core/tx/replication/controller/tx_drop_replication.cpp108
-rw-r--r--ydb/core/tx/replication/controller/tx_drop_stream_result.cpp83
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.cpp5
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.h4
-rw-r--r--ydb/core/tx/schemeshard/ut_replication.cpp46
-rw-r--r--ydb/core/tx/schemeshard/ut_replication_reboots.cpp44
30 files changed, 861 insertions, 56 deletions
diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto
index b3c86ad17e5..e115e72867c 100644
--- a/ydb/core/protos/counters_replication.proto
+++ b/ydb/core/protos/counters_replication.proto
@@ -23,12 +23,14 @@ enum EPercentileCounters {
}
enum ETxTypes {
- TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}];
- TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}];
- TXTYPE_CREATE_REPLICATION = 2 [(TxTypeOpts) = {Name: "TxCreateReplication"}];
- TXTYPE_DROP_REPLICATION = 3 [(TxTypeOpts) = {Name: "TxDropReplication"}];
- TXTYPE_DISCOVERY_RESULT = 4 [(TxTypeOpts) = {Name: "TxDiscoveryResult"}];
- TXTYPE_ASSIGN_STREAM_NAME = 5 [(TxTypeOpts) = {Name: "TxAssignStreamName"}];
+ TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}];
+ TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}];
+ TXTYPE_CREATE_REPLICATION = 2 [(TxTypeOpts) = {Name: "TxCreateReplication"}];
+ TXTYPE_DROP_REPLICATION = 3 [(TxTypeOpts) = {Name: "TxDropReplication"}];
+ TXTYPE_DISCOVERY_RESULT = 4 [(TxTypeOpts) = {Name: "TxDiscoveryResult"}];
+ TXTYPE_ASSIGN_STREAM_NAME = 5 [(TxTypeOpts) = {Name: "TxAssignStreamName"}];
TXTYPE_CREATE_STREAM_RESULT = 6 [(TxTypeOpts) = {Name: "TxCreateStreamResult"}];
- TXTYPE_CREATE_DST_RESULT = 7 [(TxTypeOpts) = {Name: "TxCreateDstResult"}];
+ TXTYPE_CREATE_DST_RESULT = 7 [(TxTypeOpts) = {Name: "TxCreateDstResult"}];
+ TXTYPE_DROP_STREAM_RESULT = 8 [(TxTypeOpts) = {Name: "TxDropStreamResult"}];
+ TXTYPE_DROP_DST_RESULT = 9 [(TxTypeOpts) = {Name: "TxDropDstResult"}];
}
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 837a9d4e6d6..f868ce8bdc6 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -964,5 +964,7 @@ message TActivity {
SCHEMESHARD_BORROWED_COMPACTION = 601;
CDC_STREAM_SCAN_ACTOR = 602;
SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603;
+ REPLICATION_CONTROLLER_STREAM_REMOVER = 604;
+ REPLICATION_CONTROLLER_DST_REMOVER = 605;
};
};
diff --git a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt
index b5e406fb9f2..eb802ebe22c 100644
--- a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt
+++ b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt
@@ -27,10 +27,12 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/discoverer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/sys_params.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp
@@ -40,7 +42,9 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init_schema.cpp
)
diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt
index 49eef8a42b8..cd653db9c3e 100644
--- a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt
@@ -28,10 +28,12 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/discoverer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/sys_params.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp
@@ -41,7 +43,9 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init_schema.cpp
)
diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux.txt b/ydb/core/tx/replication/controller/CMakeLists.linux.txt
index 49eef8a42b8..cd653db9c3e 100644
--- a/ydb/core/tx/replication/controller/CMakeLists.linux.txt
+++ b/ydb/core/tx/replication/controller/CMakeLists.linux.txt
@@ -28,10 +28,12 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/discoverer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/sys_params.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp
@@ -41,7 +43,9 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_init_schema.cpp
)
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index 89fa8c4b85d..63e1ef07a1c 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -49,10 +49,13 @@ STFUNC(TController::StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvController::TEvCreateReplication, Handle);
HFunc(TEvController::TEvDropReplication, Handle);
+ HFunc(TEvPrivate::TEvDropReplication, Handle);
HFunc(TEvPrivate::TEvDiscoveryResult, Handle);
HFunc(TEvPrivate::TEvAssignStreamName, Handle);
HFunc(TEvPrivate::TEvCreateStreamResult, Handle);
+ HFunc(TEvPrivate::TEvDropStreamResult, Handle);
HFunc(TEvPrivate::TEvCreateDstResult, Handle);
+ HFunc(TEvPrivate::TEvDropDstResult, Handle);
HFunc(TEvents::TEvPoison, Handle);
}
}
@@ -84,6 +87,11 @@ void TController::Handle(TEvController::TEvDropReplication::TPtr& ev, const TAct
RunTxDropReplication(ev, ctx);
}
+void TController::Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+ RunTxDropReplication(ev, ctx);
+}
+
void TController::Handle(TEvPrivate::TEvDiscoveryResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxDiscoveryResult(ev, ctx);
@@ -99,11 +107,21 @@ void TController::Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TAct
RunTxCreateStreamResult(ev, ctx);
}
+void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+ RunTxDropStreamResult(ev, ctx);
+}
+
void TController::Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxCreateDstResult(ev, ctx);
}
+void TController::Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+ RunTxDropDstResult(ev, ctx);
+}
+
void TController::Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
@@ -133,6 +151,16 @@ TReplication::TPtr TController::Find(const TPathId& pathId) {
return it->second;
}
+void TController::Remove(ui64 id) {
+ auto it = Replications.find(id);
+ if (it == Replications.end()) {
+ return;
+ }
+
+ ReplicationsByPathId.erase(it->second->GetPathId());
+ Replications.erase(it);
+}
+
} // NController
IActor* CreateController(const TActorId& tablet, TTabletStorageInfo* info) {
diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h
index f09fdc9e88a..69defe2fdf3 100644
--- a/ydb/core/tx/replication/controller/controller_impl.h
+++ b/ydb/core/tx/replication/controller/controller_impl.h
@@ -62,10 +62,13 @@ private:
// handlers
void Handle(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDiscoveryResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx);
// local transactions
@@ -76,17 +79,22 @@ private:
class TTxDiscoveryResult;
class TTxAssignStreamName;
class TTxCreateStreamResult;
+ class TTxDropStreamResult;
class TTxCreateDstResult;
+ class TTxDropDstResult;
// tx runners
void RunTxInitSchema(const TActorContext& ctx);
void RunTxInit(const TActorContext& ctx);
void RunTxCreateReplication(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
+ void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDiscoveryResult(TEvPrivate::TEvDiscoveryResult::TPtr& ev, const TActorContext& ctx);
void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
+ void RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
void RunTxCreateDstResult(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
+ void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
// other
template <typename T>
@@ -106,6 +114,7 @@ private:
TReplication::TPtr Find(ui64 id);
TReplication::TPtr Find(const TPathId& pathId);
+ void Remove(ui64 id);
private:
const TTabletLogPrefix LogPrefix;
diff --git a/ydb/core/tx/replication/controller/dst_remover.cpp b/ydb/core/tx/replication/controller/dst_remover.cpp
new file mode 100644
index 00000000000..0d4bf9d5eb2
--- /dev/null
+++ b/ydb/core/tx/replication/controller/dst_remover.cpp
@@ -0,0 +1,185 @@
+#include "dst_remover.h"
+#include "logging.h"
+#include "private_events.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+
+namespace NKikimr::NReplication::NController {
+
+using namespace NSchemeShard;
+
+class TDstRemover: public TActorBootstrapped<TDstRemover> {
+ void AllocateTxId() {
+ Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+ Become(&TThis::StateAllocateTxId);
+ }
+
+ STATEFN(StateAllocateTxId) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
+ default:
+ return StateBase(ev, TlsActivationContext->AsActorContext());
+ }
+ }
+
+ void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+
+ TxId = ev->Get()->TxId;
+ PipeCache = ev->Get()->Services.LeaderPipeCache;
+ DropDst();
+ }
+
+ void DropDst() {
+ auto ev = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId);
+ auto& tx = *ev->Record.AddTransaction();
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable);
+ tx.MutableDrop()->SetId(DstPathId.LocalPathId);
+
+ Send(PipeCache, new TEvPipeCache::TEvForward(ev.Release(), SchemeShardId, true));
+ Become(&TThis::StateDropDst);
+ }
+
+ STATEFN(StateDropDst) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
+ hFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, Handle);
+ sFunc(TEvents::TEvWakeup, AllocateTxId);
+ default:
+ return StateBase(ev, TlsActivationContext->AsActorContext());
+ }
+ }
+
+ void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ const auto& record = ev->Get()->Record;
+
+ switch (record.GetStatus()) {
+ case NKikimrScheme::StatusAccepted:
+ Y_VERIFY_DEBUG(TxId == record.GetTxId());
+ return SubscribeTx(record.GetTxId());
+ case NKikimrScheme::StatusMultipleModifications:
+ if (record.HasPathDropTxId()) {
+ return SubscribeTx(record.GetPathDropTxId());
+ } else {
+ return Error(record.GetStatus(), record.GetReason());
+ }
+ break;
+ case NKikimrScheme::StatusPathDoesNotExist:
+ return Success();
+ default:
+ return Error(record.GetStatus(), record.GetReason());
+ }
+ }
+
+ void SubscribeTx(ui64 txId) {
+ LOG_D("Subscribe tx"
+ << ": txId# " << txId);
+ Send(PipeCache, new TEvPipeCache::TEvForward(new TEvSchemeShard::TEvNotifyTxCompletion(txId), SchemeShardId));
+ }
+
+ void Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ Success();
+ }
+
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+
+ if (SchemeShardId == ev->Get()->TabletId) {
+ return;
+ }
+
+ Retry();
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ Retry();
+ }
+
+ void Success() {
+ LOG_I("Success");
+
+ Send(Parent, new TEvPrivate::TEvDropDstResult(ReplicationId, TargetId));
+ PassAway();
+ }
+
+ void Error(NKikimrScheme::EStatus status, const TString& error) {
+ LOG_E("Error"
+ << ": status# " << status
+ << ", reason# " << error);
+
+ Send(Parent, new TEvPrivate::TEvDropDstResult(ReplicationId, TargetId, status, error));
+ PassAway();
+ }
+
+ void Retry() {
+ LOG_D("Retry");
+ Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::REPLICATION_CONTROLLER_DST_REMOVER;
+ }
+
+ explicit TDstRemover(
+ const TActorId& parent,
+ ui64 schemeShardId,
+ const TActorId& proxy,
+ ui64 rid,
+ ui64 tid,
+ const TPathId& dstPathId)
+ : Parent(parent)
+ , SchemeShardId(schemeShardId)
+ , YdbProxy(proxy)
+ , ReplicationId(rid)
+ , TargetId(tid)
+ , DstPathId(dstPathId)
+ , LogPrefix("DstRemover", ReplicationId, TargetId)
+ {
+ }
+
+ void Bootstrap() {
+ if (!DstPathId) {
+ Success();
+ } else {
+ AllocateTxId();
+ }
+ }
+
+ STATEFN(StateBase) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ const TActorId Parent;
+ const ui64 SchemeShardId;
+ const TActorId YdbProxy;
+ const ui64 ReplicationId;
+ const ui64 TargetId;
+ const TPathId DstPathId;
+ const TActorLogPrefix LogPrefix;
+
+ ui64 TxId = 0;
+ TActorId PipeCache;
+
+}; // TDstRemover
+
+IActor* CreateDstRemover(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy,
+ ui64 rid, ui64 tid, const TPathId& dstPathId)
+{
+ return new TDstRemover(parent, schemeShardId, proxy, rid, tid, dstPathId);
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/dst_remover.h b/ydb/core/tx/replication/controller/dst_remover.h
new file mode 100644
index 00000000000..687c3607242
--- /dev/null
+++ b/ydb/core/tx/replication/controller/dst_remover.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include "replication.h"
+
+#include <ydb/core/base/defs.h>
+
+namespace NKikimr::NReplication::NController {
+
+IActor* CreateDstRemover(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy,
+ ui64 rid, ui64 tid, const TPathId& dstPathId);
+
+}
diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp
index 3a730b4cb7a..c54f4d149a5 100644
--- a/ydb/core/tx/replication/controller/private_events.cpp
+++ b/ydb/core/tx/replication/controller/private_events.cpp
@@ -93,6 +93,58 @@ bool TEvPrivate::TEvCreateDstResult::IsSuccess() const {
return Status == NKikimrScheme::StatusSuccess;
}
+TEvPrivate::TEvDropStreamResult::TEvDropStreamResult(ui64 rid, ui64 tid, NYdb::TStatus&& status)
+ : ReplicationId(rid)
+ , TargetId(tid)
+ , Status(std::move(status))
+{
+}
+
+TString TEvPrivate::TEvDropStreamResult::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " ReplicationId: " << ReplicationId
+ << " TargetId: " << TargetId
+ << " Status: " << Status.GetStatus()
+ << " Issues: " << Status.GetIssues().ToOneLineString()
+ << " }";
+}
+
+bool TEvPrivate::TEvDropStreamResult::IsSuccess() const {
+ return Status.IsSuccess();
+}
+
+TEvPrivate::TEvDropDstResult::TEvDropDstResult(ui64 rid, ui64 tid, NKikimrScheme::EStatus status, const TString& error)
+ : ReplicationId(rid)
+ , TargetId(tid)
+ , Status(status)
+ , Error(error)
+{
+}
+
+TString TEvPrivate::TEvDropDstResult::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " ReplicationId: " << ReplicationId
+ << " TargetId: " << TargetId
+ << " Status: " << NKikimrScheme::EStatus_Name(Status)
+ << " Error: " << Error
+ << " }";
+}
+
+bool TEvPrivate::TEvDropDstResult::IsSuccess() const {
+ return Status == NKikimrScheme::StatusSuccess;
+}
+
+TEvPrivate::TEvDropReplication::TEvDropReplication(ui64 rid)
+ : ReplicationId(rid)
+{
+}
+
+TString TEvPrivate::TEvDropReplication::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " ReplicationId: " << ReplicationId
+ << " }";
+}
+
}
Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryResult::TAddEntry, stream, value) {
diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h
index 35ba09ce11c..475aa59c165 100644
--- a/ydb/core/tx/replication/controller/private_events.h
+++ b/ydb/core/tx/replication/controller/private_events.h
@@ -15,6 +15,9 @@ struct TEvPrivate {
EvAssignStreamName,
EvCreateStreamResult,
EvCreateDstResult,
+ EvDropStreamResult,
+ EvDropDstResult,
+ EvDropReplication,
EvEnd,
};
@@ -70,6 +73,37 @@ struct TEvPrivate {
bool IsSuccess() const;
};
+ struct TEvDropStreamResult: public TEventLocal<TEvDropStreamResult, EvDropStreamResult> {
+ const ui64 ReplicationId;
+ const ui64 TargetId;
+ const NYdb::TStatus Status;
+
+ explicit TEvDropStreamResult(ui64 rid, ui64 tid, NYdb::TStatus&& status);
+ TString ToString() const override;
+
+ bool IsSuccess() const;
+ };
+
+ struct TEvDropDstResult: public TEventLocal<TEvDropDstResult, EvDropDstResult> {
+ const ui64 ReplicationId;
+ const ui64 TargetId;
+ const NKikimrScheme::EStatus Status;
+ const TString Error;
+
+ explicit TEvDropDstResult(ui64 rid, ui64 tid,
+ NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess, const TString& error = {});
+ TString ToString() const override;
+
+ bool IsSuccess() const;
+ };
+
+ struct TEvDropReplication: public TEventLocal<TEvDropReplication, EvDropReplication> {
+ const ui64 ReplicationId;
+
+ explicit TEvDropReplication(ui64 rid);
+ TString ToString() const override;
+ };
+
}; // TEvPrivate
}
diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp
index ffa7fe297fc..467ad27b227 100644
--- a/ydb/core/tx/replication/controller/replication.cpp
+++ b/ydb/core/tx/replication/controller/replication.cpp
@@ -1,4 +1,5 @@
#include "discoverer.h"
+#include "private_events.h"
#include "replication.h"
#include "target_table.h"
#include "util.h"
@@ -79,6 +80,10 @@ public:
: nullptr;
}
+ void RemoveTarget(ui64 id) {
+ Targets.erase(id);
+ }
+
void Progress(const TActorContext& ctx) {
if (!YdbProxy) {
THolder<IActor> ydbProxy;
@@ -104,7 +109,11 @@ public:
return ProgressTargets(ctx);
}
case EState::Removing:
- return; // TODO
+ if (!Targets) {
+ return (void)ctx.Send(ctx.SelfID, new TEvPrivate::TEvDropReplication(ReplicationId));
+ } else {
+ return ProgressTargets(ctx);
+ }
case EState::Error:
return;
}
@@ -183,6 +192,10 @@ TReplication::ITarget* TReplication::FindTarget(ui64 id) {
return Impl->FindTarget(id);
}
+void TReplication::RemoveTarget(ui64 id) {
+ return Impl->RemoveTarget(id);
+}
+
void TReplication::Progress(const TActorContext& ctx) {
Impl->Progress(ctx);
}
@@ -195,6 +208,10 @@ ui64 TReplication::GetId() const {
return Impl->ReplicationId;
}
+const TPathId& TReplication::GetPathId() const {
+ return Impl->PathId;
+}
+
void TReplication::SetState(EState state, TString issue) {
Impl->SetState(state, issue);
}
@@ -215,6 +232,14 @@ ui64 TReplication::GetNextTargetId() const {
return Impl->NextTargetId;
}
+void TReplication::SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId) {
+ DropOp = {sender, opId};
+}
+
+const std::optional<TReplication::TDropOp>& TReplication::GetDropOp() const {
+ return DropOp;
+}
+
}
Y_DECLARE_OUT_SPEC(, NKikimrReplication::TReplicationConfig::TargetCase, stream, value) {
diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h
index 18c143d8637..db1ab27046e 100644
--- a/ydb/core/tx/replication/controller/replication.h
+++ b/ydb/core/tx/replication/controller/replication.h
@@ -8,6 +8,7 @@
#include <util/generic/ptr.h>
#include <memory>
+#include <optional>
namespace NKikimrReplication {
class TReplicationConfig;
@@ -41,6 +42,7 @@ public:
Creating,
Ready,
Removing,
+ Removed,
Error = 255
};
@@ -70,6 +72,11 @@ public:
virtual void Shutdown(const TActorContext& ctx) = 0;
};
+ struct TDropOp {
+ TActorId Sender;
+ std::pair<ui64, ui32> OperationId; // txId, partId
+ };
+
public:
explicit TReplication(ui64 id, const TPathId& pathId, const NKikimrReplication::TReplicationConfig& config);
explicit TReplication(ui64 id, const TPathId& pathId, NKikimrReplication::TReplicationConfig&& config);
@@ -79,11 +86,13 @@ public:
ITarget* AddTarget(ui64 id, ETargetKind kind, const TString& srcPath, const TString& dstPath);
const ITarget* FindTarget(ui64 id) const;
ITarget* FindTarget(ui64 id);
+ void RemoveTarget(ui64 id);
void Progress(const TActorContext& ctx);
void Shutdown(const TActorContext& ctx);
ui64 GetId() const;
+ const TPathId& GetPathId() const;
void SetState(EState state, TString issue = {});
EState GetState() const;
const TString& GetIssue() const;
@@ -91,9 +100,13 @@ public:
void SetNextTargetId(ui64 value);
ui64 GetNextTargetId() const;
+ void SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId);
+ const std::optional<TDropOp>& GetDropOp() const;
+
private:
class TImpl;
std::shared_ptr<TImpl> Impl;
+ std::optional<TDropOp> DropOp;
}; // TReplication
diff --git a/ydb/core/tx/replication/controller/stream_remover.cpp b/ydb/core/tx/replication/controller/stream_remover.cpp
new file mode 100644
index 00000000000..75fcbf31ca9
--- /dev/null
+++ b/ydb/core/tx/replication/controller/stream_remover.cpp
@@ -0,0 +1,101 @@
+#include "logging.h"
+#include "private_events.h"
+#include "stream_remover.h"
+#include "util.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
+
+namespace NKikimr::NReplication::NController {
+
+class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
+ void DropStream() {
+ switch (Kind) {
+ case TReplication::ETargetKind::Table:
+ Send(YdbProxy, new TEvYdbProxy::TEvAlterTableRequest(SrcPath, NYdb::NTable::TAlterTableSettings()
+ .AppendDropChangefeeds(StreamName)));
+ break;
+ }
+
+ Become(&TThis::StateWork);
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle);
+ sFunc(TEvents::TEvWakeup, DropStream);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+ void Handle(TEvYdbProxy::TEvAlterTableResponse::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ auto& result = ev->Get()->Result;
+
+ if (!result.IsSuccess()) {
+ if (IsRetryableError(result)) {
+ LOG_D("Retry");
+ return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
+ }
+
+ LOG_E("Error"
+ << ": status# " << result.GetStatus()
+ << ", issues# " << result.GetIssues().ToOneLineString());
+ } else {
+ LOG_I("Success"
+ << ": issues# " << result.GetIssues().ToOneLineString());
+ }
+
+ Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, std::move(result)));
+ PassAway();
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::REPLICATION_CONTROLLER_STREAM_REMOVER;
+ }
+
+ explicit TStreamRemover(
+ const TActorId& parent,
+ const TActorId& proxy,
+ ui64 rid,
+ ui64 tid,
+ TReplication::ETargetKind kind,
+ const TString& srcPath,
+ const TString& streamName)
+ : Parent(parent)
+ , YdbProxy(proxy)
+ , ReplicationId(rid)
+ , TargetId(tid)
+ , Kind(kind)
+ , SrcPath(srcPath)
+ , StreamName(streamName)
+ , LogPrefix("StreamRemover", ReplicationId, TargetId)
+ {
+ }
+
+ void Bootstrap() {
+ DropStream();
+ }
+
+private:
+ const TActorId Parent;
+ const TActorId YdbProxy;
+ const ui64 ReplicationId;
+ const ui64 TargetId;
+ const TReplication::ETargetKind Kind;
+ const TString SrcPath;
+ const TString StreamName;
+ const TActorLogPrefix LogPrefix;
+
+}; // TStreamRemover
+
+IActor* CreateStreamRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
+ TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName)
+{
+ return new TStreamRemover(parent, proxy, rid, tid, kind, srcPath, streamName);
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/stream_remover.h b/ydb/core/tx/replication/controller/stream_remover.h
new file mode 100644
index 00000000000..97694e1f19e
--- /dev/null
+++ b/ydb/core/tx/replication/controller/stream_remover.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include "replication.h"
+
+#include <ydb/core/base/defs.h>
+
+namespace NKikimr::NReplication::NController {
+
+IActor* CreateStreamRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
+ TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName);
+
+}
diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp
index 0adcb2e17ce..0e942fc5c94 100644
--- a/ydb/core/tx/replication/controller/target_base.cpp
+++ b/ydb/core/tx/replication/controller/target_base.cpp
@@ -1,4 +1,5 @@
#include "dst_creator.h"
+#include "dst_remover.h"
#include "target_base.h"
#include "util.h"
@@ -93,7 +94,11 @@ void TTargetBase::Progress(ui64 schemeShardId, const TActorId& proxy, const TAct
case EDstState::Ready:
break; // TODO
case EDstState::Removing:
- break; // TODO
+ if (!DstRemover) {
+ DstRemover = ctx.Register(CreateDstRemover(ctx.SelfID, schemeShardId, proxy,
+ ReplicationId, TargetId, DstPathId));
+ }
+ break;
case EDstState::Error:
break;
}
diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp
index 841e01581c0..63296311f83 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.cpp
+++ b/ydb/core/tx/replication/controller/target_with_stream.cpp
@@ -1,5 +1,6 @@
#include "private_events.h"
#include "stream_creator.h"
+#include "stream_remover.h"
#include "target_with_stream.h"
#include <library/cpp/actors/core/events.h>
@@ -18,8 +19,13 @@ void TTargetWithStream::Progress(ui64 schemeShardId, const TActorId& proxy, cons
}
return;
case EStreamState::Removing:
- return; // TODO
+ if (!StreamRemover) {
+ StreamRemover = ctx.Register(CreateStreamRemover(ctx.SelfID, proxy,
+ GetReplicationId(), GetTargetId(), GetTargetKind(), GetSrcPath(), GetStreamName()));
+ }
+ return;
case EStreamState::Ready:
+ case EStreamState::Removed:
case EStreamState::Error:
break;
}
diff --git a/ydb/core/tx/replication/controller/tx_create_dst_result.cpp b/ydb/core/tx/replication/controller/tx_create_dst_result.cpp
index bc825f5ff18..cd340495ea0 100644
--- a/ydb/core/tx/replication/controller/tx_create_dst_result.cpp
+++ b/ydb/core/tx/replication/controller/tx_create_dst_result.cpp
@@ -38,6 +38,14 @@ public:
return true;
}
+ if (target->GetDstState() != TReplication::EDstState::Creating) {
+ CLOG_W(ctx, "Dst state mismatch"
+ << ": rid# " << rid
+ << ", tid# " << tid
+ << ", state# " << target->GetDstState());
+ return true;
+ }
+
if (Ev->Get()->IsSuccess()) {
target->SetDstPathId(Ev->Get()->DstPathId);
target->SetDstState(TReplication::EDstState::Ready);
diff --git a/ydb/core/tx/replication/controller/tx_create_stream_result.cpp b/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
index 37aff25ede1..ca9d0f645e5 100644
--- a/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
+++ b/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
@@ -38,6 +38,14 @@ public:
return true;
}
+ if (target->GetStreamState() != TReplication::EStreamState::Creating) {
+ CLOG_W(ctx, "Stream state mismatch"
+ << ": rid# " << rid
+ << ", tid# " << tid
+ << ", state# " << target->GetStreamState());
+ return true;
+ }
+
if (Ev->Get()->IsSuccess()) {
target->SetStreamState(TReplication::EStreamState::Ready);
diff --git a/ydb/core/tx/replication/controller/tx_discovery_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_result.cpp
index 58fba085844..fca07c58eaa 100644
--- a/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+++ b/ydb/core/tx/replication/controller/tx_discovery_result.cpp
@@ -32,6 +32,13 @@ public:
return true;
}
+ if (Replication->GetState() != TReplication::EState::Ready) {
+ CLOG_W(ctx, "Replication state mismatch"
+ << ": rid# " << rid
+ << ", state# " << Replication->GetState());
+ return true;
+ }
+
NIceDb::TNiceDb db(txc.DB);
if (Ev->Get()->IsSuccess()) {
diff --git a/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp b/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
new file mode 100644
index 00000000000..3464db6a02c
--- /dev/null
+++ b/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
@@ -0,0 +1,83 @@
+#include "controller_impl.h"
+
+namespace NKikimr::NReplication::NController {
+
+class TController::TTxDropDstResult: public TTxBase {
+ TEvPrivate::TEvDropDstResult::TPtr Ev;
+ TReplication::TPtr Replication;
+
+public:
+ explicit TTxDropDstResult(TController* self, TEvPrivate::TEvDropDstResult::TPtr& ev)
+ : TTxBase("TxDropDstResult", self)
+ , Ev(ev)
+ {
+ }
+
+ TTxType GetTxType() const override {
+ return TXTYPE_DROP_DST_RESULT;
+ }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ CLOG_D(ctx, "Execute: " << Ev->Get()->ToString());
+
+ const auto rid = Ev->Get()->ReplicationId;
+ const auto tid = Ev->Get()->TargetId;
+
+ Replication = Self->Find(rid);
+ if (!Replication) {
+ CLOG_W(ctx, "Unknown replication"
+ << ": rid# " << rid);
+ return true;
+ }
+
+ auto* target = Replication->FindTarget(tid);
+ if (!target) {
+ CLOG_W(ctx, "Unknown target"
+ << ": rid# " << rid
+ << ", tid# " << tid);
+ return true;
+ }
+
+ if (target->GetDstState() != TReplication::EDstState::Removing) {
+ CLOG_W(ctx, "Dst state mismatch"
+ << ": rid# " << rid
+ << ", tid# " << tid
+ << ", state# " << target->GetDstState());
+ return true;
+ }
+
+ if (Ev->Get()->IsSuccess()) {
+ CLOG_N(ctx, "Target dst dropped"
+ << ": rid# " << rid
+ << ", tid# " << tid);
+ } else {
+ CLOG_E(ctx, "Drop dst error"
+ << ": rid# " << rid
+ << ", tid# " << tid
+ << ", " << NKikimrScheme::EStatus_Name(Ev->Get()->Status)
+ << ", " << Ev->Get()->Error);
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::Targets>().Key(rid, tid).Delete();
+ db.Table<Schema::SrcStreams>().Key(rid, tid).Delete();
+ Replication->RemoveTarget(tid);
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ CLOG_D(ctx, "Complete");
+
+ if (Replication) {
+ Replication->Progress(ctx);
+ }
+ }
+
+}; // TTxDropDstResult
+
+void TController::RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx) {
+ Execute(new TTxDropDstResult(this, ev), ctx);
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/tx_drop_replication.cpp b/ydb/core/tx/replication/controller/tx_drop_replication.cpp
index 8557a4098fc..b59e519b3b9 100644
--- a/ydb/core/tx/replication/controller/tx_drop_replication.cpp
+++ b/ydb/core/tx/replication/controller/tx_drop_replication.cpp
@@ -3,14 +3,21 @@
namespace NKikimr::NReplication::NController {
class TController::TTxDropReplication: public TTxBase {
- TEvController::TEvDropReplication::TPtr Ev;
- THolder<TEvController::TEvDropReplicationResult> Result;
+ TEvController::TEvDropReplication::TPtr PubEv;
+ TEvPrivate::TEvDropReplication::TPtr PrivEv;
+ THolder<IEventHandle> Result; // TEvController::TEvDropReplicationResult
TReplication::TPtr Replication;
public:
explicit TTxDropReplication(TController* self, TEvController::TEvDropReplication::TPtr& ev)
: TTxBase("TxDropReplication", self)
- , Ev(ev)
+ , PubEv(ev)
+ {
+ }
+
+ explicit TTxDropReplication(TController* self, TEvPrivate::TEvDropReplication::TPtr& ev)
+ : TTxBase("TxDropReplication", self)
+ , PrivEv(ev)
{
}
@@ -19,36 +26,109 @@ public:
}
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
- CLOG_D(ctx, "Execute: " << Ev->Get()->ToString());
+ if (PubEv) {
+ return ExecutePub(txc, ctx);
+ } else if (PrivEv) {
+ return ExecutePriv(txc, ctx);
+ } else {
+ Y_FAIL("unreachable");
+ }
+ }
- const auto& record = Ev->Get()->Record;
- Result = MakeHolder<TEvController::TEvDropReplicationResult>();
- Result->Record.MutableOperationId()->CopyFrom(record.GetOperationId());
- Result->Record.SetOrigin(Self->TabletID());
+ bool ExecutePub(TTransactionContext& txc, const TActorContext& ctx) {
+ CLOG_D(ctx, "Execute: " << PubEv->Get()->ToString());
+ const auto& record = PubEv->Get()->Record;
const auto pathId = PathIdFromPathId(record.GetPathId());
+ const auto& opId = record.GetOperationId();
Replication = Self->Find(pathId);
if (!Replication) {
CLOG_W(ctx, "Cannot drop unknown replication"
<< ": pathId# " << pathId);
- Result->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::NOT_FOUND);
+ auto ev = MakeHolder<TEvController::TEvDropReplicationResult>();
+ ev->Record.MutableOperationId()->CopyFrom(record.GetOperationId());
+ ev->Record.SetOrigin(Self->TabletID());
+ ev->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::NOT_FOUND);
+ Result = MakeHolder<IEventHandle>(PubEv->Sender, ctx.SelfID, ev.Release());
+
+ return true;
+ }
+
+ if (Replication->GetState() == TReplication::EState::Removing) {
+ Replication->SetDropOp(PubEv->Sender, std::make_pair(opId.GetTxId(), opId.GetPartId()));
return true;
}
NIceDb::TNiceDb db(txc.DB);
+
Replication->SetState(TReplication::EState::Removing);
db.Table<Schema::Replications>().Key(Replication->GetId()).Update(
NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState())
);
+ for (ui64 tid = 0; tid < Replication->GetNextTargetId(); ++tid) {
+ auto* target = Replication->FindTarget(tid);
+ if (!target) {
+ continue;
+ }
+
+ target->Shutdown(ctx);
+
+ target->SetStreamState(TReplication::EStreamState::Removing);
+ db.Table<Schema::SrcStreams>().Key(Replication->GetId(), tid).Update(
+ NIceDb::TUpdate<Schema::SrcStreams::State>(target->GetStreamState())
+ );
+
+ target->SetDstState(TReplication::EDstState::Removing); // TODO: configurable
+ db.Table<Schema::Targets>().Key(Replication->GetId(), tid).Update(
+ NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState())
+ );
+ }
+
CLOG_N(ctx, "Drop replication"
<< ": rid# " << Replication->GetId()
<< ", pathId# " << pathId);
- // TODO: delay response
- Result->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::SUCCESS);
+ Replication->SetDropOp(PubEv->Sender, std::make_pair(opId.GetTxId(), opId.GetPartId()));
+ return true;
+ }
+
+ bool ExecutePriv(TTransactionContext& txc, const TActorContext& ctx) {
+ CLOG_D(ctx, "Execute: " << PrivEv->Get()->ToString());
+
+ const auto rid = PrivEv->Get()->ReplicationId;
+ Replication = Self->Find(rid);
+
+ if (!Replication) {
+ CLOG_W(ctx, "Cannot drop unknown replication"
+ << ": rid# " << rid);
+ return true;
+ }
+
+ if (Replication->GetState() != TReplication::EState::Removing) {
+ CLOG_W(ctx, "Replication state mismatch"
+ << ": rid# " << rid
+ << ", state# " << Replication->GetState());
+ return true;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::Replications>().Key(rid).Delete();
+
+ if (const auto& op = Replication->GetDropOp()) {
+ auto ev = MakeHolder<TEvController::TEvDropReplicationResult>();
+ ev->Record.MutableOperationId()->SetTxId(op->OperationId.first);
+ ev->Record.MutableOperationId()->SetPartId(op->OperationId.second);
+ ev->Record.SetOrigin(Self->TabletID());
+ ev->Record.SetStatus(NKikimrReplication::TEvDropReplicationResult::SUCCESS);
+ Result = MakeHolder<IEventHandle>(op->Sender, ctx.SelfID, ev.Release());
+ }
+
+ Self->Remove(rid);
+ Replication.Reset();
+
return true;
}
@@ -56,7 +136,7 @@ public:
CLOG_D(ctx, "Complete");
if (Result) {
- ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie);
+ ctx.Send(Result.Release());
}
if (Replication) {
@@ -70,4 +150,8 @@ void TController::RunTxDropReplication(TEvController::TEvDropReplication::TPtr&
Execute(new TTxDropReplication(this, ev), ctx);
}
+void TController::RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx) {
+ Execute(new TTxDropReplication(this, ev), ctx);
+}
+
}
diff --git a/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp b/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
new file mode 100644
index 00000000000..71f5a3782c9
--- /dev/null
+++ b/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
@@ -0,0 +1,83 @@
+#include "controller_impl.h"
+
+namespace NKikimr::NReplication::NController {
+
+class TController::TTxDropStreamResult: public TTxBase {
+ TEvPrivate::TEvDropStreamResult::TPtr Ev;
+ TReplication::TPtr Replication;
+
+public:
+ explicit TTxDropStreamResult(TController* self, TEvPrivate::TEvDropStreamResult::TPtr& ev)
+ : TTxBase("TxDropStreamResult", self)
+ , Ev(ev)
+ {
+ }
+
+ TTxType GetTxType() const override {
+ return TXTYPE_DROP_STREAM_RESULT;
+ }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ CLOG_D(ctx, "Execute: " << Ev->Get()->ToString());
+
+ const auto rid = Ev->Get()->ReplicationId;
+ const auto tid = Ev->Get()->TargetId;
+
+ Replication = Self->Find(rid);
+ if (!Replication) {
+ CLOG_W(ctx, "Unknown replication"
+ << ": rid# " << rid);
+ return true;
+ }
+
+ auto* target = Replication->FindTarget(tid);
+ if (!target) {
+ CLOG_W(ctx, "Unknown target"
+ << ": rid# " << rid
+ << ", tid# " << tid);
+ return true;
+ }
+
+ if (target->GetStreamState() != TReplication::EStreamState::Removing) {
+ CLOG_W(ctx, "Stream state mismatch"
+ << ": rid# " << rid
+ << ", tid# " << tid
+ << ", state# " << target->GetStreamState());
+ return true;
+ }
+
+ if (Ev->Get()->IsSuccess()) {
+ CLOG_N(ctx, "Stream dropped"
+ << ": rid# " << rid
+ << ", tid# " << tid);
+ } else {
+ const auto& status = Ev->Get()->Status;
+ CLOG_E(ctx, "Drop stream error"
+ << ": rid# " << rid
+ << ", tid# " << tid
+ << ", status# " << status.GetStatus()
+ << ", issue# " << status.GetIssues().ToOneLineString());
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ target->SetStreamState(TReplication::EStreamState::Removed);
+ db.Table<Schema::SrcStreams>().Key(rid, tid).Update<Schema::SrcStreams::State>(target->GetStreamState());
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ CLOG_D(ctx, "Complete");
+
+ if (Replication) {
+ Replication->Progress(ctx);
+ }
+ }
+
+}; // TTxDropStreamResult
+
+void TController::RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) {
+ Execute(new TTxDropStreamResult(this, ev), ctx);
+}
+
+}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt
index f7e8181651d..e31d395fe60 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt
+++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt
@@ -32,6 +32,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC
core-tx-tx_allocator
core-tx-tx_proxy
public-lib-scheme_types
+ cpp-client-ydb_driver
)
target_sources(tx-schemeshard-ut_helpers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp
diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt
index eba6438ce64..c1c282cef4e 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt
@@ -33,6 +33,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC
core-tx-tx_allocator
core-tx-tx_proxy
public-lib-scheme_types
+ cpp-client-ydb_driver
)
target_sources(tx-schemeshard-ut_helpers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp
diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt
index eba6438ce64..c1c282cef4e 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt
+++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt
@@ -33,6 +33,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC
core-tx-tx_allocator
core-tx-tx_proxy
public-lib-scheme_types
+ cpp-client-ydb_driver
)
target_sources(tx-schemeshard-ut_helpers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index 28e5a8285ee..d6cec4e1edb 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -561,6 +561,11 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
}
}
+ if (opts.InitYdbDriver_) {
+ YdbDriver = MakeHolder<NYdb::TDriver>(NYdb::TDriverConfig());
+ runtime.GetAppData().YdbDriver = YdbDriver.Get();
+ }
+
TActorId sender = runtime.AllocateEdgeActor();
//CreateTestBootstrapper(runtime, CreateTestTabletInfo(MakeBSControllerID(TTestTxConfig::DomainUid), TTabletTypes::BSController), &CreateFlatBsController);
BootSchemeShard(runtime, schemeRoot);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
index 782593e48e5..e9ba642e839 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
@@ -10,6 +10,8 @@
#include <ydb/core/tx/schemeshard/schemeshard_identificators.h>
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+
#include <functional>
namespace NSchemeShardUT_Private {
@@ -33,6 +35,7 @@ namespace NSchemeShardUT_Private {
OPTION(ui32, NChannels, 4);
OPTION(bool, EnablePipeRetries, true);
OPTION(bool, RunFakeConfigDispatcher, false);
+ OPTION(bool, InitYdbDriver, false);
OPTION(std::optional<bool>, EnableSystemViews, std::nullopt);
OPTION(std::optional<bool>, EnablePersistentQueryStats, std::nullopt);
OPTION(std::optional<bool>, EnablePersistentPartitionStats, std::nullopt);
@@ -66,6 +69,7 @@ namespace NSchemeShardUT_Private {
TActorId TxReliablePropose;
ui32 ChannelsCount;
TActorId MeteringFake;
+ THolder<NYdb::TDriver> YdbDriver;
public:
TTestEnv(TTestActorRuntime& runtime, ui32 nchannels = 4, bool enablePipeRetries = true,
diff --git a/ydb/core/tx/schemeshard/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication.cpp
index 2030564642d..cb581c9c1db 100644
--- a/ydb/core/tx/schemeshard/ut_replication.cpp
+++ b/ydb/core/tx/schemeshard/ut_replication.cpp
@@ -3,6 +3,24 @@
using namespace NSchemeShardUT_Private;
Y_UNIT_TEST_SUITE(TReplicationTests) {
+ static TString DefaultScheme(const TString& name) {
+ return Sprintf(R"(
+ Name: "%s"
+ Config {
+ StaticCredentials {
+ User: "user"
+ Password: "pwd"
+ }
+ Specific {
+ Targets {
+ SrcPath: "/MyRoot1/Table"
+ DstPath: "/MyRoot2/Table"
+ }
+ }
+ }
+ )", name.c_str());
+ }
+
void SetupLogging(TTestActorRuntimeBase& runtime) {
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NActors::NLog::PRI_TRACE);
@@ -17,21 +35,19 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
Y_UNIT_TEST(Create) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true));
ui64 txId = 100;
SetupLogging(runtime);
- TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
- Name: "Replication"
- )");
+ TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme("Replication"));
env.TestWaitNotification(runtime, txId);
TestLs(runtime, "/MyRoot/Replication", false, NLs::PathExist);
}
Y_UNIT_TEST(CreateSequential) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true));
ui64 txId = 100;
SetupLogging(runtime);
@@ -40,9 +56,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
for (int i = 0; i < 2; ++i) {
const auto name = Sprintf("Replication%d", i);
- TestCreateReplication(runtime, ++txId, "/MyRoot", Sprintf(R"(
- Name: "%s"
- )", name.c_str()));
+ TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme(name));
env.TestWaitNotification(runtime, txId);
const auto desc = DescribePath(runtime, "/MyRoot/" + name);
@@ -59,7 +73,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
Y_UNIT_TEST(CreateInParallel) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true));
ui64 txId = 100;
SetupLogging(runtime);
@@ -72,9 +86,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
for (int j = 0; j < 2; ++j) {
auto name = Sprintf("Replication%d-%d", i, j);
- TestCreateReplication(runtime, ++txId, "/MyRoot", Sprintf(R"(
- Name: "%s"
- )", name.c_str()));
+ TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme(name));
names.push_back(std::move(name));
txIds.push_back(txId);
@@ -97,15 +109,13 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
Y_UNIT_TEST(CreateDropRecreate) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true));
ui64 txId = 100;
SetupLogging(runtime);
ui64 controllerId = 0;
- TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
- Name: "Replication"
- )");
+ TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme("Replication"));
env.TestWaitNotification(runtime, txId);
{
const auto desc = DescribePath(runtime, "/MyRoot/Replication");
@@ -117,9 +127,7 @@ Y_UNIT_TEST_SUITE(TReplicationTests) {
env.TestWaitNotification(runtime, txId);
TestDescribeResult(DescribePath(runtime, "/MyRoot/Replication"), {NLs::PathNotExist});
- TestCreateReplication(runtime, ++txId, "/MyRoot", R"(
- Name: "Replication"
- )");
+ TestCreateReplication(runtime, ++txId, "/MyRoot", DefaultScheme("Replication"));
env.TestWaitNotification(runtime, txId);
{
const auto desc = DescribePath(runtime, "/MyRoot/Replication");
diff --git a/ydb/core/tx/schemeshard/ut_replication_reboots.cpp b/ydb/core/tx/schemeshard/ut_replication_reboots.cpp
index 45d4e97640a..72043e1db25 100644
--- a/ydb/core/tx/schemeshard/ut_replication_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_replication_reboots.cpp
@@ -3,6 +3,24 @@
using namespace NSchemeShardUT_Private;
Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) {
+ static TString DefaultScheme(const TString& name) {
+ return Sprintf(R"(
+ Name: "%s"
+ Config {
+ StaticCredentials {
+ User: "user"
+ Password: "pwd"
+ }
+ Specific {
+ Targets {
+ SrcPath: "/MyRoot1/Table"
+ DstPath: "/MyRoot2/Table"
+ }
+ }
+ }
+ )", name.c_str());
+ }
+
void SetupLogging(TTestActorRuntimeBase& runtime) {
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NActors::NLog::PRI_TRACE);
@@ -10,15 +28,15 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) {
Y_UNIT_TEST(Create) {
TTestWithReboots t(false);
+ t.GetTestEnvOptions().InitYdbDriver(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
SetupLogging(runtime);
}
- TestCreateReplication(runtime, ++t.TxId, "/MyRoot", R"(
- Name: "Replication"
- )");
+ TestCreateReplication(runtime, ++t.TxId, "/MyRoot", DefaultScheme("Replication"));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
TestLs(runtime, "/MyRoot/Replication", false, NLs::PathExist);
@@ -27,15 +45,15 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) {
void CreateMultipleReplications(bool withInitialController) {
TTestWithReboots t(false);
+ t.GetTestEnvOptions().InitYdbDriver(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
SetupLogging(runtime);
if (withInitialController) {
- TestCreateReplication(runtime, ++t.TxId, "/MyRoot", R"(
- Name: "Replication0"
- )");
+ TestCreateReplication(runtime, ++t.TxId, "/MyRoot", DefaultScheme("Replication0"));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
TestLs(runtime, "/MyRoot/Replication0", false, NLs::PathExist);
@@ -47,9 +65,7 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) {
for (int i = 1; i <= 3; ++i) {
auto name = Sprintf("Replication%d", i);
- auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", Sprintf(R"(
- Name: "%s"
- )", name.c_str()));
+ auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", DefaultScheme(name));
t.TestEnv->ReliablePropose(runtime, request, {
NKikimrScheme::StatusAccepted,
@@ -82,6 +98,8 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) {
Y_UNIT_TEST(CreateDropRecreate) {
TTestWithReboots t(false);
+ t.GetTestEnvOptions().InitYdbDriver(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
@@ -89,9 +107,7 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) {
}
{
- auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", R"(
- Name: "Replication"
- )");
+ auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", DefaultScheme("Replication"));
t.TestEnv->ReliablePropose(runtime, request, {
NKikimrScheme::StatusAccepted,
NKikimrScheme::StatusAlreadyExists,
@@ -112,9 +128,7 @@ Y_UNIT_TEST_SUITE(TReplicationWithRebootsTests) {
TestLs(runtime, "/MyRoot/Replication", false, NLs::PathNotExist);
{
- auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", R"(
- Name: "Replication"
- )");
+ auto request = CreateReplicationRequest(++t.TxId, "/MyRoot", DefaultScheme("Replication"));
t.TestEnv->ReliablePropose(runtime, request, {
NKikimrScheme::StatusAccepted,
NKikimrScheme::StatusAlreadyExists,