aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-10-28 13:35:08 +0300
committersnaury <snaury@ydb.tech>2022-10-28 13:35:08 +0300
commit7ac0d812a0a46f738031ac8e9e3dfec4681bbd73 (patch)
treec234c20b2c9949781159774048caa82c846ea5ca
parent5f4f0d7775ea9bc0abd2e10f21823900202bf880 (diff)
downloadydb-7ac0d812a0a46f738031ac8e9e3dfec4681bbd73.tar.gz
Test for drop/write race in columnshard
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp26
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h8
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp50
3 files changed, 83 insertions, 1 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index f409132c1c..35c4b08ff5 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -87,6 +87,28 @@ bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui6
return (resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS);
}
+std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
+ ui64 tableId, const TString& dedupId, const TString& data,
+ std::shared_ptr<arrow::Schema> schema)
+{
+ auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data);
+ if (schema) {
+ write->SetArrowSchema(NArrow::SerializeSchema(*schema));
+ }
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release());
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& resWrite = Proto(event);
+ UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), TTestTxConfig::TxTablet0);
+ UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), 0);
+ if (resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
+ return resWrite.GetWriteId();
+ }
+ return {};
+}
+
void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector<ui64>& pathIds,
NOlap::TSnapshot snap, ui64 scanId) {
auto scan = std::make_unique<TEvColumnShard::TEvScan>();
@@ -141,6 +163,10 @@ void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard,
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED);
}
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const TVector<ui64>& writeIds) {
+ ProposeCommit(runtime, sender, 0, txId, writeIds);
+}
+
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, TTestTxConfig::TxTablet0);
for (ui64 txId : txIds) {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index b218c56b47..8a9839f9ad 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -291,7 +291,9 @@ struct TTestSchema {
static TString CommitTxBody(ui64 metaShard, const TVector<ui64>& writeIds) {
NKikimrTxColumnShard::TCommitTxBody proto;
- proto.SetTxInitiator(metaShard);
+ if (metaShard) {
+ proto.SetTxInitiator(metaShard);
+ }
for (ui64 id : writeIds) {
proto.AddWriteIds(id);
}
@@ -337,9 +339,13 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
const TString& data, std::shared_ptr<arrow::Schema> schema = {});
+std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
+ ui64 tableId, const TString& dedupId, const TString& data,
+ std::shared_ptr<arrow::Schema> schema = {});
void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector<ui64>& pathIds,
NOlap::TSnapshot snap, ui64 scanId = 0);
void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const TVector<ui64>& writeIds);
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const TVector<ui64>& writeIds);
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds);
inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, ui64 txId) {
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index b530b362af..20e5e296ca 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -695,6 +695,52 @@ void TestDrop(bool reboots) {
}
}
+void TestDropWriteRace() {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard),
+ &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ //
+
+ ui64 tableId = 1;
+ ui64 planStep = 1000000000; // greater then delays
+ ui64 txId = 100;
+
+ NLongTxService::TLongTxId longTxId;
+ UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
+
+ bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
+ {++planStep, ++txId});
+ UNIT_ASSERT(ok);
+ PlanSchemaTx(runtime, sender, {planStep, txId});
+
+ TString data = MakeTestBlob({0, 100}, testYdbSchema);
+ UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
+
+ // Write into InsertTable
+ auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data);
+ UNIT_ASSERT(writeIdOpt);
+ ProposeCommit(runtime, sender, ++txId, {*writeIdOpt});
+ auto commitTxId = txId;
+
+ // Drop table
+ ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), {++planStep, ++txId});
+ if (ok) {
+ PlanSchemaTx(runtime, sender, {planStep, txId});
+ }
+
+ // Plan commit
+ PlanCommit(runtime, sender, ++planStep, commitTxId);
+}
+
}
namespace NColumnShard {
@@ -845,6 +891,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
Y_UNIT_TEST(RebootDrop) {
TestDrop(true);
}
+
+ Y_UNIT_TEST(DropWriteRace) {
+ TestDropWriteRace();
+ }
}
}