diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2022-07-05 21:00:48 +0300 |
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-07-05 21:00:48 +0300 |
| commit | d78d29cdaaa0fed635ca52411af74b8442275bc4 (patch) | |
| tree | 655272bd178733794653cce248c3ca8f93e1e45e | |
| parent | 9d40453058c96c460db9d1e3e8bae45cb071df1d (diff) | |
22-2: Check 'Created' flag for cdc streams KIKIMR-15223
merge from trunk: r9673198
REVIEW: 2703168
x-ydb-stable-ref: ebb7c532e79dcd509f4df402007654ac3cc476d9
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 125 | ||||
| -rw-r--r-- | ydb/core/tx/scheme_board/cache.cpp | 10 |
2 files changed, 108 insertions, 27 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 830b89e2d6b..2a8864352da 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -7,6 +7,7 @@ #include <ydb/core/persqueue/events/global.h> #include <ydb/core/persqueue/user_info.h> #include <ydb/core/persqueue/write_meta.h> +#include <ydb/core/tx/scheme_board/events.h> #include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> @@ -685,6 +686,28 @@ Y_UNIT_TEST_SUITE(Cdc) { using TCdcStream = TShardedTableOptions::TCdcStream; + static NKikimrPQ::TPQConfig DefaultPQConfig() { + NKikimrPQ::TPQConfig pqConfig; + pqConfig.SetEnabled(true); + pqConfig.SetEnableProtoSourceIdInfo(true); + pqConfig.SetRoundRobinPartitionMapping(true); + pqConfig.SetTopicsAreFirstClassCitizen(true); + pqConfig.SetMaxReadCookies(10); + pqConfig.AddClientServiceType()->SetName("data-streams"); + pqConfig.SetCheckACL(false); + pqConfig.SetRequireCredentialsInNewProtocol(false); + pqConfig.MutableQuotingConfig()->SetEnableQuoting(false); + return pqConfig; + } + + static void SetupLogging(TTestActorRuntime& runtime) { + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG); + } + template <typename TDerived, typename TClient> class TTestEnv { public: @@ -735,29 +758,6 @@ Y_UNIT_TEST_SUITE(Cdc) { } private: - static NKikimrPQ::TPQConfig DefaultPQConfig() { - NKikimrPQ::TPQConfig pqConfig; - pqConfig.SetEnabled(true); - pqConfig.SetEnableProtoSourceIdInfo(true); - pqConfig.SetRoundRobinPartitionMapping(true); - pqConfig.SetTopicsAreFirstClassCitizen(true); - pqConfig.SetMaxReadCookies(10); - pqConfig.AddClientServiceType()->SetName("data-streams"); - pqConfig.SetCheckACL(false); - pqConfig.SetRequireCredentialsInNewProtocol(false); - pqConfig.MutableQuotingConfig()->SetEnableQuoting(false); - return pqConfig; - } - - static void SetupLogging(TTestActorRuntime& runtime) { - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG); - } - - private: TPortManager PortManager; TServer::TPtr Server; TActorId EdgeActor; @@ -1456,6 +1456,85 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(RacyCreateAndSend) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + bool added = false; + TVector<THolder<IEventHandle>> delayed; + + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvChangeExchange::EvAddSender: + added = true; + break; + + case TSchemeBoardEvents::EvUpdate: + if (auto* msg = ev->Get<TSchemeBoardEvents::TEvUpdate>()) { + const auto desc = msg->GetRecord().GetDescribeSchemeResult(); + if (desc.GetPath() == "/Root/Table/Stream" && desc.GetPathDescription().GetSelf().GetCreateFinished()) { + delayed.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + break; + + default: + break; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + const auto txId = AsyncAlterAddStream(server, "/Root", "Table", + Updates(NKikimrSchemeOp::ECdcStreamFormatJson)); + + if (!added) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&added](IEventHandle&) { + return added; + }); + runtime.DispatchEvents(opts); + } + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + if (!delayed.size() || delayed.size() % 3) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) { + return delayed.size() && (delayed.size() % 3 == 0); + }); + runtime.DispatchEvents(opts); + } + + runtime.SetObserverFunc(prevObserver); + for (auto& ev : delayed) { + runtime.Send(ev.Release(), 0, true); + } + + WaitTxNotification(server, edgeActor, txId); + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + }); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 82e2fa87c46..041a3007184 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -1413,10 +1413,12 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { case NKikimrSchemeOp::EPathTypeCdcStream: Kind = TNavigate::KindCdcStream; IsPrivatePath = CalcPathIsPrivate(entryDesc.GetPathType(), entryDesc.GetPathSubType()); - FillInfo(Kind, CdcStreamInfo, std::move(*pathDesc.MutableCdcStreamDescription())); - if (CdcStreamInfo->Description.HasPathId()) { - const auto& pathId = CdcStreamInfo->Description.GetPathId(); - CdcStreamInfo->PathId = TPathId(pathId.GetOwnerId(), pathId.GetLocalId()); + if (Created) { + FillInfo(Kind, CdcStreamInfo, std::move(*pathDesc.MutableCdcStreamDescription())); + if (CdcStreamInfo->Description.HasPathId()) { + const auto& pathId = CdcStreamInfo->Description.GetPathId(); + CdcStreamInfo->PathId = TPathId(pathId.GetOwnerId(), pathId.GetLocalId()); + } } break; case NKikimrSchemeOp::EPathTypeSequence: |
