diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-04-19 21:36:30 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-04-19 21:36:30 +0300 |
commit | 6b1026b629f4ca00592d208d02e18d5c65409d38 (patch) | |
tree | 2e47d3427b62b3d31aebf4f24e621a2e31539f9c | |
parent | 721c4d6f6ca8aa408aeaf64ccd5dbcdb5e35c2d3 (diff) | |
download | ydb-6b1026b629f4ca00592d208d02e18d5c65409d38.tar.gz |
Additional tests (& fixes): schema snapshots, split/merge KIKIMR-13698
ref:7ef99d44fd7498892ffa36bf7f026a40cdece608
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_exchange_split.cpp | 28 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_change_sender_activation.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_change_sending.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_split_dst.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_split_src.cpp | 63 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_collector.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 340 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 60 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 12 |
13 files changed, 463 insertions, 112 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index a014d32e7cd..d1eecea1323 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -4634,8 +4634,14 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon } if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed - Y_VERIFY(sourceIdWriter.GetSourceIdsToWrite().empty()); - return request->Record.CmdWriteSize() > 0 || request->Record.CmdRenameSize() > 0 || request->Record.CmdDeleteRangeSize() > 0; + if (sourceIdWriter.GetSourceIdsToWrite().empty()) { + return request->Record.CmdWriteSize() > 0 + || request->Record.CmdRenameSize() > 0 + || request->Record.CmdDeleteRangeSize() > 0; + } else { + sourceIdWriter.FillRequest(request, Partition); + return true; + } } sourceIdWriter.FillRequest(request, Partition); diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index af95c80036b..a7fc8afe743 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -112,6 +112,7 @@ public: group.SetId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(dstTabletId))); } + NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); Become(&TThis::StateWork); } @@ -141,13 +142,14 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel LogPrefix = TStringBuilder() << "[ChangeExchangeSplitCdcWorker]" << "[" << SrcTabletId << "]" - << SelfId() /* contains brackets */; + << SelfId() /* contains brackets */ << " "; } return LogPrefix.GetRef(); } void Ack() { + LOG_I("Send ack"); Send(Parent, new TEvChangeExchange::TEvSplitAck()); PassAway(); } @@ -335,14 +337,23 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel workers.emplace(partitionId, it->second); Workers.erase(it); } else { + LOG_T("Register new worker" + << ": partitionId# " << partitionId); + const auto worker = Register(new TCdcPartitionWorker(SelfId(), partitionId, tabletId, SrcTabletId, DstTabletIds)); workers.emplace(partitionId, worker); Pending.emplace(worker, partitionId); } } - for (const auto& [_, worker] : Workers) { + for (const auto& kv : Workers) { + const auto& partitionId = kv.first; + const auto& worker = kv.second; + if (worker) { + LOG_T("Kill stale worker" + << ": partitionId# " << partitionId); + Send(worker, new TEvents::TEvPoisonPill()); Pending.erase(worker); } @@ -352,6 +363,8 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel return Ack(); } + LOG_I("Wait " << Pending.size() << " worker(s)"); + Workers = std::move(workers); Become(&TThis::StateWork); } @@ -473,7 +486,7 @@ class TChangeExchageSplit: public TActorBootstrapped<TChangeExchageSplit> { LogPrefix = TStringBuilder() << "[ChangeExchangeSplit]" << "[" << DataShard.TabletId << "]" - << SelfId() /* contains brackets */; + << SelfId() /* contains brackets */ << " "; } return LogPrefix.GetRef(); @@ -495,6 +508,7 @@ class TChangeExchageSplit: public TActorBootstrapped<TChangeExchageSplit> { } void Ack() { + LOG_I("Send ack"); Send(DataShard.ActorId, new TEvChangeExchange::TEvSplitAck()); PassAway(); } @@ -549,10 +563,16 @@ public: void Bootstrap() { if (!Workers) { + LOG_N("Auto-ack (no active worker)"); return Ack(); } - for (auto& [pathId, worker] : Workers) { + for (auto& kv : Workers) { + const auto& pathId = kv.first; + auto& worker = kv.second; + + LOG_D("Register worker" + << ": pathId# " << pathId); Pending.emplace(RegisterWorker(pathId, worker), pathId); } diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 2a7db10a626..1ec9cb145e1 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -263,6 +263,8 @@ void TChangeRecord::Out(IOutputStream& out) const { << " PathId: " << PathId << " Kind: " << Kind << " Body: " << Body.size() << "b" + << " TableId: " << TableId + << " SchemaVersion: " << SchemaVersion << " }"; } diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 301722c108b..d5dccdc45c9 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -134,8 +134,9 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti /// Write void Write(NKikimrClient::TPersQueueRequest&& request) { - auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>(++Cookie); + auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>(); ev->Record = std::move(request); + ev->Record.MutablePartitionRequest()->SetCookie(++Cookie); Send(Writer, std::move(ev)); Become(&TThis::StateWrite); diff --git a/ydb/core/tx/datashard/datashard_change_sender_activation.cpp b/ydb/core/tx/datashard/datashard_change_sender_activation.cpp index 2635668bece..7c3a6d31f90 100644 --- a/ydb/core/tx/datashard/datashard_change_sender_activation.cpp +++ b/ydb/core/tx/datashard/datashard_change_sender_activation.cpp @@ -99,16 +99,9 @@ public: << ": origin# " << Origin << ", at tablet# " << Self->TabletID()); - if (AllDstAcksReceived) { - for (const auto& [ackTo, opIds] : Self->SrcAckPartitioningChangedTo) { - for (const ui64 opId : opIds) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack split partitioning changed to schemeshard " << opId); - ctx.Send(ackTo, new TEvDataShard::TEvSplitPartitioningChangedAck(opId, Self->TabletID())); - } - } - - Self->SrcAckPartitioningChangedTo.clear(); - Self->CheckStateChange(ctx); + if (AllDstAcksReceived && Self->SrcAckPartitioningChangedTo) { + Self->Execute(Self->CreateTxSplitPartitioningChanged(std::move(Self->SrcAckPartitioningChangedTo)), ctx); + Self->SrcAckPartitioningChangedTo.clear(); // to be sure } } diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 3ef9d1b96e4..7e532420bae 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -64,13 +64,13 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { << ", it->BodySize: " << it->BodySize); const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>(); - TUserTable::TCPtr schema; + const auto tableId = TPathId( + basic.GetValue<Schema::ChangeRecords::TableOwnerId>(), + basic.GetValue<Schema::ChangeRecords::TablePathId>() + ); + TUserTable::TCPtr schema; if (schemaVersion) { - const auto tableId = TPathId( - basic.GetValue<Schema::ChangeRecords::TableOwnerId>(), - basic.GetValue<Schema::ChangeRecords::TablePathId>() - ); const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion); if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) { schema = snapshot->Schema; @@ -86,6 +86,8 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { basic.GetValue<Schema::ChangeRecords::PathOwnerId>(), basic.GetValue<Schema::ChangeRecords::LocalPathId>() )) + .WithTableId(tableId) + .WithSchemaVersion(schemaVersion) .WithSchema(schema) .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>()) .Build()); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 86c536a4174..e06c0b64cd4 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1079,6 +1079,7 @@ class TDataShard NTabletFlatExecutor::ITransaction* CreateTxSchemaChanged(TEvDataShard::TEvSchemaChangedResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxStartSplit(); NTabletFlatExecutor::ITransaction* CreateTxSplitSnapshotComplete(TIntrusivePtr<TSplitSnapshotContext> snapContext); + NTabletFlatExecutor::ITransaction* CreateTxSplitPartitioningChanged(THashMap<TActorId, THashSet<ui64>>&& waiters); NTabletFlatExecutor::ITransaction* CreateTxInitiateBorrowedPartsReturn(); NTabletFlatExecutor::ITransaction* CreateTxCheckInReadSets(); NTabletFlatExecutor::ITransaction* CreateTxRemoveOldInReadSets(); diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp index 7665d376512..c2184ffe1df 100644 --- a/ydb/core/tx/datashard/datashard_split_dst.cpp +++ b/ydb/core/tx/datashard/datashard_split_dst.cpp @@ -50,6 +50,11 @@ public: if (Self->GetPathOwnerId() == INVALID_TABLET_ID) { Self->PersistOwnerPathId(tableId.OwnerId, txc); } + + if (info->NeedSchemaSnapshots()) { + const ui64 txId = Ev->Get()->Record.GetOperationCookie(); + Self->AddSchemaSnapshot(tableId, info->GetTableSchemaVersion(), 0, txId, txc, ctx); + } } Self->DstSplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>(Ev->Get()->Record.GetSplitDescription()); diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index a7349fc9cf3..a9ccd68001c 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -458,38 +458,24 @@ public: class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTransactionBase<TDataShard> { -private: - TEvDataShard::TEvSplitPartitioningChanged::TPtr Ev; - bool DelayPartitioningChangedAck = false; + THashMap<TActorId, THashSet<ui64>> Waiters; public: - TTxSplitPartitioningChanged(TDataShard* ds, TEvDataShard::TEvSplitPartitioningChanged::TPtr& ev) + TTxSplitPartitioningChanged(TDataShard* ds, THashMap<TActorId, THashSet<ui64>>&& waiters) : NTabletFlatExecutor::TTransactionBase<TDataShard>(ds) - , Ev(ev) + , Waiters(std::move(waiters)) {} TTxType GetTxType() const override { return TXTYPE_SPLIT_PARTITIONING_CHANGED; } - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - ui64 opId = Ev->Get()->Record.GetOperationCookie(); - - LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Got TEvSplitPartitioningChanged opId %" PRIu64 " at datashard %" PRIu64 " state %s", - opId, Self->TabletID(), DatashardStateName(Self->State).data()); - - if (Self->ChangesQueue || !Self->ChangeSenderActivator.AllAcked()) { - LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " delay partitioning changed ack" - << " ChangesQueue size: " << Self->ChangesQueue.size() - << " siblings to be activated: " << Self->ChangeSenderActivator.Dump()); - - DelayPartitioningChangedAck = true; - Self->SrcAckPartitioningChangedTo[Ev->Sender].insert(opId); - } + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Y_VERIFY(!Self->ChangesQueue && Self->ChangeSenderActivator.AllAcked()); // TODO: At this point Src should start rejecting all new Tx with SchemaChanged status if (Self->State != TShardState::SplitSrcWaitForPartitioningChanged) { Y_VERIFY(Self->State == TShardState::PreOffline || Self->State == TShardState::Offline, - "Unexpected TEvSplitPartitioningChanged opId %" PRIu64 " at datashard %" PRIu64 " state %s", - Ev->Get()->Record.GetOperationCookie(), Self->TabletID(), DatashardStateName(Self->State).data()); + "Unexpected TEvSplitPartitioningChanged at datashard %" PRIu64 " state %s", + Self->TabletID(), DatashardStateName(Self->State).data()); return true; } @@ -504,16 +490,13 @@ public: } void Complete(const TActorContext &ctx) override { - TActorId ackTo = Ev->Sender; - ui64 opId = Ev->Get()->Record.GetOperationCookie(); - - if (DelayPartitioningChangedAck) { - return; + for (const auto& [ackTo, opIds] : Waiters) { + for (const ui64 opId : opIds) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack split partitioning changed to schemeshard " << opId); + ctx.Send(ackTo, new TEvDataShard::TEvSplitPartitioningChangedAck(opId, Self->TabletID())); + } } - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack split partitioning changed to schemeshard " << opId); - ctx.Send(ackTo, new TEvDataShard::TEvSplitPartitioningChangedAck(opId, Self->TabletID())); - // TODO: properly check if there are no loans Self->CheckStateChange(ctx); } @@ -528,7 +511,27 @@ void TDataShard::Handle(TEvDataShard::TEvSplitTransferSnapshotAck::TPtr& ev, con } void TDataShard::Handle(TEvDataShard::TEvSplitPartitioningChanged::TPtr& ev, const TActorContext& ctx) { - Execute(new TTxSplitPartitioningChanged(this, ev), ctx); + const auto opId = ev->Get()->Record.GetOperationCookie(); + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Got TEvSplitPartitioningChanged" + << ": opId: " << opId + << ", at datashard: " << TabletID() + << ", state: " << DatashardStateName(State).data()); + + SrcAckPartitioningChangedTo[ev->Sender].insert(opId); + + if (ChangesQueue || !ChangeSenderActivator.AllAcked()) { + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " delay partitioning changed ack" + << ", ChangesQueue size: " << ChangesQueue.size() + << ", siblings to be activated: " << ChangeSenderActivator.Dump()); + } else { + Execute(CreateTxSplitPartitioningChanged(std::move(SrcAckPartitioningChangedTo)), ctx); + SrcAckPartitioningChangedTo.clear(); // to be sure + } +} + +NTabletFlatExecutor::ITransaction* TDataShard::CreateTxSplitPartitioningChanged(THashMap<TActorId, THashSet<ui64>>&& waiters) { + return new TTxSplitPartitioningChanged(this, std::move(waiters)); } }} diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp index 83b7d09b6cd..854459e8430 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp @@ -3,7 +3,6 @@ #include <ydb/core/protos/change_exchange.pb.h> #include <ydb/core/scheme/scheme_tablecell.h> -#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/public/lib/deprecated/kicli/kicli.h> namespace NKikimr { @@ -107,31 +106,6 @@ auto GetChangeRecordsWithDetails(TTestActorRuntime& runtime, const TActorId& sen return result; } -THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) { - using TNavigate = NSchemeCache::TSchemeCacheNavigate; - using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet; - using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult; - - auto request = MakeHolder<TNavigate>(); - auto& entry = request->ResultSet.emplace_back(); - entry.Path = SplitPath(path); - entry.RequestType = TNavigate::TEntry::ERequestType::ByPath; - entry.Operation = TNavigate::EOp::OpTable; - entry.ShowPrivatePath = true; - runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release()))); - - auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender); - UNIT_ASSERT(ev); - UNIT_ASSERT(ev->Get()); - - auto* response = ev->Get()->Request.Release(); - UNIT_ASSERT(response); - UNIT_ASSERT(response->ErrorCount == 0); - UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1); - - return THolder(response); -} - using TStructKey = TVector<std::pair<TString, ui32>>; using TStructValue = THashMap<TString, ui32>; constexpr ui32 Null = 0; diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index eb1df20a3a7..8ae06589ebe 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -4,6 +4,9 @@ #include <library/cpp/json/json_reader.h> #include <ydb/core/base/path.h> +#include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/user_info.h> +#include <ydb/core/persqueue/write_meta.h> #include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> @@ -688,15 +691,19 @@ Y_UNIT_TEST_SUITE(Cdc) { explicit TTestEnv( const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, + bool useRealThreads = true, const TString& root = "Root", const TString& tableName = "Table") { auto settings = TServerSettings(PortManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(useRealThreads) .SetDomainName(root) .SetGrpcPort(PortManager.GetPort(2135)); Server = new TServer(settings); - Server->EnableGRpc(settings.GrpcPort); + if (useRealThreads) { + Server->EnableGRpc(settings.GrpcPort); + } const auto database = JoinPath({root}); auto& runtime = *Server->GetRuntime(); @@ -708,7 +715,9 @@ Y_UNIT_TEST_SUITE(Cdc) { CreateShardedTable(Server, EdgeActor, database, tableName, tableDesc); WaitTxNotification(Server, EdgeActor, AsyncAlterAddStream(Server, database, tableName, streamDesc)); - Client = TDerived::MakeClient(Server->GetDriver(), database); + if (useRealThreads) { + Client = TDerived::MakeClient(Server->GetDriver(), database); + } } TServer::TPtr GetServer() { @@ -739,6 +748,8 @@ Y_UNIT_TEST_SUITE(Cdc) { static void SetupLogging(TTestActorRuntime& runtime) { runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG); } @@ -777,25 +788,25 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } - TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format) { + TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { return TCdcStream{ - .Name = "Stream", + .Name = name, .Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly, .Format = format, }; } - TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format) { + TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { return TCdcStream{ - .Name = "Stream", + .Name = name, .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate, .Format = format, }; } - TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format) { + TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { return TCdcStream{ - .Name = "Stream", + .Name = name, .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages, .Format = format, }; @@ -826,8 +837,8 @@ Y_UNIT_TEST_SUITE(Cdc) { // add consumer { - auto res = client.AddReadRule("/Root/Table/Stream", - TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user"))).ExtractValueSync(); + auto res = client.AddReadRule("/Root/Table/Stream", TAddReadRuleSettings() + .ReadRule(TReadRuleSettings().ConsumerName("user"))).ExtractValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } @@ -860,8 +871,8 @@ Y_UNIT_TEST_SUITE(Cdc) { // remove consumer { - auto res = client.RemoveReadRule("/Root/Table/Stream", - TRemoveReadRuleSettings().ConsumerName("user")).ExtractValueSync(); + auto res = client.RemoveReadRule("/Root/Table/Stream", TRemoveReadRuleSettings() + .ConsumerName("user")).ExtractValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } } @@ -1123,6 +1134,311 @@ Y_UNIT_TEST_SUITE(Cdc) { } } + // Schema snapshots + using TActionFunc = std::function<ui64(TServer::TPtr)>; + + ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) { + auto streamDesc = Ls(runtime, sender, path); + + const auto& streamEntry = streamDesc->ResultSet.at(0); + UNIT_ASSERT(streamEntry.ListNodeEntry); + + const auto& children = streamEntry.ListNodeEntry->Children; + UNIT_ASSERT_VALUES_EQUAL(children.size(), 1); + + auto topicDesc = Navigate(runtime, sender, JoinPath(ChildPath(SplitPath(path), children.at(0).Name)), + NSchemeCache::TSchemeCacheNavigate::EOp::OpTopic); + + const auto& topicEntry = topicDesc->ResultSet.at(0); + UNIT_ASSERT(topicEntry.PQGroupInfo); + + const auto& pqDesc = topicEntry.PQGroupInfo->Description; + for (const auto& partition : pqDesc.GetPartitions()) { + if (partitionId == partition.GetPartitionId()) { + return partition.GetTabletId(); + } + } + + UNIT_ASSERT_C(false, "Cannot find partition: " << partitionId); + return 0; + } + + auto GetRecords(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) { + NKikimrClient::TPersQueueRequest request; + request.MutablePartitionRequest()->SetTopic(path); + request.MutablePartitionRequest()->SetPartition(partitionId); + + auto& cmd = *request.MutablePartitionRequest()->MutableCmdRead(); + cmd.SetClientId(NKikimr::NPQ::CLIENTID_TO_READ_INTERNALLY); + cmd.SetCount(10000); + cmd.SetOffset(0); + cmd.SetReadTimestampMs(0); + cmd.SetExternalOperation(true); + + auto req = MakeHolder<TEvPersQueue::TEvRequest>(); + req->Record = std::move(request); + ForwardToTablet(runtime, ResolvePqTablet(runtime, sender, path, partitionId), sender, req.Release()); + + auto resp = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvResponse>(sender); + UNIT_ASSERT(resp); + + TVector<std::pair<TString, TString>> result; + for (const auto& r : resp->Get()->Record.GetPartitionResponse().GetCmdReadResult().GetResult()) { + const auto data = NKikimr::GetDeserializedData(r.GetData()); + result.emplace_back(r.GetPartitionKey(), data.GetData()); + } + + return result; + } + + void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) { + while (true) { + const auto records = GetRecords(*server->GetRuntime(), sender, path, 0); + if (records.size() == expected.size()) { + for (ui32 i = 0; i < expected.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(expected.at(i), records.at(i).second); + } + + break; + } + + SimulateSleep(server, TDuration::Seconds(1)); + } + } + + void ShouldDeliverChanges(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, TActionFunc action, + const TVector<TString>& queriesBefore, const TVector<TString>& queriesAfter, const TVector<TString>& records) + { + TTestPqEnv env(tableDesc, streamDesc, false); + + bool preventEnqueueing = true; + TVector<THolder<IEventHandle>> enqueued; + + env.GetServer()->GetRuntime()->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvChangeExchange::EvEnqueueRecords: + if (preventEnqueueing) { + enqueued.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } else { + return TTestActorRuntime::EEventAction::PROCESS; + } + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto sendEnqueued = [&]() { + preventEnqueueing = false; + for (auto& ev : std::exchange(enqueued, TVector<THolder<IEventHandle>>())) { + env.GetServer()->GetRuntime()->Send(ev.Release(), 0, true); + } + }; + + for (const auto& query : queriesBefore) { + ExecSQL(env.GetServer(), env.GetEdgeActor(), query); + } + + WaitTxNotification(env.GetServer(), env.GetEdgeActor(), action(env.GetServer())); + + for (const auto& query : queriesAfter) { + ExecSQL(env.GetServer(), env.GetEdgeActor(), query); + } + + sendEnqueued(); + WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", records); + } + + TShardedTableOptions WithExtraColumn() { + return TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"extra", "Uint32", false, false}, + }); + } + + TShardedTableOptions::TIndex SimpleIndex() { + return TShardedTableOptions::TIndex{ + "by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal + }; + } + + TShardedTableOptions WithSimpleIndex() { + return SimpleTable() + .Indexes({ + SimpleIndex() + }); + } + + Y_UNIT_TEST(AddColumn) { + auto action = [](TServer::TPtr server) { + return AsyncAlterAddExtraColumn(server, "/Root", "Table"); + }; + + ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)", + }, { + R"(UPSERT INTO `/Root/Table` (key, value, extra) VALUES (2, 20, 200);)", + }, { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"extra":200,"value":20},"key":[2]})", + }); + } + + Y_UNIT_TEST(DropColumn) { + auto action = [](TServer::TPtr server) { + return AsyncAlterDropColumn(server, "/Root", "Table", "extra"); + }; + + ShouldDeliverChanges(WithExtraColumn(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, { + R"(UPSERT INTO `/Root/Table` (key, value, extra) VALUES (1, 10, 100);)", + }, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)", + }, { + R"({"update":{"extra":100,"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + } + + Y_UNIT_TEST(AddIndex) { + auto action = [](TServer::TPtr server) { + return AsyncAlterAddIndex(server, "/Root", "/Root/Table", SimpleIndex()); + }; + + ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)", + }, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)", + }, { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + } + + Y_UNIT_TEST(DropIndex) { + auto action = [](TServer::TPtr server) { + return AsyncAlterDropIndex(server, "/Root", "Table", SimpleIndex().Name); + }; + + ShouldDeliverChanges(WithSimpleIndex(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)", + }, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)", + }, { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + } + + Y_UNIT_TEST(AddStream) { + auto action = [](TServer::TPtr server) { + return AsyncAlterAddStream(server, "/Root", "Table", + KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson, "AnotherStream")); + }; + + ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)", + }, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)", + }, { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + } + + // Split/merge + Y_UNIT_TEST(ShouldDeliverChangesOnSplitMerge) { + TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false); + + bool preventEnqueueing = true; + TVector<THolder<IEventHandle>> enqueued; + THashMap<ui64, ui32> splitAcks; + + env.GetServer()->GetRuntime()->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvChangeExchange::EvEnqueueRecords: + if (preventEnqueueing) { + enqueued.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } else { + return TTestActorRuntime::EEventAction::PROCESS; + } + + case TEvDataShard::EvSplitAck: + ++splitAcks[ev->Get<TEvDataShard::TEvSplitAck>()->Record.GetOperationCookie()]; + break; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto waitForSplitAcks = [&](ui64 txId, ui32 count) { + if (splitAcks[txId] != count) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle&) { + return splitAcks[txId] == count; + }); + env.GetServer()->GetRuntime()->DispatchEvents(opts); + } + }; + + auto sendEnqueued = [&]() { + preventEnqueueing = false; + for (auto& ev : std::exchange(enqueued, TVector<THolder<IEventHandle>>())) { + env.GetServer()->GetRuntime()->Send(ev.Release(), 0, true); + } + }; + + SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1); + + // split + ExecSQL(env.GetServer(), env.GetEdgeActor(), R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20); + )"); + + auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + auto txId = AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4); + waitForSplitAcks(txId, 1); + sendEnqueued(); + + WaitTxNotification(env.GetServer(), env.GetEdgeActor(), txId); + WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + + // merge + preventEnqueueing = true; + ExecSQL(env.GetServer(), env.GetEdgeActor(), R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 11), + (2, 21); + )"); + + tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 2); + + txId = AsyncMergeTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds); + waitForSplitAcks(txId, 2); + + ExecSQL(env.GetServer(), env.GetEdgeActor(), "UPSERT INTO `/Root/Table` (key, value) VALUES (3, 32);"); + sendEnqueued(); + + WaitTxNotification(env.GetServer(), env.GetEdgeActor(), txId); + WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":11},"key":[1]})", + R"({"update":{"value":21},"key":[2]})", + R"({"update":{"value":32},"key":[3]})", + }); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 9aff5bf788e..3d334951406 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1292,28 +1292,9 @@ std::pair<TTableInfoMap, ui64> GetTables( return std::make_pair(result, ownerId); } -TTableId ResolveTableId( - Tests::TServer::TPtr server, - TActorId sender, - const TString& path) -{ - auto& runtime = *server->GetRuntime(); - - { - TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate()); - auto& entry = request->ResultSet.emplace_back(); - entry.Path = SplitPath(path); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; - entry.ShowPrivatePath = true; - runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvTxProxySchemeCache::TEvNavigateKeySet(request))); - } - - auto ev = runtime.GrabEdgeEventRethrow<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(sender); - NSchemeCache::TSchemeCacheNavigate* req = ev->Get()->Request.Get(); - Y_VERIFY(req->ErrorCount == 0); - - auto& res = req->ResultSet.at(0); - return res.TableId; +TTableId ResolveTableId(Tests::TServer::TPtr server, TActorId sender, const TString& path) { + auto response = Navigate(*server->GetRuntime(), sender, path); + return response->ResultSet.at(0).TableId; } NTable::TRowVersionRanges GetRemovedRowVersions( @@ -1775,6 +1756,41 @@ void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) { runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender); } +THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender, + const TString& path, NSchemeCache::TSchemeCacheNavigate::EOp op) +{ + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet; + using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult; + + auto request = MakeHolder<TNavigate>(); + auto& entry = request->ResultSet.emplace_back(); + entry.Path = SplitPath(path); + entry.RequestType = TNavigate::TEntry::ERequestType::ByPath; + entry.Operation = op; + entry.ShowPrivatePath = true; + runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release()))); + + auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()); + + auto* response = ev->Get()->Request.Release(); + UNIT_ASSERT(response); + UNIT_ASSERT(response->ErrorCount == 0); + UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1); + + return THolder(response); +} + +THolder<NSchemeCache::TSchemeCacheNavigate> Ls( + TTestActorRuntime& runtime, + const TActorId& sender, + const TString& path) +{ + return Navigate(runtime, sender, path, NSchemeCache::TSchemeCacheNavigate::EOp::OpList); +} + void SendSQL(Tests::TServer::TPtr server, TActorId sender, const TString &sql, diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 352c94e1b82..144f75526c6 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -9,6 +9,7 @@ #include <ydb/core/testlib/minikql_compile.h> #include <ydb/core/testlib/tablet_helpers.h> #include <ydb/core/testlib/test_client.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <library/cpp/testing/unittest/registar.h> @@ -619,6 +620,17 @@ void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId); void SimulateSleep(Tests::TServer::TPtr server, TDuration duration); void SimulateSleep(TTestActorRuntime& runtime, TDuration duration); +THolder<NSchemeCache::TSchemeCacheNavigate> Navigate( + TTestActorRuntime& runtime, + const TActorId& sender, + const TString& path, + NSchemeCache::TSchemeCacheNavigate::EOp op = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable); + +THolder<NSchemeCache::TSchemeCacheNavigate> Ls( + TTestActorRuntime& runtime, + const TActorId& sender, + const TString& path); + void SendSQL(Tests::TServer::TPtr server, TActorId sender, const TString &sql, |