aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-08-02 15:50:39 +0300
committernsofya <nsofya@yandex-team.com>2023-08-02 15:50:39 +0300
commitc1e8ecaf762b945d005bd42ed005e44c1604346d (patch)
tree70beb4a2b8d19712af0a106dd8d4dc4bd2a7f019
parentd199195371ee005f086b1a6d061c955c828b199d (diff)
downloadydb-c1e8ecaf762b945d005bd42ed005e44c1604346d.tar.gz
KIKIMR-18263: Do not use legacy write in tests
-rw-r--r--ydb/core/tx/columnshard/columnshard.h11
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp35
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp84
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h23
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp148
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp35
-rw-r--r--ydb/core/tx/ev_write/write_data.h2
-rw-r--r--ydb/core/tx/schemeshard/ut_olap.cpp112
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/ya.make1
13 files changed, 199 insertions, 256 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 2a26e86a248..50899858f2a 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -233,17 +233,6 @@ struct TEvColumnShard {
struct TEvWrite : public TEventPB<TEvWrite, NKikimrTxColumnShard::TEvWrite, TEvColumnShard::EvWrite> {
TEvWrite() = default;
- TEvWrite(const TActorId& source, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& dedupId, const TString& data, const ui32 writePartId) {
- ActorIdToProto(source, Record.MutableSource());
- Record.SetTxInitiator(metaShard);
- Record.SetWriteId(writeId);
- Record.SetTableId(tableId);
- Record.SetDedupId(dedupId);
- Record.SetData(data);
- Record.SetWritePartId(writePartId);
- }
-
TEvWrite(const TActorId& source, const NLongTxService::TLongTxId& longTxId, ui64 tableId,
const TString& dedupId, const TString& data, const ui32 writePartId) {
ActorIdToProto(source, Record.MutableSource());
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index f2a4a911454..9e5c688ca76 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -228,19 +228,15 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
const auto& record = Proto(ev->Get());
const ui64 tableId = record.GetTableId();
- const ui64 metaShard = record.GetTxInitiator();
const ui64 writeId = record.GetWriteId();
const TString dedupId = record.GetDedupId();
const auto source = ev->Get()->GetSource();
NEvWrite::TWriteMeta writeMeta(writeId, tableId, source);
- writeMeta.SetMetaShard(metaShard);
writeMeta.SetDedupId(dedupId);
- if (record.HasLongTxId()) {
- Y_VERIFY(metaShard == 0);
- writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId()));
- writeMeta.SetWritePartId(record.GetWritePartId());
- }
+ Y_VERIFY(record.HasLongTxId());
+ writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId()));
+ writeMeta.SetWritePartId(record.GetWritePartId());
if (!TablesManager.IsReadyForWrite(tableId)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index")
@@ -269,20 +265,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
} else {
- if (writeMeta.HasLongTxId()) {
- // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId)
- if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
- LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId()
- << " longTx " << writeMeta.GetLongTxIdUnsafe().ToString()
- << " at tablet " << TabletID());
-
- IncCounter(COUNTER_WRITE_DUPLICATE);
-
- auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
- TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
- ctx.Send(writeMeta.GetSource(), result.release());
- return;
- }
+ if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
+ LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId()
+ << " longTx " << writeMeta.GetLongTxIdUnsafe().ToString()
+ << " at tablet " << TabletID());
+
+ IncCounter(COUNTER_WRITE_DUPLICATE);
+
+ auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
+ TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ ctx.Send(writeMeta.GetSource(), result.release());
+ return;
}
WritesMonitor.RegisterWrite(writeData.GetSize());
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 02f8f5df731..04db9777887 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -91,40 +91,62 @@ void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
}
}
-bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& data, std::shared_ptr<arrow::Schema> schema, bool waitResult) {
- const TString dedupId = ToString(writeId);
- auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data, 1);
- if (schema) {
- write->SetArrowSchema(NArrow::SerializeSchema(*schema));
+ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>* writeIds) {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& resWrite = Proto(event);
+ UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), shardId);
+ UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), 0);
+ if (writeIds && resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
+ writeIds->push_back(resWrite.GetWriteId());
}
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release());
+ return resWrite.GetStatus();
+}
- if (waitResult) {
- return WaitWriteResult(runtime, metaShard) == NKikimrTxColumnShard::EResultStatus::SUCCESS;
+bool WriteDataImpl(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 tableId,
+ const NLongTxService::TLongTxId& longTxId, const ui64 writeId,
+ const TString& data, const std::shared_ptr<arrow::Schema>& schema, std::vector<ui64>* writeIds) {
+ const TString dedupId = ToString(writeId);
+
+ auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, writeId);
+ Y_VERIFY(schema);
+ write->SetArrowSchema(NArrow::SerializeSchema(*schema));
+ ForwardToTablet(runtime, shardId, sender, write.release());
+
+ if (writeIds) {
+ return WaitWriteResult(runtime, shardId, writeIds) == NKikimrTxColumnShard::EResultStatus::SUCCESS;
}
return true;
}
-ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard) {
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
+bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
+ const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>* writeIds) {
+ NLongTxService::TLongTxId longTxId;
+ UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
+ return WriteDataImpl(runtime, sender, shardId, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds);
- auto& resWrite = Proto(event);
- UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), metaShard);
- return resWrite.GetStatus();
+}
+
+bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data,
+ const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, bool waitResult, std::vector<ui64>* writeIds) {
+ NLongTxService::TLongTxId longTxId;
+ UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
+ if (writeIds) {
+ return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds);
+ }
+ std::vector<ui64> ids;
+ return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), waitResult ? &ids : nullptr);
}
std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
- ui64 tableId, const TString& dedupId, const TString& data,
- std::shared_ptr<arrow::Schema> schema)
+ ui64 tableId, const ui64 writePartId, const TString& data,
+ const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema)
{
- auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, 1);
- if (schema) {
- write->SetArrowSchema(NArrow::SerializeSchema(*schema));
- }
+ auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, "0", data, writePartId);
+ write->SetArrowSchema(NArrow::SerializeSchema(*NArrow::MakeArrowSchema(ydbSchema)));
+
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release());
TAutoPtr<IEventHandle> handle;
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
@@ -177,11 +199,11 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vec
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release());
}
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const std::vector<ui64>& writeIds) {
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds) {
NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT;
- TString txBody = TTestSchema::CommitTxBody(metaShard, writeIds);
+ TString txBody = TTestSchema::CommitTxBody(0, writeIds);
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
+ ForwardToTablet(runtime, shardId, sender,
new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody));
TAutoPtr<IEventHandle> handle;
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
@@ -194,18 +216,22 @@ void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard,
}
void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) {
- ProposeCommit(runtime, sender, 0, txId, writeIds);
+ ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds);
}
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) {
- auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, TTestTxConfig::TxTablet0);
+ PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
+}
+
+void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds) {
+ auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, shardId);
for (ui64 txId : txIds) {
auto tx = plan->Record.AddTransactions();
tx->SetTxId(txId);
ActorIdToProto(sender, tx->MutableAckTo());
}
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, plan.release());
+ ForwardToTablet(runtime, shardId, sender, plan.release());
TAutoPtr<IEventHandle> handle;
for (ui32 i = 0; i < txIds.size(); ++i) {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index ed09885ace4..07a79487aa9 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -412,17 +412,28 @@ struct TTestSchema {
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
+
void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
-bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& data, std::shared_ptr<arrow::Schema> schema = {}, bool waitResult = true);
+
+bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
+ const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>* writeIds);
+
+bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data,
+ const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, bool waitResult = true, std::vector<ui64>* writeIds = nullptr);
+
std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
- ui64 tableId, const TString& dedupId, const TString& data,
- std::shared_ptr<arrow::Schema> schema = {});
-ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard);
+ ui64 tableId, const ui64 writePartId, const TString& data,
+ const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema);
+
+ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>* writeIds = nullptr);
+
void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds,
NOlap::TSnapshot snap, ui64 scanId = 0);
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const std::vector<ui64>& writeIds);
+
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds);
void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds);
+
+void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds);
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds);
inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, ui64 txId) {
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 14a5cceeca9..2dd54ea1f48 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -377,8 +377,6 @@ void TestWrite(const TestTableDescription& table) {
runtime.DispatchEvents(options);
//
-
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
@@ -386,36 +384,36 @@ void TestWrite(const TestTableDescription& table) {
const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema;
- bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema));
+ bool ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema), ydbSchema);
UNIT_ASSERT(ok);
std::vector<std::pair<TString, TTypeInfo>> schema = ydbSchema;
// no data
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, TString());
+ ok = WriteData(runtime, sender, writeId++, tableId, TString(), ydbSchema);
UNIT_ASSERT(!ok);
// null column in PK
TTestBlobOptions optsNulls;
optsNulls.NullColumns.emplace("timestamp");
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema, optsNulls));
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema, optsNulls), ydbSchema);
UNIT_ASSERT(!ok);
// missing columns
schema.resize(4);
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
- UNIT_ASSERT(!ok);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
+ UNIT_ASSERT(ok);
// wrong first key column type (with supported layout: Int64 vs Timestamp)
// It fails only if we specify source schema. No way to detect it from serialized batch data.
schema = ydbSchema;
schema[0].second = TTypeInfo(NTypeIds::Int64);
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema),
- NArrow::MakeArrowSchema(schema));
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema),
+ schema);
UNIT_ASSERT(!ok);
// wrong type (no additional schema - fails in case of wrong layout)
@@ -423,7 +421,7 @@ void TestWrite(const TestTableDescription& table) {
for (size_t i = 0; i < ydbSchema.size(); ++i) {
schema = ydbSchema;
schema[i].second = TTypeInfo(NTypeIds::Int8);
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
UNIT_ASSERT(!ok);
}
@@ -432,15 +430,14 @@ void TestWrite(const TestTableDescription& table) {
for (size_t i = 0; i < ydbSchema.size(); ++i) {
schema = ydbSchema;
schema[i].second = TTypeInfo(NTypeIds::Int64);
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema),
- NArrow::MakeArrowSchema(schema));
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
UNIT_ASSERT(ok == (ydbSchema[i].second == TTypeInfo(NTypeIds::Int64)));
}
schema = ydbSchema;
schema[1].second = TTypeInfo(NTypeIds::Utf8);
schema[5].second = TTypeInfo(NTypeIds::Int32);
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
UNIT_ASSERT(!ok);
// reordered columns
@@ -452,11 +449,7 @@ void TestWrite(const TestTableDescription& table) {
schema.push_back({name, typeInfo});
}
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
- UNIT_ASSERT(!ok);
-
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema),
- NArrow::MakeArrowSchema(schema));
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
UNIT_ASSERT(ok);
// too much data
@@ -464,7 +457,7 @@ void TestWrite(const TestTableDescription& table) {
TString bigData = MakeTestBlob({0, 150 * 1000}, ydbSchema);
UNIT_ASSERT(bigData.size() > NColumnShard::TLimits::GetMaxBlobSize());
UNIT_ASSERT(bigData.size() < NColumnShard::TLimits::GetMaxBlobSize() + 2 * 1024 * 1024);
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, bigData);
+ ok = WriteData(runtime, sender, writeId++, tableId, bigData, ydbSchema);
UNIT_ASSERT(!ok);
}
@@ -480,8 +473,6 @@ void TestWriteOverload(const TestTableDescription& table) {
runtime.DispatchEvents(options);
//
-
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
@@ -521,17 +512,17 @@ void TestWriteOverload(const TestTableDescription& table) {
const ui32 toSend = toCatch + 1;
for (ui32 i = 0; i < toSend; ++i) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob, {}, false));
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema, false));
}
- UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::OVERLOADED);
while (capturedWrites.size()) {
resendOneCaptured();
- UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS);
}
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob)); // OK after overload
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema)); // OK after overload
}
// TODO: Improve test. It does not catch KIKIMR-14890
@@ -565,8 +556,9 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
for (ui64 planStep = initPlanStep; planStep < initPlanStep + 50; ++planStep) {
TSet<ui64> txIds;
for (ui32 i = 0; i <= 5; ++i) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testData));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds));
+ ProposeCommit(runtime, sender, ++txId, writeIds);
txIds.insert(txId);
}
PlanCommit(runtime, sender, planStep, txIds);
@@ -617,7 +609,7 @@ void TestWriteReadLongTxDup() {
auto data = MakeTestBlob({portion.first + i, portion.second + i}, ydbSchema);
UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
- auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data);
+ auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, ydbSchema);
UNIT_ASSERT(writeIdOpt);
if (!i) {
writeId = *writeIdOpt;
@@ -666,18 +658,18 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& data) {
- bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, data);
+ auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId,
+ const TString& data, const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>& intWriteIds) {
+ bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &intWriteIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
return ok;
};
- auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId,
+ auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId,
const std::vector<ui64>& writeIds) {
- ProposeCommit(runtime, sender, metaShard, txId, writeIds);
+ ProposeCommit(runtime, sender, txId, writeIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
@@ -714,7 +706,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
// write 1: ins:1, cmt:0, idx:0
- UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, MakeTestBlob(portion[0], ydbSchema)));
+ std::vector<ui64> intWriteIds;
+ UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[0], ydbSchema), ydbSchema, intWriteIds));
// read
@@ -736,7 +729,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
ui64 planStep = 21;
ui64 txId = 100;
- proposeCommit(runtime, sender, metaShard, txId, {writeId});
+ proposeCommit(runtime, sender, txId, intWriteIds);
planCommit(runtime, sender, planStep, txId);
// read 2 (committed, old snapshot)
@@ -822,23 +815,25 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
// write 2 (big portion of data): ins:1, cmt:1, idx:0
++writeId;
+ intWriteIds.clear();
{
TString triggerData = MakeTestBlob(portion[1], ydbSchema);
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
- UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, triggerData));
+ UNIT_ASSERT(write(runtime, sender, writeId, tableId, triggerData, ydbSchema, intWriteIds));
}
// commit 2 (init indexation): ins:0, cmt:0, idx:1
planStep = 22;
++txId;
- proposeCommit(runtime, sender, metaShard, txId, {writeId});
+ proposeCommit(runtime, sender, txId, intWriteIds);
planCommit(runtime, sender, planStep, txId);
// write 3: ins:1, cmt:0, idx:1
++writeId;
- UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, MakeTestBlob(portion[2], ydbSchema)));
+ intWriteIds.clear();
+ UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[2], ydbSchema), ydbSchema, intWriteIds));
// read 6, planstep 0
@@ -909,13 +904,14 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
planStep = 23;
++txId;
- proposeCommit(runtime, sender, metaShard, txId, {writeId});
+ proposeCommit(runtime, sender, txId, intWriteIds);
planCommit(runtime, sender, planStep, txId);
// write 4: ins:1, cmt:1, idx:1
++writeId;
- UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, MakeTestBlob(portion[3], ydbSchema)));
+ intWriteIds.clear();
+ UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[3], ydbSchema), ydbSchema, intWriteIds));
// read 9 (committed, indexed)
@@ -935,7 +931,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
planStep = 24;
++txId;
- proposeCommit(runtime, sender, metaShard, txId, {writeId});
+ proposeCommit(runtime, sender, txId, intWriteIds);
planCommit(runtime, sender, planStep, txId);
// read 10
@@ -1092,18 +1088,18 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& data) {
- bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, data);
+ auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId,
+ const TString& data, const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>& writeIds) {
+ bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &writeIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
return ok;
};
- auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId,
+ auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId,
const std::vector<ui64>& writeIds) {
- ProposeCommit(runtime, sender, metaShard, txId, writeIds);
+ ProposeCommit(runtime, sender, txId, writeIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
@@ -1150,15 +1146,14 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
std::pair<ui64, ui64> portion = {pos, pos + portionSize};
TString data = MakeTestBlob(portion, ydbSchema);
- ids.push_back(writeId);
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, data));
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &ids));
}
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
- proposeCommit(runtime, sender, metaShard, txId, ids);
+ proposeCommit(runtime, sender, txId, ids);
planCommit(runtime, sender, planStep, txId);
}
std::pair<ui64, ui64> smallWrites = {triggerPortionSize, pos};
@@ -1168,9 +1163,10 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
ui32 numTxs = engineLimits.GranuleSizeForOverloadPrevent / triggerData.size() + 1;
for (ui32 i = 0; i < numTxs; ++i, ++writeId, ++planStep, ++txId) {
- UNIT_ASSERT(write(runtime, sender, metaShard, writeId, tableId, triggerData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(write(runtime, sender, writeId, tableId, triggerData, ydbSchema, writeIds));
- proposeCommit(runtime, sender, metaShard, txId, {writeId});
+ proposeCommit(runtime, sender, txId, writeIds);
planCommit(runtime, sender, planStep, txId);
}
@@ -1406,10 +1402,11 @@ void TestReadWithProgram(const TestTableDescription& table = {})
SetupSchema(runtime, sender, tableId, table);
{ // write some data
- bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema));
+ std::vector<ui64> writeIds;
+ bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
@@ -1533,10 +1530,11 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
SetupSchema(runtime, sender, tableId, table);
{ // write some data
- bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema));
+ std::vector<ui64> writeIds;
+ bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
@@ -1627,10 +1625,11 @@ void TestSomePrograms(const TestTableDescription& table) {
SetupSchema(runtime, sender, tableId, table);
{ // write some data
- bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema));
+ std::vector<ui64> writeIds;
+ bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
@@ -1712,10 +1711,11 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
SetupSchema(runtime, sender, tableId, table);
{ // write some data
- bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, testDataBlob);
+ std::vector<ui64> writeIds;
+ bool ok = WriteData(runtime, sender, writeId, tableId, testDataBlob, table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
@@ -2392,9 +2392,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize());
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, table.Schema, true, &writeIds));
- ProposeCommit(runtime, sender, metaShard, txId, { writeId });
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
}
@@ -2674,9 +2675,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
std::pair<ui64, ui64> triggerPortion = {1, 1000};
TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds));
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
@@ -2884,9 +2886,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Overwrite the same data multiple times to produce multiple portions at different timestamps
ui32 numWrites = 14;
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds));
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
@@ -2894,9 +2897,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
{
TString smallData = MakeTestBlob({0, 2}, ydbSchema);
UNIT_ASSERT(smallData.size() < 100 * 1024);
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, smallData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, smallData, ydbSchema, true, &writeIds));
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
++writeId;
++planStep;
@@ -2949,9 +2953,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
planStep += delay.MilliSeconds();
numWrites = 10;
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds));
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
@@ -2980,9 +2985,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
planStep += 2 * delay.MilliSeconds();
numWrites = 2;
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, ydbSchema, true, &writeIds));
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
}
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index c70ec3a5685..93309fde2af 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -257,8 +257,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn, ydbSchema);
UNIT_ASSERT_EQUAL(blobs.size(), 2);
for (auto& data : blobs) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data, ydbSchema, true, &writeIds));
+ ProposeCommit(runtime, sender, ++txId, writeIds);
PlanCommit(runtime, sender, ++planStep, txId);
}
@@ -345,8 +346,10 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0]));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, blobs[0], ydbSchema, true, &writeIds));
+ ProposeCommit(runtime, sender, ++txId, writeIds);
PlanCommit(runtime, sender, ++planStep, txId);
if (internal) {
@@ -613,8 +616,9 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
}
for (auto& data : blobs) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data, testYdbSchema, true, &writeIds));
+ ProposeCommit(runtime, sender, ++txId, writeIds);
PlanCommit(runtime, sender, ++planStep, txId);
}
@@ -1080,13 +1084,15 @@ void TestDrop(bool reboots) {
UNIT_ASSERT(data2.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
// Write into index
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data1));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data1, testYdbSchema, true, &writeIds));
+ ProposeCommit(runtime, sender, ++txId, writeIds);
PlanCommit(runtime, sender, ++planStep, txId);
// Write into InsertTable
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data2));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+ writeIds.clear();
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, data2, testYdbSchema, true, &writeIds));
+ ProposeCommit(runtime, sender, ++txId, writeIds);
PlanCommit(runtime, sender, ++planStep, txId);
if (reboots) {
@@ -1150,7 +1156,7 @@ void TestDropWriteRace() {
UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
// Write into InsertTable
- auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data);
+ auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, testYdbSchema);
UNIT_ASSERT(writeIdOpt);
ProposeCommit(runtime, sender, ++txId, {*writeIdOpt});
auto commitTxId = txId;
@@ -1179,8 +1185,6 @@ void TestCompaction(std::optional<ui32> numWrites = {}) {
runtime.DispatchEvents(options);
// Create table
-
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 100;
@@ -1227,9 +1231,10 @@ void TestCompaction(std::optional<ui32> numWrites = {}) {
++planStep;
++txId;
for (ui32 i = 0; i < *numWrites; ++i, ++writeId, ++planStep, ++txId) {
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
+ std::vector<ui64> writeIds;
+ UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, triggerData, testYdbSchema, true, &writeIds));
- ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ ProposeCommit(runtime, sender, txId, writeIds);
PlanCommit(runtime, sender, planStep, txId);
if (i % 2 == 0) {
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index 2b9965ceb44..d15a59af45d 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -23,7 +23,7 @@ class TWriteMeta {
// Long Tx logic
YDB_OPT(NLongTxService::TLongTxId, LongTxId)
YDB_ACCESSOR(ui64, WritePartId, 0);
- YDB_ACCESSOR(ui64, MetaShard, 0);
+ YDB_READONLY(ui64, MetaShard, 0);
YDB_ACCESSOR_DEF(TString, DedupId);
public:
diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp
index 0a5ca55d2b4..45501d22e6d 100644
--- a/ydb/core/tx/schemeshard/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap.cpp
@@ -1,5 +1,6 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/columnshard/columnshard.h>
+#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
@@ -42,100 +43,6 @@ static const TVector<std::pair<TString, TTypeInfo>> defaultYdbSchema = {
{"data", TTypeInfo(NTypeIds::Utf8) }
};
-TString MakeTestBlob(std::pair<ui64, ui64> range) {
- TString err;
- NArrow::TArrowBatchBuilder batchBuilder(arrow::Compression::LZ4_FRAME);
- batchBuilder.Start(defaultYdbSchema, 0, 0, err);
-
- TString str;
- TVector<TTypeInfo> types = {
- TTypeInfo(NTypeIds::Timestamp),
- TTypeInfo(NTypeIds::Utf8)
- };
-
- for (size_t i = range.first; i < range.second; ++i) {
- str = ToString(i);
-
- TVector<TCell> cells;
- cells.push_back(TCell::Make<ui64>(i));
- cells.push_back(TCell(str.data(), str.size()));
-
- NKikimr::TDbTupleRef unused;
- batchBuilder.AddRow(unused, NKikimr::TDbTupleRef(types.data(), cells.data(), 2));
- }
-
- auto batch = batchBuilder.FlushBatch(true);
- UNIT_ASSERT(batch);
- auto status = batch->ValidateFull();
- UNIT_ASSERT(status.ok());
-
- return batchBuilder.Finish();
-}
-
-static constexpr ui64 txInitiator = 42; // 0 means LongTx, we need another value here
-
-void WriteData(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, ui64 pathId, ui64 writeId, TString data) {
- const TString dedupId = ToString(writeId);
-
- auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data, 1);
-
- ForwardToTablet(runtime, tabletId, sender, evWrite.release());
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resWrite = Proto(event);
- UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), tabletId);
- UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), txInitiator);
- UNIT_ASSERT_EQUAL(resWrite.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
-}
-
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 txId, const TVector<ui64>& writeIds) {
- NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT;
- TString txBody;
- {
- NKikimrTxColumnShard::TCommitTxBody proto;
- proto.SetTxInitiator(txInitiator);
- for (ui64 id : writeIds) {
- proto.AddWriteIds(id);
- }
-
- Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&txBody);
- }
-
- ForwardToTablet(runtime, tabletId, sender,
- new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody));
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
- UNIT_ASSERT(event);
-
- auto& res = Proto(event);
- UNIT_ASSERT_EQUAL(res.GetTxKind(), txKind);
- UNIT_ASSERT_EQUAL(res.GetTxId(), txId);
- UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED);
-}
-
-void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 planStep, const TSet<ui64>& txIds) {
- auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, txInitiator, tabletId);
- for (ui64 txId : txIds) {
- auto tx = plan->Record.AddTransactions();
- tx->SetTxId(txId);
- ActorIdToProto(sender, tx->MutableAckTo());
- }
-
- ForwardToTablet(runtime, tabletId, sender, plan.release());
- TAutoPtr<IEventHandle> handle;
-
- for (ui32 i = 0; i < txIds.size(); ++i) {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
- UNIT_ASSERT(event);
-
- auto& res = Proto(event);
- UNIT_ASSERT(txIds.count(res.GetTxId()));
- UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- }
-}
-
}}
@@ -788,27 +695,28 @@ Y_UNIT_TEST_SUITE(TOlap) {
{ // Write data directly into shard
TActorId sender = runtime.AllocateEdgeActor();
- TString data = MakeTestBlob({0, rowsInBatch});
+ TString data = NTxUT::MakeTestBlob({0, rowsInBatch}, defaultYdbSchema);
ui64 writeId = 0;
TSet<ui64> txIds;
for (ui32 i = 0; i < 10; ++i) {
- WriteData(runtime, sender, shardId, pathId, ++writeId, data);
- ProposeCommit(runtime, sender, shardId, ++txId, {writeId});
+ std::vector<ui64> writeIds;
+ NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds);
+ NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds);
txIds.insert(txId);
}
- PlanCommit(runtime, sender, shardId, ++planStep, txIds);
+ NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, txIds);
// emulate timeout
runtime.UpdateCurrentTime(TInstant::Now());
// trigger periodic stats at shard (after timeout)
- WriteData(runtime, sender, shardId, pathId, ++writeId, data);
- ProposeCommit(runtime, sender, shardId, ++txId, {writeId});
- txIds = {txId};
- PlanCommit(runtime, sender, shardId, ++planStep, txIds);
+ std::vector<ui64> writeIds;
+ NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds);
+ NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds);
+ NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, {txId});
}
auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore", true, true);
diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt
index a899ec6c6c2..e0a3bd24e23 100644
--- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.darwin-x86_64.txt
@@ -39,6 +39,7 @@ target_link_options(ydb-core-tx-schemeshard-ut_olap PRIVATE
)
target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt
index 792c5ec3d20..e00b392f1ad 100644
--- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-aarch64.txt
@@ -42,6 +42,7 @@ target_link_options(ydb-core-tx-schemeshard-ut_olap PRIVATE
)
target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt
index 4ae2f06628e..0d6cf1b0375 100644
--- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.linux-x86_64.txt
@@ -43,6 +43,7 @@ target_link_options(ydb-core-tx-schemeshard-ut_olap PRIVATE
)
target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt
index 3bb760793ab..21e10eec1db 100644
--- a/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/schemeshard/ut_olap/CMakeLists.windows-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_olap PUBLIC
)
target_sources(ydb-core-tx-schemeshard-ut_olap PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_olap.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/schemeshard/ut_olap/ya.make b/ydb/core/tx/schemeshard/ut_olap/ya.make
index ab7fb360ffe..d78aa0d531e 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ya.make
+++ b/ydb/core/tx/schemeshard/ut_olap/ya.make
@@ -29,6 +29,7 @@ YQL_LAST_ABI_VERSION()
SRCS(
ut_olap.cpp
+ ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
END()