summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <[email protected]>2022-07-05 21:00:48 +0300
committerDaniil Cherednik <[email protected]>2022-07-05 21:00:48 +0300
commitd78d29cdaaa0fed635ca52411af74b8442275bc4 (patch)
tree655272bd178733794653cce248c3ca8f93e1e45e
parent9d40453058c96c460db9d1e3e8bae45cb071df1d (diff)
22-2: Check 'Created' flag for cdc streams KIKIMR-15223
merge from trunk: r9673198 REVIEW: 2703168 x-ydb-stable-ref: ebb7c532e79dcd509f4df402007654ac3cc476d9
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp125
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp10
2 files changed, 108 insertions, 27 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 830b89e2d6b..2a8864352da 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/user_info.h>
#include <ydb/core/persqueue/write_meta.h>
+#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
@@ -685,6 +686,28 @@ Y_UNIT_TEST_SUITE(Cdc) {
using TCdcStream = TShardedTableOptions::TCdcStream;
+ static NKikimrPQ::TPQConfig DefaultPQConfig() {
+ NKikimrPQ::TPQConfig pqConfig;
+ pqConfig.SetEnabled(true);
+ pqConfig.SetEnableProtoSourceIdInfo(true);
+ pqConfig.SetRoundRobinPartitionMapping(true);
+ pqConfig.SetTopicsAreFirstClassCitizen(true);
+ pqConfig.SetMaxReadCookies(10);
+ pqConfig.AddClientServiceType()->SetName("data-streams");
+ pqConfig.SetCheckACL(false);
+ pqConfig.SetRequireCredentialsInNewProtocol(false);
+ pqConfig.MutableQuotingConfig()->SetEnableQuoting(false);
+ return pqConfig;
+ }
+
+ static void SetupLogging(TTestActorRuntime& runtime) {
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG);
+ }
+
template <typename TDerived, typename TClient>
class TTestEnv {
public:
@@ -735,29 +758,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
private:
- static NKikimrPQ::TPQConfig DefaultPQConfig() {
- NKikimrPQ::TPQConfig pqConfig;
- pqConfig.SetEnabled(true);
- pqConfig.SetEnableProtoSourceIdInfo(true);
- pqConfig.SetRoundRobinPartitionMapping(true);
- pqConfig.SetTopicsAreFirstClassCitizen(true);
- pqConfig.SetMaxReadCookies(10);
- pqConfig.AddClientServiceType()->SetName("data-streams");
- pqConfig.SetCheckACL(false);
- pqConfig.SetRequireCredentialsInNewProtocol(false);
- pqConfig.MutableQuotingConfig()->SetEnableQuoting(false);
- return pqConfig;
- }
-
- static void SetupLogging(TTestActorRuntime& runtime) {
- runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG);
- }
-
- private:
TPortManager PortManager;
TServer::TPtr Server;
TActorId EdgeActor;
@@ -1456,6 +1456,85 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(RacyCreateAndSend) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ bool added = false;
+ TVector<THolder<IEventHandle>> delayed;
+
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvChangeExchange::EvAddSender:
+ added = true;
+ break;
+
+ case TSchemeBoardEvents::EvUpdate:
+ if (auto* msg = ev->Get<TSchemeBoardEvents::TEvUpdate>()) {
+ const auto desc = msg->GetRecord().GetDescribeSchemeResult();
+ if (desc.GetPath() == "/Root/Table/Stream" && desc.GetPathDescription().GetSelf().GetCreateFinished()) {
+ delayed.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ const auto txId = AsyncAlterAddStream(server, "/Root", "Table",
+ Updates(NKikimrSchemeOp::ECdcStreamFormatJson));
+
+ if (!added) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&added](IEventHandle&) {
+ return added;
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ if (!delayed.size() || delayed.size() % 3) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) {
+ return delayed.size() && (delayed.size() % 3 == 0);
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ runtime.SetObserverFunc(prevObserver);
+ for (auto& ev : delayed) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ WaitTxNotification(server, edgeActor, txId);
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ });
+ }
+
} // Cdc
} // NKikimr
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 82e2fa87c46..041a3007184 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -1413,10 +1413,12 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
case NKikimrSchemeOp::EPathTypeCdcStream:
Kind = TNavigate::KindCdcStream;
IsPrivatePath = CalcPathIsPrivate(entryDesc.GetPathType(), entryDesc.GetPathSubType());
- FillInfo(Kind, CdcStreamInfo, std::move(*pathDesc.MutableCdcStreamDescription()));
- if (CdcStreamInfo->Description.HasPathId()) {
- const auto& pathId = CdcStreamInfo->Description.GetPathId();
- CdcStreamInfo->PathId = TPathId(pathId.GetOwnerId(), pathId.GetLocalId());
+ if (Created) {
+ FillInfo(Kind, CdcStreamInfo, std::move(*pathDesc.MutableCdcStreamDescription()));
+ if (CdcStreamInfo->Description.HasPathId()) {
+ const auto& pathId = CdcStreamInfo->Description.GetPathId();
+ CdcStreamInfo->PathId = TPathId(pathId.GetOwnerId(), pathId.GetLocalId());
+ }
}
break;
case NKikimrSchemeOp::EPathTypeSequence: