aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-04-04 11:02:58 +0300
committerilnaz <ilnaz@ydb.tech>2023-04-04 11:02:58 +0300
commit22f1654932759a7f59bb4b16e0611e7a22940fa7 (patch)
tree2e218873de862ff1817c2d921c5a04fba7b62962
parent3252a2ccdda0f3fa0314aadf286d22d656bfdb9e (diff)
downloadydb-22f1654932759a7f59bb4b16e0611e7a22940fa7.tar.gz
Handle cdc streams that are planned to drop
-rw-r--r--ydb/core/tx/datashard/change_exchange_split.cpp5
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp26
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp18
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h10
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp33
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp18
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp45
7 files changed, 139 insertions, 16 deletions
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp
index a7fc8afe743..06c8a00bfd1 100644
--- a/ydb/core/tx/datashard/change_exchange_split.cpp
+++ b/ydb/core/tx/datashard/change_exchange_split.cpp
@@ -272,6 +272,11 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel
return;
}
+ if (entry.Self && entry.Self->Info.GetPathState() == NKikimrSchemeOp::EPathStateDrop) {
+ LOG_N("Auto-ack (stream is planned to drop)");
+ return Ack();
+ }
+
Y_VERIFY(entry.ListNodeEntry->Children.size() == 1);
const auto& topic = entry.ListNodeEntry->Children.at(0);
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 533b001e8dc..0df10c6a89e 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -515,6 +515,14 @@ class TCdcChangeSenderMain
return;
}
+ if (entry.Self && entry.Self->Info.GetPathState() == NKikimrSchemeOp::EPathStateDrop) {
+ LOG_D("Stream is planned to drop, waiting for the EvRemoveSender command");
+
+ RemoveRecords();
+ KillSenders();
+ return Become(&TThis::StatePendingRemove);
+ }
+
Stream = TUserTable::TCdcStream(entry.CdcStreamInfo->Description);
Y_VERIFY(entry.ListNodeEntry->Children.size() == 1);
@@ -539,7 +547,7 @@ class TCdcChangeSenderMain
STATEFN(StateResolveTopic) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopic);
- sFunc(TEvents::TEvWakeup, ResolveTopic);
+ sFunc(TEvents::TEvWakeup, ResolveCdcStream);
default:
return StateBase(ev, TlsActivationContext->AsActorContext());
}
@@ -641,7 +649,7 @@ class TCdcChangeSenderMain
}
void Resolve() override {
- ResolveTopic();
+ ResolveCdcStream();
}
bool IsResolved() const override {
@@ -726,6 +734,11 @@ class TCdcChangeSenderMain
PassAway();
}
+ void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
+ LOG_D("Handle " << ev->Get()->ToString());
+ RemoveRecords(std::move(ev->Get()->Records));
+ }
+
void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
RenderHtmlPage(ESenderType::CdcStream, ev, ctx);
}
@@ -763,6 +776,15 @@ public:
}
}
+ STFUNC(StatePendingRemove) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove);
+ hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
+ HFunc(NMon::TEvRemoteHttpInfo, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
private:
mutable TMaybe<TString> LogPrefix;
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index 201fd1c0a79..68094c6dad1 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -39,11 +39,9 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
}
void TBaseChangeSender::KillSenders() {
- for (const auto& [_, sender] : Senders) {
+ for (const auto& [_, sender] : std::exchange(Senders, {})) {
ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
}
-
- Senders.clear();
}
void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
@@ -183,13 +181,7 @@ void TBaseChangeSender::OnReady(ui64 partitionId) {
sender.Ready = true;
if (sender.Pending) {
- TVector<ui64> remove(Reserve(sender.Pending.size()));
- for (const auto& record : sender.Pending) {
- remove.push_back(record.Order);
- }
-
- ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
- sender.Pending.clear();
+ RemoveRecords(std::exchange(sender.Pending, {}));
}
SendRecords();
@@ -223,15 +215,15 @@ void TBaseChangeSender::RemoveRecords() {
TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pendingStatus));
- for (const auto& record : Enqueued) {
+ for (const auto& record : std::exchange(Enqueued, {})) {
remove.push_back(record.Order);
}
- for (const auto& record : PendingBody) {
+ for (const auto& record : std::exchange(PendingBody, {})) {
remove.push_back(record.Order);
}
- for (const auto& [order, _] : PendingSent) {
+ for (const auto& [order, _] : std::exchange(PendingSent, {})) {
remove.push_back(order);
}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index ce589025389..a5de292ecdf 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -93,6 +93,16 @@ class TBaseChangeSender: public IChangeSender {
void SendRecords();
protected:
+ template <typename T>
+ void RemoveRecords(TVector<T>&& records) {
+ TVector<ui64> remove(Reserve(records.size()));
+ for (const auto& record : records) {
+ remove.push_back(record.Order);
+ }
+
+ ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
+ }
+
void CreateSenders(const TVector<ui64>& partitionIds) override;
void KillSenders() override;
void RemoveRecords() override;
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 70504fb1b35..6be8efdac38 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1972,6 +1972,39 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(RacySplitAndDropTable) {
+ TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false);
+ auto& runtime = *env.GetServer()->GetRuntime();
+
+ TVector<THolder<IEventHandle>> enqueued;
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) {
+ enqueued.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
+ UPSERT INTO `/Root/Table` (key, value)
+ VALUES (1, 10);
+ )");
+
+ SetSplitMergePartCountLimit(&runtime, -1);
+ const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
+ UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
+ AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 2);
+
+ const auto dropTxId = AsyncDropTable(env.GetServer(), env.GetEdgeActor(), "/Root", "Table");
+
+ runtime.SetObserverFunc(prevObserver);
+ for (auto& ev : std::exchange(enqueued, {})) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ WaitTxNotification(env.GetServer(), env.GetEdgeActor(), dropTxId);
+ }
+
Y_UNIT_TEST(RenameTable) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 0d74dda4c7c..61485013500 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -831,6 +831,22 @@ void TPathDescriber::DescribeExternalDataSource(const TActorContext&, TPathId pa
entry->MutableAuth()->CopyFrom(externalDataSourceInfo->Auth);
}
+static bool ConsiderAsDropped(const TPath& path) {
+ Y_VERIFY(path.IsResolved());
+
+ if (path.Base()->IsTable() || path.Base()->IsTableIndex()) {
+ return false;
+ }
+ if (path.Base()->IsDirectory() || path.Base()->IsDomainRoot()) {
+ return false;
+ }
+ if (path.IsCdcStream()) {
+ return false;
+ }
+
+ return true;
+}
+
THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> TPathDescriber::Describe(const TActorContext& ctx) {
TPathId pathId = Params.HasPathId() ? TPathId(Params.GetSchemeshardId(), Params.GetPathId()) : InvalidPathId;
TString pathStr = Params.GetPath();
@@ -856,7 +872,7 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> TPathDescriber::Describe
checks
.NotDeleted();
- if (checks && !path.Base()->IsTable() && !path.Base()->IsTableIndex() && !path.Base()->IsDirectory() && !path.Base()->IsDomainRoot()) {
+ if (checks && ConsiderAsDropped(path)) {
// KIKIMR-13173
// PQ BSV drop their shard before PlanStep
// If they are being deleted consider them as deleted
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index 62ba67cc9b6..800927f877a 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -343,4 +343,49 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ Y_UNIT_TEST(RacySplitAndDropTable) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE);
+ }
+
+ TestSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"(
+ SourceTabletId: %lu
+ SplitBoundary {
+ KeyPrefix {
+ Tuple { Optional { Uint64: 2 } }
+ }
+ }
+ )", TTestTxConfig::FakeHiveTablets));
+ TestDropTable(runtime, ++t.TxId, "/MyRoot", "Table");
+ t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId});
+
+ {
+ TInactiveZone inactive(activeZone);
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathNotExist,
+ });
+ }
+ });
+ }
+
} // TCdcStreamWithRebootsTests