aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-04-19 21:36:30 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-04-19 21:36:30 +0300
commit6b1026b629f4ca00592d208d02e18d5c65409d38 (patch)
tree2e47d3427b62b3d31aebf4f24e621a2e31539f9c
parent721c4d6f6ca8aa408aeaf64ccd5dbcdb5e35c2d3 (diff)
downloadydb-6b1026b629f4ca00592d208d02e18d5c65409d38.tar.gz
Additional tests (& fixes): schema snapshots, split/merge KIKIMR-13698
ref:7ef99d44fd7498892ffa36bf7f026a40cdece608
-rw-r--r--ydb/core/persqueue/partition.cpp10
-rw-r--r--ydb/core/tx/datashard/change_exchange_split.cpp28
-rw-r--r--ydb/core/tx/datashard/change_record.cpp2
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_change_sender_activation.cpp13
-rw-r--r--ydb/core/tx/datashard/datashard_change_sending.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h1
-rw-r--r--ydb/core/tx/datashard/datashard_split_dst.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_split_src.cpp63
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_collector.cpp26
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp340
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp60
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h12
13 files changed, 463 insertions, 112 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index a014d32e7cd..d1eecea1323 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -4634,8 +4634,14 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon
}
if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed
- Y_VERIFY(sourceIdWriter.GetSourceIdsToWrite().empty());
- return request->Record.CmdWriteSize() > 0 || request->Record.CmdRenameSize() > 0 || request->Record.CmdDeleteRangeSize() > 0;
+ if (sourceIdWriter.GetSourceIdsToWrite().empty()) {
+ return request->Record.CmdWriteSize() > 0
+ || request->Record.CmdRenameSize() > 0
+ || request->Record.CmdDeleteRangeSize() > 0;
+ } else {
+ sourceIdWriter.FillRequest(request, Partition);
+ return true;
+ }
}
sourceIdWriter.FillRequest(request, Partition);
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp
index af95c80036b..a7fc8afe743 100644
--- a/ydb/core/tx/datashard/change_exchange_split.cpp
+++ b/ydb/core/tx/datashard/change_exchange_split.cpp
@@ -112,6 +112,7 @@ public:
group.SetId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(dstTabletId)));
}
+ NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
Become(&TThis::StateWork);
}
@@ -141,13 +142,14 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel
LogPrefix = TStringBuilder()
<< "[ChangeExchangeSplitCdcWorker]"
<< "[" << SrcTabletId << "]"
- << SelfId() /* contains brackets */;
+ << SelfId() /* contains brackets */ << " ";
}
return LogPrefix.GetRef();
}
void Ack() {
+ LOG_I("Send ack");
Send(Parent, new TEvChangeExchange::TEvSplitAck());
PassAway();
}
@@ -335,14 +337,23 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel
workers.emplace(partitionId, it->second);
Workers.erase(it);
} else {
+ LOG_T("Register new worker"
+ << ": partitionId# " << partitionId);
+
const auto worker = Register(new TCdcPartitionWorker(SelfId(), partitionId, tabletId, SrcTabletId, DstTabletIds));
workers.emplace(partitionId, worker);
Pending.emplace(worker, partitionId);
}
}
- for (const auto& [_, worker] : Workers) {
+ for (const auto& kv : Workers) {
+ const auto& partitionId = kv.first;
+ const auto& worker = kv.second;
+
if (worker) {
+ LOG_T("Kill stale worker"
+ << ": partitionId# " << partitionId);
+
Send(worker, new TEvents::TEvPoisonPill());
Pending.erase(worker);
}
@@ -352,6 +363,8 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel
return Ack();
}
+ LOG_I("Wait " << Pending.size() << " worker(s)");
+
Workers = std::move(workers);
Become(&TThis::StateWork);
}
@@ -473,7 +486,7 @@ class TChangeExchageSplit: public TActorBootstrapped<TChangeExchageSplit> {
LogPrefix = TStringBuilder()
<< "[ChangeExchangeSplit]"
<< "[" << DataShard.TabletId << "]"
- << SelfId() /* contains brackets */;
+ << SelfId() /* contains brackets */ << " ";
}
return LogPrefix.GetRef();
@@ -495,6 +508,7 @@ class TChangeExchageSplit: public TActorBootstrapped<TChangeExchageSplit> {
}
void Ack() {
+ LOG_I("Send ack");
Send(DataShard.ActorId, new TEvChangeExchange::TEvSplitAck());
PassAway();
}
@@ -549,10 +563,16 @@ public:
void Bootstrap() {
if (!Workers) {
+ LOG_N("Auto-ack (no active worker)");
return Ack();
}
- for (auto& [pathId, worker] : Workers) {
+ for (auto& kv : Workers) {
+ const auto& pathId = kv.first;
+ auto& worker = kv.second;
+
+ LOG_D("Register worker"
+ << ": pathId# " << pathId);
Pending.emplace(RegisterWorker(pathId, worker), pathId);
}
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 2a7db10a626..1ec9cb145e1 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -263,6 +263,8 @@ void TChangeRecord::Out(IOutputStream& out) const {
<< " PathId: " << PathId
<< " Kind: " << Kind
<< " Body: " << Body.size() << "b"
+ << " TableId: " << TableId
+ << " SchemaVersion: " << SchemaVersion
<< " }";
}
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 301722c108b..d5dccdc45c9 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -134,8 +134,9 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
/// Write
void Write(NKikimrClient::TPersQueueRequest&& request) {
- auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>(++Cookie);
+ auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
ev->Record = std::move(request);
+ ev->Record.MutablePartitionRequest()->SetCookie(++Cookie);
Send(Writer, std::move(ev));
Become(&TThis::StateWrite);
diff --git a/ydb/core/tx/datashard/datashard_change_sender_activation.cpp b/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
index 2635668bece..7c3a6d31f90 100644
--- a/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
@@ -99,16 +99,9 @@ public:
<< ": origin# " << Origin
<< ", at tablet# " << Self->TabletID());
- if (AllDstAcksReceived) {
- for (const auto& [ackTo, opIds] : Self->SrcAckPartitioningChangedTo) {
- for (const ui64 opId : opIds) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack split partitioning changed to schemeshard " << opId);
- ctx.Send(ackTo, new TEvDataShard::TEvSplitPartitioningChangedAck(opId, Self->TabletID()));
- }
- }
-
- Self->SrcAckPartitioningChangedTo.clear();
- Self->CheckStateChange(ctx);
+ if (AllDstAcksReceived && Self->SrcAckPartitioningChangedTo) {
+ Self->Execute(Self->CreateTxSplitPartitioningChanged(std::move(Self->SrcAckPartitioningChangedTo)), ctx);
+ Self->SrcAckPartitioningChangedTo.clear(); // to be sure
}
}
diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp
index 3ef9d1b96e4..7e532420bae 100644
--- a/ydb/core/tx/datashard/datashard_change_sending.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -64,13 +64,13 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
<< ", it->BodySize: " << it->BodySize);
const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>();
- TUserTable::TCPtr schema;
+ const auto tableId = TPathId(
+ basic.GetValue<Schema::ChangeRecords::TableOwnerId>(),
+ basic.GetValue<Schema::ChangeRecords::TablePathId>()
+ );
+ TUserTable::TCPtr schema;
if (schemaVersion) {
- const auto tableId = TPathId(
- basic.GetValue<Schema::ChangeRecords::TableOwnerId>(),
- basic.GetValue<Schema::ChangeRecords::TablePathId>()
- );
const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion);
if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) {
schema = snapshot->Schema;
@@ -86,6 +86,8 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
basic.GetValue<Schema::ChangeRecords::PathOwnerId>(),
basic.GetValue<Schema::ChangeRecords::LocalPathId>()
))
+ .WithTableId(tableId)
+ .WithSchemaVersion(schemaVersion)
.WithSchema(schema)
.WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>())
.Build());
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 86c536a4174..e06c0b64cd4 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1079,6 +1079,7 @@ class TDataShard
NTabletFlatExecutor::ITransaction* CreateTxSchemaChanged(TEvDataShard::TEvSchemaChangedResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxStartSplit();
NTabletFlatExecutor::ITransaction* CreateTxSplitSnapshotComplete(TIntrusivePtr<TSplitSnapshotContext> snapContext);
+ NTabletFlatExecutor::ITransaction* CreateTxSplitPartitioningChanged(THashMap<TActorId, THashSet<ui64>>&& waiters);
NTabletFlatExecutor::ITransaction* CreateTxInitiateBorrowedPartsReturn();
NTabletFlatExecutor::ITransaction* CreateTxCheckInReadSets();
NTabletFlatExecutor::ITransaction* CreateTxRemoveOldInReadSets();
diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp
index 7665d376512..c2184ffe1df 100644
--- a/ydb/core/tx/datashard/datashard_split_dst.cpp
+++ b/ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -50,6 +50,11 @@ public:
if (Self->GetPathOwnerId() == INVALID_TABLET_ID) {
Self->PersistOwnerPathId(tableId.OwnerId, txc);
}
+
+ if (info->NeedSchemaSnapshots()) {
+ const ui64 txId = Ev->Get()->Record.GetOperationCookie();
+ Self->AddSchemaSnapshot(tableId, info->GetTableSchemaVersion(), 0, txId, txc, ctx);
+ }
}
Self->DstSplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>(Ev->Get()->Record.GetSplitDescription());
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index a7349fc9cf3..a9ccd68001c 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -458,38 +458,24 @@ public:
class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
-private:
- TEvDataShard::TEvSplitPartitioningChanged::TPtr Ev;
- bool DelayPartitioningChangedAck = false;
+ THashMap<TActorId, THashSet<ui64>> Waiters;
public:
- TTxSplitPartitioningChanged(TDataShard* ds, TEvDataShard::TEvSplitPartitioningChanged::TPtr& ev)
+ TTxSplitPartitioningChanged(TDataShard* ds, THashMap<TActorId, THashSet<ui64>>&& waiters)
: NTabletFlatExecutor::TTransactionBase<TDataShard>(ds)
- , Ev(ev)
+ , Waiters(std::move(waiters))
{}
TTxType GetTxType() const override { return TXTYPE_SPLIT_PARTITIONING_CHANGED; }
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
- ui64 opId = Ev->Get()->Record.GetOperationCookie();
-
- LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Got TEvSplitPartitioningChanged opId %" PRIu64 " at datashard %" PRIu64 " state %s",
- opId, Self->TabletID(), DatashardStateName(Self->State).data());
-
- if (Self->ChangesQueue || !Self->ChangeSenderActivator.AllAcked()) {
- LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " delay partitioning changed ack"
- << " ChangesQueue size: " << Self->ChangesQueue.size()
- << " siblings to be activated: " << Self->ChangeSenderActivator.Dump());
-
- DelayPartitioningChangedAck = true;
- Self->SrcAckPartitioningChangedTo[Ev->Sender].insert(opId);
- }
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ Y_VERIFY(!Self->ChangesQueue && Self->ChangeSenderActivator.AllAcked());
// TODO: At this point Src should start rejecting all new Tx with SchemaChanged status
if (Self->State != TShardState::SplitSrcWaitForPartitioningChanged) {
Y_VERIFY(Self->State == TShardState::PreOffline || Self->State == TShardState::Offline,
- "Unexpected TEvSplitPartitioningChanged opId %" PRIu64 " at datashard %" PRIu64 " state %s",
- Ev->Get()->Record.GetOperationCookie(), Self->TabletID(), DatashardStateName(Self->State).data());
+ "Unexpected TEvSplitPartitioningChanged at datashard %" PRIu64 " state %s",
+ Self->TabletID(), DatashardStateName(Self->State).data());
return true;
}
@@ -504,16 +490,13 @@ public:
}
void Complete(const TActorContext &ctx) override {
- TActorId ackTo = Ev->Sender;
- ui64 opId = Ev->Get()->Record.GetOperationCookie();
-
- if (DelayPartitioningChangedAck) {
- return;
+ for (const auto& [ackTo, opIds] : Waiters) {
+ for (const ui64 opId : opIds) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack split partitioning changed to schemeshard " << opId);
+ ctx.Send(ackTo, new TEvDataShard::TEvSplitPartitioningChangedAck(opId, Self->TabletID()));
+ }
}
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack split partitioning changed to schemeshard " << opId);
- ctx.Send(ackTo, new TEvDataShard::TEvSplitPartitioningChangedAck(opId, Self->TabletID()));
-
// TODO: properly check if there are no loans
Self->CheckStateChange(ctx);
}
@@ -528,7 +511,27 @@ void TDataShard::Handle(TEvDataShard::TEvSplitTransferSnapshotAck::TPtr& ev, con
}
void TDataShard::Handle(TEvDataShard::TEvSplitPartitioningChanged::TPtr& ev, const TActorContext& ctx) {
- Execute(new TTxSplitPartitioningChanged(this, ev), ctx);
+ const auto opId = ev->Get()->Record.GetOperationCookie();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Got TEvSplitPartitioningChanged"
+ << ": opId: " << opId
+ << ", at datashard: " << TabletID()
+ << ", state: " << DatashardStateName(State).data());
+
+ SrcAckPartitioningChangedTo[ev->Sender].insert(opId);
+
+ if (ChangesQueue || !ChangeSenderActivator.AllAcked()) {
+ LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " delay partitioning changed ack"
+ << ", ChangesQueue size: " << ChangesQueue.size()
+ << ", siblings to be activated: " << ChangeSenderActivator.Dump());
+ } else {
+ Execute(CreateTxSplitPartitioningChanged(std::move(SrcAckPartitioningChangedTo)), ctx);
+ SrcAckPartitioningChangedTo.clear(); // to be sure
+ }
+}
+
+NTabletFlatExecutor::ITransaction* TDataShard::CreateTxSplitPartitioningChanged(THashMap<TActorId, THashSet<ui64>>&& waiters) {
+ return new TTxSplitPartitioningChanged(this, std::move(waiters));
}
}}
diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
index 83b7d09b6cd..854459e8430 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
@@ -3,7 +3,6 @@
#include <ydb/core/protos/change_exchange.pb.h>
#include <ydb/core/scheme/scheme_tablecell.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
namespace NKikimr {
@@ -107,31 +106,6 @@ auto GetChangeRecordsWithDetails(TTestActorRuntime& runtime, const TActorId& sen
return result;
}
-THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) {
- using TNavigate = NSchemeCache::TSchemeCacheNavigate;
- using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet;
- using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult;
-
- auto request = MakeHolder<TNavigate>();
- auto& entry = request->ResultSet.emplace_back();
- entry.Path = SplitPath(path);
- entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
- entry.Operation = TNavigate::EOp::OpTable;
- entry.ShowPrivatePath = true;
- runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release())));
-
- auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender);
- UNIT_ASSERT(ev);
- UNIT_ASSERT(ev->Get());
-
- auto* response = ev->Get()->Request.Release();
- UNIT_ASSERT(response);
- UNIT_ASSERT(response->ErrorCount == 0);
- UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1);
-
- return THolder(response);
-}
-
using TStructKey = TVector<std::pair<TString, ui32>>;
using TStructValue = THashMap<TString, ui32>;
constexpr ui32 Null = 0;
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index eb1df20a3a7..8ae06589ebe 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -4,6 +4,9 @@
#include <library/cpp/json/json_reader.h>
#include <ydb/core/base/path.h>
+#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/user_info.h>
+#include <ydb/core/persqueue/write_meta.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
@@ -688,15 +691,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
explicit TTestEnv(
const TShardedTableOptions& tableDesc,
const TCdcStream& streamDesc,
+ bool useRealThreads = true,
const TString& root = "Root",
const TString& tableName = "Table")
{
auto settings = TServerSettings(PortManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(useRealThreads)
.SetDomainName(root)
.SetGrpcPort(PortManager.GetPort(2135));
Server = new TServer(settings);
- Server->EnableGRpc(settings.GrpcPort);
+ if (useRealThreads) {
+ Server->EnableGRpc(settings.GrpcPort);
+ }
const auto database = JoinPath({root});
auto& runtime = *Server->GetRuntime();
@@ -708,7 +715,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
CreateShardedTable(Server, EdgeActor, database, tableName, tableDesc);
WaitTxNotification(Server, EdgeActor, AsyncAlterAddStream(Server, database, tableName, streamDesc));
- Client = TDerived::MakeClient(Server->GetDriver(), database);
+ if (useRealThreads) {
+ Client = TDerived::MakeClient(Server->GetDriver(), database);
+ }
}
TServer::TPtr GetServer() {
@@ -739,6 +748,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
static void SetupLogging(TTestActorRuntime& runtime) {
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG);
}
@@ -777,25 +788,25 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
- TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format) {
+ TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
return TCdcStream{
- .Name = "Stream",
+ .Name = name,
.Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly,
.Format = format,
};
}
- TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format) {
+ TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
return TCdcStream{
- .Name = "Stream",
+ .Name = name,
.Mode = NKikimrSchemeOp::ECdcStreamModeUpdate,
.Format = format,
};
}
- TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format) {
+ TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
return TCdcStream{
- .Name = "Stream",
+ .Name = name,
.Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages,
.Format = format,
};
@@ -826,8 +837,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
// add consumer
{
- auto res = client.AddReadRule("/Root/Table/Stream",
- TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user"))).ExtractValueSync();
+ auto res = client.AddReadRule("/Root/Table/Stream", TAddReadRuleSettings()
+ .ReadRule(TReadRuleSettings().ConsumerName("user"))).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
@@ -860,8 +871,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
// remove consumer
{
- auto res = client.RemoveReadRule("/Root/Table/Stream",
- TRemoveReadRuleSettings().ConsumerName("user")).ExtractValueSync();
+ auto res = client.RemoveReadRule("/Root/Table/Stream", TRemoveReadRuleSettings()
+ .ConsumerName("user")).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
}
@@ -1123,6 +1134,311 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}
+ // Schema snapshots
+ using TActionFunc = std::function<ui64(TServer::TPtr)>;
+
+ ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) {
+ auto streamDesc = Ls(runtime, sender, path);
+
+ const auto& streamEntry = streamDesc->ResultSet.at(0);
+ UNIT_ASSERT(streamEntry.ListNodeEntry);
+
+ const auto& children = streamEntry.ListNodeEntry->Children;
+ UNIT_ASSERT_VALUES_EQUAL(children.size(), 1);
+
+ auto topicDesc = Navigate(runtime, sender, JoinPath(ChildPath(SplitPath(path), children.at(0).Name)),
+ NSchemeCache::TSchemeCacheNavigate::EOp::OpTopic);
+
+ const auto& topicEntry = topicDesc->ResultSet.at(0);
+ UNIT_ASSERT(topicEntry.PQGroupInfo);
+
+ const auto& pqDesc = topicEntry.PQGroupInfo->Description;
+ for (const auto& partition : pqDesc.GetPartitions()) {
+ if (partitionId == partition.GetPartitionId()) {
+ return partition.GetTabletId();
+ }
+ }
+
+ UNIT_ASSERT_C(false, "Cannot find partition: " << partitionId);
+ return 0;
+ }
+
+ auto GetRecords(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) {
+ NKikimrClient::TPersQueueRequest request;
+ request.MutablePartitionRequest()->SetTopic(path);
+ request.MutablePartitionRequest()->SetPartition(partitionId);
+
+ auto& cmd = *request.MutablePartitionRequest()->MutableCmdRead();
+ cmd.SetClientId(NKikimr::NPQ::CLIENTID_TO_READ_INTERNALLY);
+ cmd.SetCount(10000);
+ cmd.SetOffset(0);
+ cmd.SetReadTimestampMs(0);
+ cmd.SetExternalOperation(true);
+
+ auto req = MakeHolder<TEvPersQueue::TEvRequest>();
+ req->Record = std::move(request);
+ ForwardToTablet(runtime, ResolvePqTablet(runtime, sender, path, partitionId), sender, req.Release());
+
+ auto resp = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvResponse>(sender);
+ UNIT_ASSERT(resp);
+
+ TVector<std::pair<TString, TString>> result;
+ for (const auto& r : resp->Get()->Record.GetPartitionResponse().GetCmdReadResult().GetResult()) {
+ const auto data = NKikimr::GetDeserializedData(r.GetData());
+ result.emplace_back(r.GetPartitionKey(), data.GetData());
+ }
+
+ return result;
+ }
+
+ void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
+ while (true) {
+ const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
+ if (records.size() == expected.size()) {
+ for (ui32 i = 0; i < expected.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(expected.at(i), records.at(i).second);
+ }
+
+ break;
+ }
+
+ SimulateSleep(server, TDuration::Seconds(1));
+ }
+ }
+
+ void ShouldDeliverChanges(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, TActionFunc action,
+ const TVector<TString>& queriesBefore, const TVector<TString>& queriesAfter, const TVector<TString>& records)
+ {
+ TTestPqEnv env(tableDesc, streamDesc, false);
+
+ bool preventEnqueueing = true;
+ TVector<THolder<IEventHandle>> enqueued;
+
+ env.GetServer()->GetRuntime()->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvChangeExchange::EvEnqueueRecords:
+ if (preventEnqueueing) {
+ enqueued.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ } else {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto sendEnqueued = [&]() {
+ preventEnqueueing = false;
+ for (auto& ev : std::exchange(enqueued, TVector<THolder<IEventHandle>>())) {
+ env.GetServer()->GetRuntime()->Send(ev.Release(), 0, true);
+ }
+ };
+
+ for (const auto& query : queriesBefore) {
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), query);
+ }
+
+ WaitTxNotification(env.GetServer(), env.GetEdgeActor(), action(env.GetServer()));
+
+ for (const auto& query : queriesAfter) {
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), query);
+ }
+
+ sendEnqueued();
+ WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", records);
+ }
+
+ TShardedTableOptions WithExtraColumn() {
+ return TShardedTableOptions()
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ {"extra", "Uint32", false, false},
+ });
+ }
+
+ TShardedTableOptions::TIndex SimpleIndex() {
+ return TShardedTableOptions::TIndex{
+ "by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal
+ };
+ }
+
+ TShardedTableOptions WithSimpleIndex() {
+ return SimpleTable()
+ .Indexes({
+ SimpleIndex()
+ });
+ }
+
+ Y_UNIT_TEST(AddColumn) {
+ auto action = [](TServer::TPtr server) {
+ return AsyncAlterAddExtraColumn(server, "/Root", "Table");
+ };
+
+ ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)",
+ }, {
+ R"(UPSERT INTO `/Root/Table` (key, value, extra) VALUES (2, 20, 200);)",
+ }, {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"extra":200,"value":20},"key":[2]})",
+ });
+ }
+
+ Y_UNIT_TEST(DropColumn) {
+ auto action = [](TServer::TPtr server) {
+ return AsyncAlterDropColumn(server, "/Root", "Table", "extra");
+ };
+
+ ShouldDeliverChanges(WithExtraColumn(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, {
+ R"(UPSERT INTO `/Root/Table` (key, value, extra) VALUES (1, 10, 100);)",
+ }, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)",
+ }, {
+ R"({"update":{"extra":100,"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ });
+ }
+
+ Y_UNIT_TEST(AddIndex) {
+ auto action = [](TServer::TPtr server) {
+ return AsyncAlterAddIndex(server, "/Root", "/Root/Table", SimpleIndex());
+ };
+
+ ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)",
+ }, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)",
+ }, {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ });
+ }
+
+ Y_UNIT_TEST(DropIndex) {
+ auto action = [](TServer::TPtr server) {
+ return AsyncAlterDropIndex(server, "/Root", "Table", SimpleIndex().Name);
+ };
+
+ ShouldDeliverChanges(WithSimpleIndex(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)",
+ }, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)",
+ }, {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ });
+ }
+
+ Y_UNIT_TEST(AddStream) {
+ auto action = [](TServer::TPtr server) {
+ return AsyncAlterAddStream(server, "/Root", "Table",
+ KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson, "AnotherStream"));
+ };
+
+ ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)",
+ }, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)",
+ }, {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ });
+ }
+
+ // Split/merge
+ Y_UNIT_TEST(ShouldDeliverChangesOnSplitMerge) {
+ TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false);
+
+ bool preventEnqueueing = true;
+ TVector<THolder<IEventHandle>> enqueued;
+ THashMap<ui64, ui32> splitAcks;
+
+ env.GetServer()->GetRuntime()->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvChangeExchange::EvEnqueueRecords:
+ if (preventEnqueueing) {
+ enqueued.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ } else {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ case TEvDataShard::EvSplitAck:
+ ++splitAcks[ev->Get<TEvDataShard::TEvSplitAck>()->Record.GetOperationCookie()];
+ break;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto waitForSplitAcks = [&](ui64 txId, ui32 count) {
+ if (splitAcks[txId] != count) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&](IEventHandle&) {
+ return splitAcks[txId] == count;
+ });
+ env.GetServer()->GetRuntime()->DispatchEvents(opts);
+ }
+ };
+
+ auto sendEnqueued = [&]() {
+ preventEnqueueing = false;
+ for (auto& ev : std::exchange(enqueued, TVector<THolder<IEventHandle>>())) {
+ env.GetServer()->GetRuntime()->Send(ev.Release(), 0, true);
+ }
+ };
+
+ SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1);
+
+ // split
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20);
+ )");
+
+ auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
+ UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
+
+ auto txId = AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4);
+ waitForSplitAcks(txId, 1);
+ sendEnqueued();
+
+ WaitTxNotification(env.GetServer(), env.GetEdgeActor(), txId);
+ WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ });
+
+ // merge
+ preventEnqueueing = true;
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 11),
+ (2, 21);
+ )");
+
+ tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
+ UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 2);
+
+ txId = AsyncMergeTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds);
+ waitForSplitAcks(txId, 2);
+
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), "UPSERT INTO `/Root/Table` (key, value) VALUES (3, 32);");
+ sendEnqueued();
+
+ WaitTxNotification(env.GetServer(), env.GetEdgeActor(), txId);
+ WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":11},"key":[1]})",
+ R"({"update":{"value":21},"key":[2]})",
+ R"({"update":{"value":32},"key":[3]})",
+ });
+ }
+
} // Cdc
} // NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 9aff5bf788e..3d334951406 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1292,28 +1292,9 @@ std::pair<TTableInfoMap, ui64> GetTables(
return std::make_pair(result, ownerId);
}
-TTableId ResolveTableId(
- Tests::TServer::TPtr server,
- TActorId sender,
- const TString& path)
-{
- auto& runtime = *server->GetRuntime();
-
- {
- TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
- auto& entry = request->ResultSet.emplace_back();
- entry.Path = SplitPath(path);
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
- entry.ShowPrivatePath = true;
- runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvTxProxySchemeCache::TEvNavigateKeySet(request)));
- }
-
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(sender);
- NSchemeCache::TSchemeCacheNavigate* req = ev->Get()->Request.Get();
- Y_VERIFY(req->ErrorCount == 0);
-
- auto& res = req->ResultSet.at(0);
- return res.TableId;
+TTableId ResolveTableId(Tests::TServer::TPtr server, TActorId sender, const TString& path) {
+ auto response = Navigate(*server->GetRuntime(), sender, path);
+ return response->ResultSet.at(0).TableId;
}
NTable::TRowVersionRanges GetRemovedRowVersions(
@@ -1775,6 +1756,41 @@ void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) {
runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
}
+THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender,
+ const TString& path, NSchemeCache::TSchemeCacheNavigate::EOp op)
+{
+ using TNavigate = NSchemeCache::TSchemeCacheNavigate;
+ using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet;
+ using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult;
+
+ auto request = MakeHolder<TNavigate>();
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Path = SplitPath(path);
+ entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
+ entry.Operation = op;
+ entry.ShowPrivatePath = true;
+ runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release())));
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender);
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get());
+
+ auto* response = ev->Get()->Request.Release();
+ UNIT_ASSERT(response);
+ UNIT_ASSERT(response->ErrorCount == 0);
+ UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1);
+
+ return THolder(response);
+}
+
+THolder<NSchemeCache::TSchemeCacheNavigate> Ls(
+ TTestActorRuntime& runtime,
+ const TActorId& sender,
+ const TString& path)
+{
+ return Navigate(runtime, sender, path, NSchemeCache::TSchemeCacheNavigate::EOp::OpList);
+}
+
void SendSQL(Tests::TServer::TPtr server,
TActorId sender,
const TString &sql,
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 352c94e1b82..144f75526c6 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -9,6 +9,7 @@
#include <ydb/core/testlib/minikql_compile.h>
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -619,6 +620,17 @@ void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId);
void SimulateSleep(Tests::TServer::TPtr server, TDuration duration);
void SimulateSleep(TTestActorRuntime& runtime, TDuration duration);
+THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(
+ TTestActorRuntime& runtime,
+ const TActorId& sender,
+ const TString& path,
+ NSchemeCache::TSchemeCacheNavigate::EOp op = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable);
+
+THolder<NSchemeCache::TSchemeCacheNavigate> Ls(
+ TTestActorRuntime& runtime,
+ const TActorId& sender,
+ const TString& path);
+
void SendSQL(Tests::TServer::TPtr server,
TActorId sender,
const TString &sql,