diff options
author | snaury <snaury@ydb.tech> | 2022-10-28 13:35:08 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-10-28 13:35:08 +0300 |
commit | 7ac0d812a0a46f738031ac8e9e3dfec4681bbd73 (patch) | |
tree | c234c20b2c9949781159774048caa82c846ea5ca | |
parent | 5f4f0d7775ea9bc0abd2e10f21823900202bf880 (diff) | |
download | ydb-7ac0d812a0a46f738031ac8e9e3dfec4681bbd73.tar.gz |
Test for drop/write race in columnshard
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_columnshard_schema.cpp | 50 |
3 files changed, 83 insertions, 1 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index f409132c1c..35c4b08ff5 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -87,6 +87,28 @@ bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui6 return (resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS); } +std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId, + ui64 tableId, const TString& dedupId, const TString& data, + std::shared_ptr<arrow::Schema> schema) +{ + auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data); + if (schema) { + write->SetArrowSchema(NArrow::SerializeSchema(*schema)); + } + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release()); + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle); + UNIT_ASSERT(event); + + auto& resWrite = Proto(event); + UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), TTestTxConfig::TxTablet0); + UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), 0); + if (resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) { + return resWrite.GetWriteId(); + } + return {}; +} + void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector<ui64>& pathIds, NOlap::TSnapshot snap, ui64 scanId) { auto scan = std::make_unique<TEvColumnShard::TEvScan>(); @@ -141,6 +163,10 @@ void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED); } +void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const TVector<ui64>& writeIds) { + ProposeCommit(runtime, sender, 0, txId, writeIds); +} + void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) { auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, TTestTxConfig::TxTablet0); for (ui64 txId : txIds) { diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index b218c56b47..8a9839f9ad 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -291,7 +291,9 @@ struct TTestSchema { static TString CommitTxBody(ui64 metaShard, const TVector<ui64>& writeIds) { NKikimrTxColumnShard::TCommitTxBody proto; - proto.SetTxInitiator(metaShard); + if (metaShard) { + proto.SetTxInitiator(metaShard); + } for (ui64 id : writeIds) { proto.AddWriteIds(id); } @@ -337,9 +339,13 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap); bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, const TString& data, std::shared_ptr<arrow::Schema> schema = {}); +std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId, + ui64 tableId, const TString& dedupId, const TString& data, + std::shared_ptr<arrow::Schema> schema = {}); void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector<ui64>& pathIds, NOlap::TSnapshot snap, ui64 scanId = 0); void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const TVector<ui64>& writeIds); +void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const TVector<ui64>& writeIds); void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds); inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, ui64 txId) { diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index b530b362af..20e5e296ca 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -695,6 +695,52 @@ void TestDrop(bool reboots) { } } +void TestDropWriteRace() { + TTestBasicRuntime runtime; + TTester::Setup(runtime); + + TActorId sender = runtime.AllocateEdgeActor(); + CreateTestBootstrapper(runtime, + CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), + &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + // + + ui64 tableId = 1; + ui64 planStep = 1000000000; // greater then delays + ui64 txId = 100; + + NLongTxService::TLongTxId longTxId; + UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1")); + + bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk), + {++planStep, ++txId}); + UNIT_ASSERT(ok); + PlanSchemaTx(runtime, sender, {planStep, txId}); + + TString data = MakeTestBlob({0, 100}, testYdbSchema); + UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT); + + // Write into InsertTable + auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data); + UNIT_ASSERT(writeIdOpt); + ProposeCommit(runtime, sender, ++txId, {*writeIdOpt}); + auto commitTxId = txId; + + // Drop table + ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), {++planStep, ++txId}); + if (ok) { + PlanSchemaTx(runtime, sender, {planStep, txId}); + } + + // Plan commit + PlanCommit(runtime, sender, ++planStep, commitTxId); +} + } namespace NColumnShard { @@ -845,6 +891,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { Y_UNIT_TEST(RebootDrop) { TestDrop(true); } + + Y_UNIT_TEST(DropWriteRace) { + TestDropWriteRace(); + } } } |