diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-04-04 11:02:58 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-04-04 11:02:58 +0300 |
commit | 22f1654932759a7f59bb4b16e0611e7a22940fa7 (patch) | |
tree | 2e218873de862ff1817c2d921c5a04fba7b62962 | |
parent | 3252a2ccdda0f3fa0314aadf286d22d656bfdb9e (diff) | |
download | ydb-22f1654932759a7f59bb4b16e0611e7a22940fa7.tar.gz |
Handle cdc streams that are planned to drop
-rw-r--r-- | ydb/core/tx/datashard/change_exchange_split.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 33 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_path_describer.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp | 45 |
7 files changed, 139 insertions, 16 deletions
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index a7fc8afe743..06c8a00bfd1 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -272,6 +272,11 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel return; } + if (entry.Self && entry.Self->Info.GetPathState() == NKikimrSchemeOp::EPathStateDrop) { + LOG_N("Auto-ack (stream is planned to drop)"); + return Ack(); + } + Y_VERIFY(entry.ListNodeEntry->Children.size() == 1); const auto& topic = entry.ListNodeEntry->Children.at(0); diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 533b001e8dc..0df10c6a89e 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -515,6 +515,14 @@ class TCdcChangeSenderMain return; } + if (entry.Self && entry.Self->Info.GetPathState() == NKikimrSchemeOp::EPathStateDrop) { + LOG_D("Stream is planned to drop, waiting for the EvRemoveSender command"); + + RemoveRecords(); + KillSenders(); + return Become(&TThis::StatePendingRemove); + } + Stream = TUserTable::TCdcStream(entry.CdcStreamInfo->Description); Y_VERIFY(entry.ListNodeEntry->Children.size() == 1); @@ -539,7 +547,7 @@ class TCdcChangeSenderMain STATEFN(StateResolveTopic) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopic); - sFunc(TEvents::TEvWakeup, ResolveTopic); + sFunc(TEvents::TEvWakeup, ResolveCdcStream); default: return StateBase(ev, TlsActivationContext->AsActorContext()); } @@ -641,7 +649,7 @@ class TCdcChangeSenderMain } void Resolve() override { - ResolveTopic(); + ResolveCdcStream(); } bool IsResolved() const override { @@ -726,6 +734,11 @@ class TCdcChangeSenderMain PassAway(); } + void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + RemoveRecords(std::move(ev->Get()->Records)); + } + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { RenderHtmlPage(ESenderType::CdcStream, ev, ctx); } @@ -763,6 +776,15 @@ public: } } + STFUNC(StatePendingRemove) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); + hFunc(TEvChangeExchange::TEvRemoveSender, Handle); + HFunc(NMon::TEvRemoteHttpInfo, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + private: mutable TMaybe<TString> LogPrefix; diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index 201fd1c0a79..68094c6dad1 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -39,11 +39,9 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) { } void TBaseChangeSender::KillSenders() { - for (const auto& [_, sender] : Senders) { + for (const auto& [_, sender] : std::exchange(Senders, {})) { ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); } - - Senders.clear(); } void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { @@ -183,13 +181,7 @@ void TBaseChangeSender::OnReady(ui64 partitionId) { sender.Ready = true; if (sender.Pending) { - TVector<ui64> remove(Reserve(sender.Pending.size())); - for (const auto& record : sender.Pending) { - remove.push_back(record.Order); - } - - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); - sender.Pending.clear(); + RemoveRecords(std::exchange(sender.Pending, {})); } SendRecords(); @@ -223,15 +215,15 @@ void TBaseChangeSender::RemoveRecords() { TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pendingStatus)); - for (const auto& record : Enqueued) { + for (const auto& record : std::exchange(Enqueued, {})) { remove.push_back(record.Order); } - for (const auto& record : PendingBody) { + for (const auto& record : std::exchange(PendingBody, {})) { remove.push_back(record.Order); } - for (const auto& [order, _] : PendingSent) { + for (const auto& [order, _] : std::exchange(PendingSent, {})) { remove.push_back(order); } diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index ce589025389..a5de292ecdf 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -93,6 +93,16 @@ class TBaseChangeSender: public IChangeSender { void SendRecords(); protected: + template <typename T> + void RemoveRecords(TVector<T>&& records) { + TVector<ui64> remove(Reserve(records.size())); + for (const auto& record : records) { + remove.push_back(record.Order); + } + + ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + } + void CreateSenders(const TVector<ui64>& partitionIds) override; void KillSenders() override; void RemoveRecords() override; diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 70504fb1b35..6be8efdac38 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1972,6 +1972,39 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(RacySplitAndDropTable) { + TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false); + auto& runtime = *env.GetServer()->GetRuntime(); + + TVector<THolder<IEventHandle>> enqueued; + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) { + enqueued.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + ExecSQL(env.GetServer(), env.GetEdgeActor(), R"( + UPSERT INTO `/Root/Table` (key, value) + VALUES (1, 10); + )"); + + SetSplitMergePartCountLimit(&runtime, -1); + const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 2); + + const auto dropTxId = AsyncDropTable(env.GetServer(), env.GetEdgeActor(), "/Root", "Table"); + + runtime.SetObserverFunc(prevObserver); + for (auto& ev : std::exchange(enqueued, {})) { + runtime.Send(ev.Release(), 0, true); + } + + WaitTxNotification(env.GetServer(), env.GetEdgeActor(), dropTxId); + } + Y_UNIT_TEST(RenameTable) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 0d74dda4c7c..61485013500 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -831,6 +831,22 @@ void TPathDescriber::DescribeExternalDataSource(const TActorContext&, TPathId pa entry->MutableAuth()->CopyFrom(externalDataSourceInfo->Auth); } +static bool ConsiderAsDropped(const TPath& path) { + Y_VERIFY(path.IsResolved()); + + if (path.Base()->IsTable() || path.Base()->IsTableIndex()) { + return false; + } + if (path.Base()->IsDirectory() || path.Base()->IsDomainRoot()) { + return false; + } + if (path.IsCdcStream()) { + return false; + } + + return true; +} + THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> TPathDescriber::Describe(const TActorContext& ctx) { TPathId pathId = Params.HasPathId() ? TPathId(Params.GetSchemeshardId(), Params.GetPathId()) : InvalidPathId; TString pathStr = Params.GetPath(); @@ -856,7 +872,7 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> TPathDescriber::Describe checks .NotDeleted(); - if (checks && !path.Base()->IsTable() && !path.Base()->IsTableIndex() && !path.Base()->IsDirectory() && !path.Base()->IsDomainRoot()) { + if (checks && ConsiderAsDropped(path)) { // KIKIMR-13173 // PQ BSV drop their shard before PlanStep // If they are being deleted consider them as deleted diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index 62ba67cc9b6..800927f877a 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -343,4 +343,49 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + Y_UNIT_TEST(RacySplitAndDropTable) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE); + } + + TestSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"( + SourceTabletId: %lu + SplitBoundary { + KeyPrefix { + Tuple { Optional { Uint64: 2 } } + } + } + )", TTestTxConfig::FakeHiveTablets)); + TestDropTable(runtime, ++t.TxId, "/MyRoot", "Table"); + t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId}); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathNotExist, + }); + } + }); + } + } // TCdcStreamWithRebootsTests |