diff options
author | nsofya <nsofya@yandex-team.com> | 2023-08-02 15:50:39 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-08-02 15:50:39 +0300 |
commit | c1e8ecaf762b945d005bd42ed005e44c1604346d (patch) | |
tree | 70beb4a2b8d19712af0a106dd8d4dc4bd2a7f019 | |
parent | d199195371ee005f086b1a6d061c955c828b199d (diff) | |
download | ydb-c1e8ecaf762b945d005bd42ed005e44c1604346d.tar.gz |
KIKIMR-18263: Do not use legacy write in tests
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.cpp | 84 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.h | 23 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 148 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/ev_write/write_data.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_olap.cpp | 112 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_olap/ya.make | 1 |
13 files changed, 199 insertions, 256 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 2a26e86a248..50899858f2a 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -233,17 +233,6 @@ struct TEvColumnShard { struct TEvWrite : public TEventPB<TEvWrite, NKikimrTxColumnShard::TEvWrite, TEvColumnShard::EvWrite> { TEvWrite() = default; - TEvWrite(const TActorId& source, ui64 metaShard, ui64 writeId, ui64 tableId, - const TString& dedupId, const TString& data, const ui32 writePartId) { - ActorIdToProto(source, Record.MutableSource()); - Record.SetTxInitiator(metaShard); - Record.SetWriteId(writeId); - Record.SetTableId(tableId); - Record.SetDedupId(dedupId); - Record.SetData(data); - Record.SetWritePartId(writePartId); - } - TEvWrite(const TActorId& source, const NLongTxService::TLongTxId& longTxId, ui64 tableId, const TString& dedupId, const TString& data, const ui32 writePartId) { ActorIdToProto(source, Record.MutableSource()); diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index f2a4a911454..9e5c688ca76 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -228,19 +228,15 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex const auto& record = Proto(ev->Get()); const ui64 tableId = record.GetTableId(); - const ui64 metaShard = record.GetTxInitiator(); const ui64 writeId = record.GetWriteId(); const TString dedupId = record.GetDedupId(); const auto source = ev->Get()->GetSource(); NEvWrite::TWriteMeta writeMeta(writeId, tableId, source); - writeMeta.SetMetaShard(metaShard); writeMeta.SetDedupId(dedupId); - if (record.HasLongTxId()) { - Y_VERIFY(metaShard == 0); - writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId())); - writeMeta.SetWritePartId(record.GetWritePartId()); - } + Y_VERIFY(record.HasLongTxId()); + writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId())); + writeMeta.SetWritePartId(record.GetWritePartId()); if (!TablesManager.IsReadyForWrite(tableId)) { LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") @@ -269,20 +265,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); } else { - if (writeMeta.HasLongTxId()) { - // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId) - if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { - LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() - << " longTx " << writeMeta.GetLongTxIdUnsafe().ToString() - << " at tablet " << TabletID()); - - IncCounter(COUNTER_WRITE_DUPLICATE); - - auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( - TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS); - ctx.Send(writeMeta.GetSource(), result.release()); - return; - } + if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { + LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() + << " longTx " << writeMeta.GetLongTxIdUnsafe().ToString() + << " at tablet " << TabletID()); + + IncCounter(COUNTER_WRITE_DUPLICATE); + + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( + TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS); + ctx.Send(writeMeta.GetSource(), result.release()); + return; } WritesMonitor.RegisterWrite(writeData.GetSize()); diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 02f8f5df731..04db9777887 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -91,40 +91,62 @@ void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot } } -bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, - const TString& data, std::shared_ptr<arrow::Schema> schema, bool waitResult) { - const TString dedupId = ToString(writeId); - auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data, 1); - if (schema) { - write->SetArrowSchema(NArrow::SerializeSchema(*schema)); +ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>* writeIds) { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle); + UNIT_ASSERT(event); + + auto& resWrite = Proto(event); + UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), shardId); + UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), 0); + if (writeIds && resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) { + writeIds->push_back(resWrite.GetWriteId()); } - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release()); + return resWrite.GetStatus(); +} - if (waitResult) { - return WaitWriteResult(runtime, metaShard) == NKikimrTxColumnShard::EResultStatus::SUCCESS; +bool WriteDataImpl(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 tableId, + const NLongTxService::TLongTxId& longTxId, const ui64 writeId, + const TString& data, const std::shared_ptr<arrow::Schema>& schema, std::vector<ui64>* writeIds) { + const TString dedupId = ToString(writeId); + + auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, writeId); + Y_VERIFY(schema); + write->SetArrowSchema(NArrow::SerializeSchema(*schema)); + ForwardToTablet(runtime, shardId, sender, write.release()); + + if (writeIds) { + return WaitWriteResult(runtime, shardId, writeIds) == NKikimrTxColumnShard::EResultStatus::SUCCESS; } return true; } -ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard) { - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle); - UNIT_ASSERT(event); +bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data, + const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>* writeIds) { + NLongTxService::TLongTxId longTxId; + UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1")); + return WriteDataImpl(runtime, sender, shardId, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds); - auto& resWrite = Proto(event); - UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), metaShard); - return resWrite.GetStatus(); +} + +bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data, + const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, bool waitResult, std::vector<ui64>* writeIds) { + NLongTxService::TLongTxId longTxId; + UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1")); + if (writeIds) { + return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds); + } + std::vector<ui64> ids; + return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), waitResult ? &ids : nullptr); } std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId, - ui64 tableId, const TString& dedupId, const TString& data, - std::shared_ptr<arrow::Schema> schema) + ui64 tableId, const ui64 writePartId, const TString& data, + const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema) { - auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, 1); - if (schema) { - write->SetArrowSchema(NArrow::SerializeSchema(*schema)); - } + auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, "0", data, writePartId); + write->SetArrowSchema(NArrow::SerializeSchema(*NArrow::MakeArrowSchema(ydbSchema))); + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release()); TAutoPtr<IEventHandle> handle; auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle); @@ -177,11 +199,11 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vec ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release()); } -void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const std::vector<ui64>& writeIds) { +void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds) { NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT; - TString txBody = TTestSchema::CommitTxBody(metaShard, writeIds); + TString txBody = TTestSchema::CommitTxBody(0, writeIds); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, + ForwardToTablet(runtime, shardId, sender, new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody)); TAutoPtr<IEventHandle> handle; auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle); @@ -194,18 +216,22 @@ void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, } void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) { - ProposeCommit(runtime, sender, 0, txId, writeIds); + ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds); } void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) { - auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, TTestTxConfig::TxTablet0); + PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds); +} + +void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds) { + auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, shardId); for (ui64 txId : txIds) { auto tx = plan->Record.AddTransactions(); tx->SetTxId(txId); ActorIdToProto(sender, tx->MutableAckTo()); } - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, plan.release()); + ForwardToTablet(runtime, shardId, sender, plan.release()); TAutoPtr<IEventHandle> handle; for (ui32 i = 0; i < txIds.size(); ++i) { diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index ed09885ace4..07a79487aa9 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -412,17 +412,28 @@ struct TTestSchema { bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap); void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot); void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap); + void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true); -bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, - const TString& data, std::shared_ptr<arrow::Schema> schema = {}, bool waitResult = true); + +bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data, + const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>* writeIds); + +bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data, + const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, bool waitResult = true, std::vector<ui64>* writeIds = nullptr); + std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId, - ui64 tableId, const TString& dedupId, const TString& data, - std::shared_ptr<arrow::Schema> schema = {}); -ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard); + ui64 tableId, const ui64 writePartId, const TString& data, + const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema); + +ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>* writeIds = nullptr); + void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds, NOlap::TSnapshot snap, ui64 scanId = 0); -void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const std::vector<ui64>& writeIds); + +void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds); void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds); + +void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds); void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds); inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, ui64 txId) { diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 14a5cceeca9..2dd54ea1f48 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -377,8 +377,6 @@ void TestWrite(const TestTableDescription& table) { runtime.DispatchEvents(options); // - - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; @@ -386,36 +384,36 @@ void TestWrite(const TestTableDescription& table) { const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema; - bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema)); + bool ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema), ydbSchema); UNIT_ASSERT(ok); std::vector<std::pair<TString, TTypeInfo>> schema = ydbSchema; // no data - ok = WriteData(runtime, sender, metaShard, writeId, tableId, TString()); + ok = WriteData(runtime, sender, writeId++, tableId, TString(), ydbSchema); UNIT_ASSERT(!ok); // null column in PK TTestBlobOptions optsNulls; optsNulls.NullColumns.emplace("timestamp"); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema, optsNulls)); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema, optsNulls), ydbSchema); UNIT_ASSERT(!ok); // missing columns schema.resize(4); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); - UNIT_ASSERT(!ok); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); + UNIT_ASSERT(ok); // wrong first key column type (with supported layout: Int64 vs Timestamp) // It fails only if we specify source schema. No way to detect it from serialized batch data. schema = ydbSchema; schema[0].second = TTypeInfo(NTypeIds::Int64); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema), - NArrow::MakeArrowSchema(schema)); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), + schema); UNIT_ASSERT(!ok); // wrong type (no additional schema - fails in case of wrong layout) @@ -423,7 +421,7 @@ void TestWrite(const TestTableDescription& table) { for (size_t i = 0; i < ydbSchema.size(); ++i) { schema = ydbSchema; schema[i].second = TTypeInfo(NTypeIds::Int8); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); UNIT_ASSERT(!ok); } @@ -432,15 +430,14 @@ void TestWrite(const TestTableDescription& table) { for (size_t i = 0; i < ydbSchema.size(); ++i) { schema = ydbSchema; schema[i].second = TTypeInfo(NTypeIds::Int64); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema), - NArrow::MakeArrowSchema(schema)); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); UNIT_ASSERT(ok == (ydbSchema[i].second == TTypeInfo(NTypeIds::Int64))); } schema = ydbSchema; schema[1].second = TTypeInfo(NTypeIds::Utf8); schema[5].second = TTypeInfo(NTypeIds::Int32); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); UNIT_ASSERT(!ok); // reordered columns @@ -452,11 +449,7 @@ void TestWrite(const TestTableDescription& table) { schema.push_back({name, typeInfo}); } - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); - UNIT_ASSERT(!ok); - - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema), - NArrow::MakeArrowSchema(schema)); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); UNIT_ASSERT(ok); // too much data @@ -464,7 +457,7 @@ void TestWrite(const TestTableDescription& table) { TString bigData = MakeTestBlob({0, 150 * 1000}, ydbSchema); UNIT_ASSERT(bigData.size() > NColumnShard::TLimits::GetMaxBlobSize()); UNIT_ASSERT(bigData.size() < NColumnShard::TLimits::GetMaxBlobSize() + 2 * 1024 * 1024); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, bigData); + ok = WriteData(runtime, sender, writeId++, tableId, bigData, ydbSchema); UNIT_ASSERT(!ok); } @@ -480,8 +473,6 @@ void TestWriteOverload(const TestTableDescription& table) { runtime.DispatchEvents(options); // - - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; @@ -521,17 +512,17 @@ void TestWriteOverload(const TestTableDescription& table) { const ui32 toSend = toCatch + 1; for (ui32 i = 0; i < toSend; ++i) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob, {}, false)); + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema, false)); } - UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::OVERLOADED); while (capturedWrites.size()) { resendOneCaptured(); - UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS); } - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob)); // OK after overload + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema)); // OK after overload } // TODO: Improve test. It does not catch KIKIMR-14890 @@ -565,8 +556,9 @@ void TestWriteReadDup(const TestTableDescription& table = {}) { for (ui64 planStep = initPlanStep; planStep < initPlanStep + 50; ++planStep) { TSet<ui64> txIds; for (ui32 i = 0; i <= 5; ++i) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testData)); - ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds)); + ProposeCommit(runtime, sender, ++txId, writeIds); txIds.insert(txId); } PlanCommit(runtime, sender, planStep, txIds); @@ -617,7 +609,7 @@ void TestWriteReadLongTxDup() { auto data = MakeTestBlob({portion.first + i, portion.second + i}, ydbSchema); UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT); - auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data); + auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, ydbSchema); UNIT_ASSERT(writeIdOpt); if (!i) { writeId = *writeIdOpt; @@ -666,18 +658,18 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, - const TString& data) { - bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, data); + auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, + const TString& data, const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>& intWriteIds) { + bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &intWriteIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } return ok; }; - auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, + auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) { - ProposeCommit(runtime, sender, metaShard, txId, writeIds); + ProposeCommit(runtime, sender, txId, writeIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } @@ -714,7 +706,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString // write 1: ins:1, cmt:0, idx:0 - UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, MakeTestBlob(portion[0], ydbSchema))); + std::vector<ui64> intWriteIds; + UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[0], ydbSchema), ydbSchema, intWriteIds)); // read @@ -736,7 +729,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString ui64 planStep = 21; ui64 txId = 100; - proposeCommit(runtime, sender, metaShard, txId, {writeId}); + proposeCommit(runtime, sender, txId, intWriteIds); planCommit(runtime, sender, planStep, txId); // read 2 (committed, old snapshot) @@ -822,23 +815,25 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString // write 2 (big portion of data): ins:1, cmt:1, idx:0 ++writeId; + intWriteIds.clear(); { TString triggerData = MakeTestBlob(portion[1], ydbSchema); UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); - UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, triggerData)); + UNIT_ASSERT(write(runtime, sender, writeId, tableId, triggerData, ydbSchema, intWriteIds)); } // commit 2 (init indexation): ins:0, cmt:0, idx:1 planStep = 22; ++txId; - proposeCommit(runtime, sender, metaShard, txId, {writeId}); + proposeCommit(runtime, sender, txId, intWriteIds); planCommit(runtime, sender, planStep, txId); // write 3: ins:1, cmt:0, idx:1 ++writeId; - UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, MakeTestBlob(portion[2], ydbSchema))); + intWriteIds.clear(); + UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[2], ydbSchema), ydbSchema, intWriteIds)); // read 6, planstep 0 @@ -909,13 +904,14 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString planStep = 23; ++txId; - proposeCommit(runtime, sender, metaShard, txId, {writeId}); + proposeCommit(runtime, sender, txId, intWriteIds); planCommit(runtime, sender, planStep, txId); // write 4: ins:1, cmt:1, idx:1 ++writeId; - UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, MakeTestBlob(portion[3], ydbSchema))); + intWriteIds.clear(); + UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[3], ydbSchema), ydbSchema, intWriteIds)); // read 9 (committed, indexed) @@ -935,7 +931,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString planStep = 24; ++txId; - proposeCommit(runtime, sender, metaShard, txId, {writeId}); + proposeCommit(runtime, sender, txId, intWriteIds); planCommit(runtime, sender, planStep, txId); // read 10 @@ -1092,18 +1088,18 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, - const TString& data) { - bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, data); + auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, + const TString& data, const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>& writeIds) { + bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &writeIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } return ok; }; - auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, + auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) { - ProposeCommit(runtime, sender, metaShard, txId, writeIds); + ProposeCommit(runtime, sender, txId, writeIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } @@ -1150,15 +1146,14 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table std::pair<ui64, ui64> portion = {pos, pos + portionSize}; TString data = MakeTestBlob(portion, ydbSchema); - ids.push_back(writeId); - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, data)); + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &ids)); } if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } - proposeCommit(runtime, sender, metaShard, txId, ids); + proposeCommit(runtime, sender, txId, ids); planCommit(runtime, sender, planStep, txId); } std::pair<ui64, ui64> smallWrites = {triggerPortionSize, pos}; @@ -1168,9 +1163,10 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table ui32 numTxs = engineLimits.GranuleSizeForOverloadPrevent / triggerData.size() + 1; for (ui32 i = 0; i < numTxs; ++i, ++writeId, ++planStep, ++txId) { - UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, triggerData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(write(runtime, sender, writeId, tableId, triggerData, ydbSchema, writeIds)); - proposeCommit(runtime, sender, metaShard, txId, {writeId}); + proposeCommit(runtime, sender, txId, writeIds); planCommit(runtime, sender, planStep, txId); } @@ -1406,10 +1402,11 @@ void TestReadWithProgram(const TestTableDescription& table = {}) SetupSchema(runtime, sender, tableId, table); { // write some data - bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema)); + std::vector<ui64> writeIds; + bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds); UNIT_ASSERT(ok); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } @@ -1533,10 +1530,11 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) { SetupSchema(runtime, sender, tableId, table); { // write some data - bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema)); + std::vector<ui64> writeIds; + bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds); UNIT_ASSERT(ok); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } @@ -1627,10 +1625,11 @@ void TestSomePrograms(const TestTableDescription& table) { SetupSchema(runtime, sender, tableId, table); { // write some data - bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema)); + std::vector<ui64> writeIds; + bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds); UNIT_ASSERT(ok); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } @@ -1712,10 +1711,11 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche SetupSchema(runtime, sender, tableId, table); { // write some data - bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, testDataBlob); + std::vector<ui64> writeIds; + bool ok = WriteData(runtime, sender, writeId, tableId, testDataBlob, table.Schema, true, &writeIds); UNIT_ASSERT(ok); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } @@ -2392,9 +2392,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize()); - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, table.Schema, true, &writeIds)); - ProposeCommit(runtime, sender, metaShard, txId, { writeId }); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } } @@ -2674,9 +2675,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { std::pair<ui64, ui64> triggerPortion = {1, 1000}; TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds)); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } @@ -2884,9 +2886,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Overwrite the same data multiple times to produce multiple portions at different timestamps ui32 numWrites = 14; for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds)); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } @@ -2894,9 +2897,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { { TString smallData = MakeTestBlob({0, 2}, ydbSchema); UNIT_ASSERT(smallData.size() < 100 * 1024); - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, smallData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, smallData, ydbSchema, true, &writeIds)); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); ++writeId; ++planStep; @@ -2949,9 +2953,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { planStep += delay.MilliSeconds(); numWrites = 10; for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds)); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } @@ -2980,9 +2985,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { planStep += 2 * delay.MilliSeconds(); numWrites = 2; for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds)); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); } diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index c70ec3a5685..93309fde2af 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -257,8 +257,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn, ydbSchema); UNIT_ASSERT_EQUAL(blobs.size(), 2); for (auto& data : blobs) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data)); - ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data, ydbSchema, true, &writeIds)); + ProposeCommit(runtime, sender, ++txId, writeIds); PlanCommit(runtime, sender, ++planStep, txId); } @@ -345,8 +346,10 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0])); - ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); + + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, blobs[0], ydbSchema, true, &writeIds)); + ProposeCommit(runtime, sender, ++txId, writeIds); PlanCommit(runtime, sender, ++planStep, txId); if (internal) { @@ -613,8 +616,9 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt } for (auto& data : blobs) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data)); - ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data, testYdbSchema, true, &writeIds)); + ProposeCommit(runtime, sender, ++txId, writeIds); PlanCommit(runtime, sender, ++planStep, txId); } @@ -1080,13 +1084,15 @@ void TestDrop(bool reboots) { UNIT_ASSERT(data2.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT); // Write into index - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data1)); - ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data1, testYdbSchema, true, &writeIds)); + ProposeCommit(runtime, sender, ++txId, writeIds); PlanCommit(runtime, sender, ++planStep, txId); // Write into InsertTable - UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data2)); - ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); + writeIds.clear(); + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data2, testYdbSchema, true, &writeIds)); + ProposeCommit(runtime, sender, ++txId, writeIds); PlanCommit(runtime, sender, ++planStep, txId); if (reboots) { @@ -1150,7 +1156,7 @@ void TestDropWriteRace() { UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT); // Write into InsertTable - auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data); + auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, testYdbSchema); UNIT_ASSERT(writeIdOpt); ProposeCommit(runtime, sender, ++txId, {*writeIdOpt}); auto commitTxId = txId; @@ -1179,8 +1185,6 @@ void TestCompaction(std::optional<ui32> numWrites = {}) { runtime.DispatchEvents(options); // Create table - - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 100; @@ -1227,9 +1231,10 @@ void TestCompaction(std::optional<ui32> numWrites = {}) { ++planStep; ++txId; for (ui32 i = 0; i < *numWrites; ++i, ++writeId, ++planStep, ++txId) { - UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); + std::vector<ui64> writeIds; + UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, testYdbSchema, true, &writeIds)); - ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + ProposeCommit(runtime, sender, txId, writeIds); PlanCommit(runtime, sender, planStep, txId); if (i % 2 == 0) { diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index 2b9965ceb44..d15a59af45d 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -23,7 +23,7 @@ class TWriteMeta { // Long Tx logic YDB_OPT(NLongTxService::TLongTxId, LongTxId) YDB_ACCESSOR(ui64, WritePartId, 0); - YDB_ACCESSOR(ui64, MetaShard, 0); + YDB_READONLY(ui64, MetaShard, 0); YDB_ACCESSOR_DEF(TString, DedupId); public: diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp index 0a5ca55d2b4..45501d22e6d 100644 --- a/ydb/core/tx/schemeshard/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap.cpp @@ -1,5 +1,6 @@ #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <ydb/core/tx/columnshard/columnshard.h> +#include <ydb/core/tx/columnshard/columnshard_ut_common.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/formats/arrow/arrow_batch_builder.h> @@ -42,100 +43,6 @@ static const TVector<std::pair<TString, TTypeInfo>> defaultYdbSchema = { {"data", TTypeInfo(NTypeIds::Utf8) } }; -TString MakeTestBlob(std::pair<ui64, ui64> range) { - TString err; - NArrow::TArrowBatchBuilder batchBuilder(arrow::Compression::LZ4_FRAME); - batchBuilder.Start(defaultYdbSchema, 0, 0, err); - - TString str; - TVector<TTypeInfo> types = { - TTypeInfo(NTypeIds::Timestamp), - TTypeInfo(NTypeIds::Utf8) - }; - - for (size_t i = range.first; i < range.second; ++i) { - str = ToString(i); - - TVector<TCell> cells; - cells.push_back(TCell::Make<ui64>(i)); - cells.push_back(TCell(str.data(), str.size())); - - NKikimr::TDbTupleRef unused; - batchBuilder.AddRow(unused, NKikimr::TDbTupleRef(types.data(), cells.data(), 2)); - } - - auto batch = batchBuilder.FlushBatch(true); - UNIT_ASSERT(batch); - auto status = batch->ValidateFull(); - UNIT_ASSERT(status.ok()); - - return batchBuilder.Finish(); -} - -static constexpr ui64 txInitiator = 42; // 0 means LongTx, we need another value here - -void WriteData(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, ui64 pathId, ui64 writeId, TString data) { - const TString dedupId = ToString(writeId); - - auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data, 1); - - ForwardToTablet(runtime, tabletId, sender, evWrite.release()); - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle); - UNIT_ASSERT(event); - - auto& resWrite = Proto(event); - UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), tabletId); - UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), txInitiator); - UNIT_ASSERT_EQUAL(resWrite.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); -} - -void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 txId, const TVector<ui64>& writeIds) { - NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT; - TString txBody; - { - NKikimrTxColumnShard::TCommitTxBody proto; - proto.SetTxInitiator(txInitiator); - for (ui64 id : writeIds) { - proto.AddWriteIds(id); - } - - Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&txBody); - } - - ForwardToTablet(runtime, tabletId, sender, - new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody)); - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle); - UNIT_ASSERT(event); - - auto& res = Proto(event); - UNIT_ASSERT_EQUAL(res.GetTxKind(), txKind); - UNIT_ASSERT_EQUAL(res.GetTxId(), txId); - UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED); -} - -void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 planStep, const TSet<ui64>& txIds) { - auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, txInitiator, tabletId); - for (ui64 txId : txIds) { - auto tx = plan->Record.AddTransactions(); - tx->SetTxId(txId); - ActorIdToProto(sender, tx->MutableAckTo()); - } - - ForwardToTablet(runtime, tabletId, sender, plan.release()); - TAutoPtr<IEventHandle> handle; - - for (ui32 i = 0; i < txIds.size(); ++i) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle); - UNIT_ASSERT(event); - - auto& res = Proto(event); - UNIT_ASSERT(txIds.count(res.GetTxId())); - UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - } -} - }} @@ -788,27 +695,28 @@ Y_UNIT_TEST_SUITE(TOlap) { { // Write data directly into shard TActorId sender = runtime.AllocateEdgeActor(); - TString data = MakeTestBlob({0, rowsInBatch}); + TString data = NTxUT::MakeTestBlob({0, rowsInBatch}, defaultYdbSchema); ui64 writeId = 0; TSet<ui64> txIds; for (ui32 i = 0; i < 10; ++i) { - WriteData(runtime, sender, shardId, pathId, ++writeId, data); - ProposeCommit(runtime, sender, shardId, ++txId, {writeId}); + std::vector<ui64> writeIds; + NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds); + NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds); txIds.insert(txId); } - PlanCommit(runtime, sender, shardId, ++planStep, txIds); + NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, txIds); // emulate timeout runtime.UpdateCurrentTime(TInstant::Now()); // trigger periodic stats at shard (after timeout) - WriteData(runtime, sender, shardId, pathId, ++writeId, data); - ProposeCommit(runtime, sender, shardId, ++txId, {writeId}); - txIds = {txId}; - PlanCommit(runtime, sender, shardId, ++planStep, txIds); + std::vector<ui64> writeIds; + NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds); + NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds); + NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, {txId}); } auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore", true, true); diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt index a899ec6c6c2..e0a3bd24e23 100644 --- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt @@ -39,6 +39,7 @@ target_link_options(ydb-core-tx-schemeshard-ut_olap PRIVATE ) target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt index 792c5ec3d20..e00b392f1ad 100644 --- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt @@ -42,6 +42,7 @@ target_link_options(ydb-core-tx-schemeshard-ut_olap PRIVATE ) target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt index 4ae2f06628e..0d6cf1b0375 100644 --- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt @@ -43,6 +43,7 @@ target_link_options(ydb-core-tx-schemeshard-ut_olap PRIVATE ) target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt index 3bb760793ab..21e10eec1db 100644 --- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_olap PUBLIC ) target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/schemeshard/ut_olap/ya.make b/ydb/core/tx/schemeshard/ut_olap/ya.make index ab7fb360ffe..d78aa0d531e 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ya.make +++ b/ydb/core/tx/schemeshard/ut_olap/ya.make @@ -29,6 +29,7 @@ YQL_LAST_ABI_VERSION() SRCS( ut_olap.cpp + ydb/core/tx/columnshard/columnshard_ut_common.cpp ) END() |