summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp11
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp18
-rw-r--r--ydb/core/tx/columnshard/operations/manager.cpp3
-rw-r--r--ydb/core/tx/columnshard/operations/write.h1
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp6
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h6
-rw-r--r--ydb/core/tx/columnshard/test_helper/shard_reader.cpp101
-rw-r--r--ydb/core/tx/columnshard/test_helper/shard_reader.h100
-rw-r--r--ydb/core/tx/columnshard/test_helper/shard_writer.cpp63
-rw-r--r--ydb/core/tx/columnshard/test_helper/shard_writer.h43
-rw-r--r--ydb/core/tx/columnshard/test_helper/ya.make2
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp620
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp124
13 files changed, 527 insertions, 571 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index 3fab87ca4b5..5b66a0587b5 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -97,17 +97,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
- } else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
- NKikimrTxColumnShard::TCommitWriteTxBody proto;
- proto.SetLockId(operation->GetLockId());
- TString txBody;
- Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
- auto op = Self->GetProgressTxController().StartProposeOnExecute(
- TTxController::TTxInfo(
- NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId(), writeMeta.GetSource(), operation->GetCookie(), {}),
- txBody, txc);
- AFL_VERIFY(!op->IsFail());
- ResultOperators.emplace_back(op);
} else {
auto& info = Self->OperationsManager->GetLockVerified(operation->GetLockId());
NKikimrDataEvents::TLock lock;
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 93dda2268ae..981443f26ee 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -459,12 +459,12 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (conclusionParse.IsFail()) {
sendError(conclusionParse.GetErrorMessage(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
} else {
- if (commitOperation->NeedSyncLocks()) {
- auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
- if (!lockInfo) {
- sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
- NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED);
- } else {
+ auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
+ if (!lockInfo) {
+ sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
+ NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
+ } else {
+ if (commitOperation->NeedSyncLocks()) {
if (lockInfo->GetGeneration() != commitOperation->GetGeneration()) {
sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) +
" != " + ::ToString(commitOperation->GetGeneration()),
@@ -477,9 +477,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
} else {
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
+ } else {
+ Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
- } else {
- Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
}
return;
@@ -557,8 +557,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
ui64 lockId = 0;
if (behaviour == EOperationBehaviour::NoTxWrite) {
lockId = BuildEphemeralTxId();
- } else if (behaviour == EOperationBehaviour::InTxWrite) {
- lockId = record.GetTxId();
} else {
lockId = record.GetLockTxId();
}
diff --git a/ydb/core/tx/columnshard/operations/manager.cpp b/ydb/core/tx/columnshard/operations/manager.cpp
index 1527ec5d028..2fdb5d0e181 100644
--- a/ydb/core/tx/columnshard/operations/manager.cpp
+++ b/ydb/core/tx/columnshard/operations/manager.cpp
@@ -255,9 +255,6 @@ TConclusion<EOperationBehaviour> TOperationsManager::GetBehaviour(const NEvents:
return EOperationBehaviour::NoTxWrite;
}
- if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) {
- return EOperationBehaviour::InTxWrite;
- }
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("proto", evWrite.Record.DebugString())("event", "undefined behaviour");
return TConclusionStatus::Fail("undefined request for detect tx type");
}
diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h
index ad22caa651d..d388d09eaf3 100644
--- a/ydb/core/tx/columnshard/operations/write.h
+++ b/ydb/core/tx/columnshard/operations/write.h
@@ -37,7 +37,6 @@ enum class EOperationStatus : ui32 {
enum class EOperationBehaviour : ui32 {
Undefined = 1,
- InTxWrite = 2,
WriteWithLock = 3,
CommitWriteLock = 4,
AbortWriteLock = 5,
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
index 31de6ffef8a..3ebdb21fb67 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
@@ -63,7 +63,7 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString
return (res.GetStatus() == NKikimrTxColumnShard::PREPARED);
}
-void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) {
+void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
auto tx = plan->Record.AddTransactions();
tx->SetTxId(snap.GetTxId());
@@ -78,7 +78,7 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS);
}
-void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
+void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
auto tx = plan->Record.AddTransactions();
tx->SetTxId(snap.GetTxId());
@@ -229,7 +229,7 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, con
PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
}
-void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId) {
+void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId) {
auto wakeup = std::make_unique<TEvPrivate::TEvPeriodicWakeup>(true);
ForwardToTablet(runtime, shardId, sender, wakeup.release());
}
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
index 7594be5da95..8a836925282 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
@@ -402,9 +402,9 @@ struct TTestSchema {
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
-void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
+void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap);
-void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
+void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds,
@@ -435,7 +435,7 @@ inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planSt
PlanCommit(runtime, sender, planStep, ids);
}
-void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId);
+void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId);
struct TTestBlobOptions {
THashSet<TString> NullColumns;
diff --git a/ydb/core/tx/columnshard/test_helper/shard_reader.cpp b/ydb/core/tx/columnshard/test_helper/shard_reader.cpp
new file mode 100644
index 00000000000..6b3ce1a5a1b
--- /dev/null
+++ b/ydb/core/tx/columnshard/test_helper/shard_reader.cpp
@@ -0,0 +1,101 @@
+#include "shard_reader.h"
+
+namespace NKikimr::NTxUT {
+
+std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TShardReader::BuildStartEvent() const {
+ auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
+ ev->Record.SetLocalPathId(PathId);
+ ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
+ ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());
+
+ ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
+ ev->Record.SetTxId(Snapshot.GetTxId());
+
+ ev->Record.SetReverse(Reverse);
+ ev->Record.SetItemsLimit(Limit);
+
+ ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);
+
+ auto protoRanges = ev->Record.MutableRanges();
+ protoRanges->Reserve(Ranges.size());
+ for (auto& range : Ranges) {
+ auto newRange = protoRanges->Add();
+ range.Serialize(*newRange);
+ }
+
+ if (ProgramProto) {
+ NKikimrSSA::TOlapProgram olapProgram;
+ {
+ TString programBytes;
+ TStringOutput stream(programBytes);
+ ProgramProto->SerializeToArcadiaStream(&stream);
+ olapProgram.SetProgram(programBytes);
+ }
+ {
+ TString programBytes;
+ TStringOutput stream(programBytes);
+ olapProgram.SerializeToArcadiaStream(&stream);
+ ev->Record.SetOlapProgram(programBytes);
+ }
+ ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
+ } else if (SerializedProgram) {
+ ev->Record.SetOlapProgram(*SerializedProgram);
+ ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
+ }
+
+ return ev;
+}
+
+NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumns(const std::vector<TString>& replyColumns) {
+ AFL_VERIFY(!SerializedProgram);
+ if (!ProgramProto) {
+ ProgramProto = NKikimrSSA::TProgram();
+ }
+ for (auto&& command : *ProgramProto->MutableCommand()) {
+ if (command.HasProjection()) {
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumns) {
+ proj.AddColumns()->SetName(i);
+ }
+ *command.MutableProjection() = proj;
+ return *this;
+ }
+ }
+ {
+ auto* command = ProgramProto->AddCommand();
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumns) {
+ proj.AddColumns()->SetName(i);
+ }
+ *command->MutableProjection() = proj;
+ }
+ return *this;
+}
+
+NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
+ AFL_VERIFY(!SerializedProgram);
+ if (!ProgramProto) {
+ ProgramProto = NKikimrSSA::TProgram();
+ }
+ for (auto&& command : *ProgramProto->MutableCommand()) {
+ if (command.HasProjection()) {
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumnIds) {
+ proj.AddColumns()->SetId(i);
+ }
+ *command.MutableProjection() = proj;
+ return *this;
+ }
+ }
+ {
+ auto* command = ProgramProto->AddCommand();
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumnIds) {
+ proj.AddColumns()->SetId(i);
+ }
+ *command->MutableProjection() = proj;
+ }
+ return *this;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/test_helper/shard_reader.h b/ydb/core/tx/columnshard/test_helper/shard_reader.h
index 2beaa5a782d..eb9f041062a 100644
--- a/ydb/core/tx/columnshard/test_helper/shard_reader.h
+++ b/ydb/core/tx/columnshard/test_helper/shard_reader.h
@@ -28,53 +28,7 @@ private:
std::vector<TString> ReplyColumns;
std::vector<TSerializedTableRange> Ranges;
- std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const {
- auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
- ev->Record.SetLocalPathId(PathId);
- ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
- ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());
-
- ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
- ev->Record.SetTxId(Snapshot.GetTxId());
-
- ev->Record.SetReverse(Reverse);
- ev->Record.SetItemsLimit(Limit);
-
- ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);
-
- auto protoRanges = ev->Record.MutableRanges();
- protoRanges->Reserve(Ranges.size());
- for (auto& range : Ranges) {
- auto newRange = protoRanges->Add();
- range.Serialize(*newRange);
- }
-
- if (ProgramProto) {
- NKikimrSSA::TOlapProgram olapProgram;
- {
- TString programBytes;
- TStringOutput stream(programBytes);
- ProgramProto->SerializeToArcadiaStream(&stream);
- olapProgram.SetProgram(programBytes);
- }
- {
- TString programBytes;
- TStringOutput stream(programBytes);
- olapProgram.SerializeToArcadiaStream(&stream);
- ev->Record.SetOlapProgram(programBytes);
- }
- ev->Record.SetOlapProgramType(
- NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
- );
- } else if (SerializedProgram) {
- ev->Record.SetOlapProgram(*SerializedProgram);
- ev->Record.SetOlapProgramType(
- NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
- );
- }
-
- return ev;
- }
+ std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const;
std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches;
YDB_READONLY(ui32, IterationsCount, 0);
@@ -100,57 +54,9 @@ public:
return r ? r->num_rows() : 0;
}
- TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns) {
- AFL_VERIFY(!SerializedProgram);
- if (!ProgramProto) {
- ProgramProto = NKikimrSSA::TProgram();
- }
- for (auto&& command : *ProgramProto->MutableCommand()) {
- if (command.HasProjection()) {
- NKikimrSSA::TProgram::TProjection proj;
- for (auto&& i : replyColumns) {
- proj.AddColumns()->SetName(i);
- }
- *command.MutableProjection() = proj;
- return *this;
- }
- }
- {
- auto* command = ProgramProto->AddCommand();
- NKikimrSSA::TProgram::TProjection proj;
- for (auto&& i : replyColumns) {
- proj.AddColumns()->SetName(i);
- }
- *command->MutableProjection() = proj;
- }
- return *this;
- }
+ TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns);
- TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
- AFL_VERIFY(!SerializedProgram);
- if (!ProgramProto) {
- ProgramProto = NKikimrSSA::TProgram();
- }
- for (auto&& command : *ProgramProto->MutableCommand()) {
- if (command.HasProjection()) {
- NKikimrSSA::TProgram::TProjection proj;
- for (auto&& i : replyColumnIds) {
- proj.AddColumns()->SetId(i);
- }
- *command.MutableProjection() = proj;
- return *this;
- }
- }
- {
- auto* command = ProgramProto->AddCommand();
- NKikimrSSA::TProgram::TProjection proj;
- for (auto&& i : replyColumnIds) {
- proj.AddColumns()->SetId(i);
- }
- *command->MutableProjection() = proj;
- }
- return *this;
- }
+ TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds);
TShardReader& SetProgram(const NKikimrSSA::TProgram& p) {
AFL_VERIFY(!ProgramProto);
diff --git a/ydb/core/tx/columnshard/test_helper/shard_writer.cpp b/ydb/core/tx/columnshard/test_helper/shard_writer.cpp
new file mode 100644
index 00000000000..92e262d2f77
--- /dev/null
+++ b/ydb/core/tx/columnshard/test_helper/shard_writer.cpp
@@ -0,0 +1,63 @@
+#include "shard_writer.h"
+
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/tx/columnshard/defs.h>
+#include <ydb/core/tx/data_events/events.h>
+#include <ydb/core/tx/data_events/payload_helper.h>
+
+namespace NKikimr::NTxUT {
+
+NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::StartCommit(const ui64 txId) {
+ auto evCommit = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+ evCommit->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
+ auto* lock = evCommit->Record.MutableLocks()->AddLocks();
+ lock->SetLockId(LockId);
+ ForwardToTablet(Runtime, TTestTxConfig::TxTablet0, Sender, evCommit.release());
+
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = Runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ AFL_VERIFY(event);
+
+ return event->Record.GetStatus();
+}
+
+NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::Abort(const ui64 txId) {
+ auto evCommit = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+ evCommit->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Rollback);
+ auto* lock = evCommit->Record.MutableLocks()->AddLocks();
+ lock->SetLockId(LockId);
+ ForwardToTablet(Runtime, TTestTxConfig::TxTablet0, Sender, evCommit.release());
+
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = Runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ AFL_VERIFY(event);
+
+ return event->Record.GetStatus();
+}
+
+NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::Write(
+ const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<ui32>& columnIds, const ui64 txId) {
+ TString blobData = NArrow::SerializeBatchNoCompression(batch);
+// AFL_VERIFY(blobData.size() < NColumnShard::TLimits::GetMaxBlobSize());
+
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+ evWrite->SetTxId(txId);
+ evWrite->SetLockId(LockId, LockNodeId);
+ const ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
+ evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, { OwnerId, PathId, SchemaVersion }, columnIds,
+ payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
+
+ ForwardToTablet(Runtime, TabletId, Sender, evWrite.release());
+
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = Runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ AFL_VERIFY(event);
+
+ AFL_VERIFY(event->Record.GetOrigin() == TabletId);
+ AFL_VERIFY(event->Record.GetTxId() == LockId);
+
+ return event->Record.GetStatus();
+}
+
+} // namespace NKikimr::NTxUT
diff --git a/ydb/core/tx/columnshard/test_helper/shard_writer.h b/ydb/core/tx/columnshard/test_helper/shard_writer.h
new file mode 100644
index 00000000000..b43e9749a69
--- /dev/null
+++ b/ydb/core/tx/columnshard/test_helper/shard_writer.h
@@ -0,0 +1,43 @@
+#pragma once
+#include <ydb/core/protos/data_events.pb.h>
+#include <ydb/core/testlib/basics/runtime.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NTxUT {
+
+class TShardWriter {
+private:
+ TTestBasicRuntime& Runtime;
+ const ui64 TabletId;
+ const ui64 PathId;
+ const ui64 LockId;
+ YDB_ACCESSOR(ui64, SchemaVersion, 1);
+ YDB_ACCESSOR(ui64, OwnerId, 0);
+ YDB_ACCESSOR(ui64, LockNodeId, 1);
+ const TActorId Sender;
+
+public:
+ TShardWriter(TTestBasicRuntime& runtime, const ui64 tabletId, const ui64 pathId, const ui64 lockId)
+ : Runtime(runtime)
+ , TabletId(tabletId)
+ , PathId(pathId)
+ , LockId(lockId)
+ , Sender(Runtime.AllocateEdgeActor())
+ {
+ }
+
+ const TActorId& GetSender() const {
+ return Sender;
+ }
+
+ [[nodiscard]] NKikimrDataEvents::TEvWriteResult::EStatus StartCommit(const ui64 txId);
+ [[nodiscard]] NKikimrDataEvents::TEvWriteResult::EStatus Abort(const ui64 txId);
+
+ [[nodiscard]] NKikimrDataEvents::TEvWriteResult::EStatus Write(
+ const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<ui32>& columnIds, const ui64 txId);
+};
+
+} // namespace NKikimr::NTxUT
diff --git a/ydb/core/tx/columnshard/test_helper/ya.make b/ydb/core/tx/columnshard/test_helper/ya.make
index cab4937293d..d4b96709720 100644
--- a/ydb/core/tx/columnshard/test_helper/ya.make
+++ b/ydb/core/tx/columnshard/test_helper/ya.make
@@ -14,6 +14,8 @@ SRCS(
helper.cpp
controllers.cpp
columnshard_ut_common.cpp
+ shard_reader.cpp
+ shard_writer.cpp
)
IF (OS_WINDOWS)
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index ad5ec1f688f..a0716bd0925 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1,25 +1,28 @@
-#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
-#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/base/blobstorage.h>
-#include <util/string/printf.h>
-#include <arrow/api.h>
-#include <arrow/ipc/reader.h>
-#include <ydb/library/yverify_stream/yverify_stream.h>
+#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
-#include <ydb/core/tx/columnshard/engines/changes/with_appended.h>
-#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
#include <ydb/core/tx/columnshard/engines/changes/cleanup_portions.h>
-#include <ydb/core/tx/columnshard/operations/write_data.h>
+#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
+#include <ydb/core/tx/columnshard/engines/changes/with_appended.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/operations/write_data.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/test_helper/controllers.h>
#include <ydb/core/tx/columnshard/test_helper/shard_reader.h>
+#include <ydb/core/tx/columnshard/test_helper/shard_writer.h>
+
#include <ydb/library/actors/protos/unittests.pb.h>
-#include <ydb/library/formats/arrow/simple_builder/filler.h>
#include <ydb/library/formats/arrow/simple_builder/array.h>
#include <ydb/library/formats/arrow/simple_builder/batch.h>
+#include <ydb/library/formats/arrow/simple_builder/filler.h>
+#include <ydb/library/yverify_stream/yverify_stream.h>
+
+#include <arrow/api.h>
+#include <arrow/ipc/reader.h>
#include <util/string/join.h>
+#include <util/string/printf.h>
namespace NKikimr {
@@ -27,8 +30,7 @@ using namespace NColumnShard;
using namespace Tests;
using namespace NTxUT;
-namespace
-{
+namespace {
namespace NTypeIds = NScheme::NTypeIds;
using TTypeId = NScheme::TTypeId;
@@ -37,8 +39,8 @@ using TTypeInfo = NScheme::TTypeInfo;
using TDefaultTestsController = NKikimr::NYDBTest::NColumnShard::TController;
template <typename TKey = ui64>
-bool DataHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range,
- bool requireUniq = false, const std::string& columnName = "timestamp") {
+bool DataHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range, bool requireUniq = false,
+ const std::string& columnName = "timestamp") {
static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>;
THashMap<TKey, ui32> keys;
@@ -94,9 +96,8 @@ bool DataHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, st
}
template <typename TKey = ui64>
-bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range,
- bool requireUniq = false, const std::string& columnName = "timestamp") {
-
+bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range, bool requireUniq = false,
+ const std::string& columnName = "timestamp") {
auto schema = NArrow::DeserializeSchema(srtSchema);
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
for (auto& blob : blobs) {
@@ -350,7 +351,7 @@ void TestWrite(const TestTableDescription& table) {
const auto& ydbSchema = table.Schema;
- bool ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema), ydbSchema);
+ bool ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, ydbSchema), ydbSchema);
UNIT_ASSERT(ok);
auto schema = ydbSchema;
@@ -364,13 +365,13 @@ void TestWrite(const TestTableDescription& table) {
TTestBlobOptions optsNulls;
optsNulls.NullColumns.emplace("timestamp");
- ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema, optsNulls), ydbSchema);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, ydbSchema, optsNulls), ydbSchema);
UNIT_ASSERT(!ok);
// missing columns
schema = NArrow::NTest::TTestColumn::CropSchema(schema, 4);
- ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema);
UNIT_ASSERT(ok);
// wrong first key column type (with supported layout: Int64 vs Timestamp)
@@ -378,8 +379,7 @@ void TestWrite(const TestTableDescription& table) {
schema = ydbSchema;
schema[0].SetType(TTypeInfo(NTypeIds::Int64));
- ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema),
- schema);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema);
UNIT_ASSERT(!ok);
// wrong type (no additional schema - fails in case of wrong layout)
@@ -387,7 +387,7 @@ void TestWrite(const TestTableDescription& table) {
for (size_t i = 0; i < ydbSchema.size(); ++i) {
schema = ydbSchema;
schema[i].SetType(TTypeInfo(NTypeIds::Int8));
- ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema);
UNIT_ASSERT(!ok);
}
@@ -396,14 +396,14 @@ void TestWrite(const TestTableDescription& table) {
for (size_t i = 0; i < ydbSchema.size(); ++i) {
schema = ydbSchema;
schema[i].SetType(TTypeInfo(NTypeIds::Int64));
- ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema);
UNIT_ASSERT(ok == (ydbSchema[i].GetType() == TTypeInfo(NTypeIds::Int64)));
}
schema = ydbSchema;
schema[1].SetType(TTypeInfo(NTypeIds::Utf8));
schema[5].SetType(TTypeInfo(NTypeIds::Int32));
- ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema);
UNIT_ASSERT(!ok);
// reordered columns
@@ -415,12 +415,12 @@ void TestWrite(const TestTableDescription& table) {
schema.push_back(NArrow::NTest::TTestColumn(name, typeInfo));
}
- ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema);
+ ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema);
UNIT_ASSERT(ok);
// too much data
- TString bigData = MakeTestBlob({0, 150 * 1000}, ydbSchema);
+ TString bigData = MakeTestBlob({ 0, 150 * 1000 }, ydbSchema);
UNIT_ASSERT(bigData.size() > NColumnShard::TLimits::GetMaxBlobSize());
UNIT_ASSERT(bigData.size() < NColumnShard::TLimits::GetMaxBlobSize() + 2 * 1024 * 1024);
ok = WriteData(runtime, sender, writeId++, tableId, bigData, ydbSchema);
@@ -445,7 +445,7 @@ void TestWriteOverload(const TestTableDescription& table) {
SetupSchema(runtime, sender, tableId, table);
- TString testBlob = MakeTestBlob({0, 100 * 1000}, table.Schema);
+ TString testBlob = MakeTestBlob({ 0, 100 * 1000 }, table.Schema);
UNIT_ASSERT(testBlob.size() > NOlap::TCompactionLimits::MAX_BLOB_SIZE / 2);
UNIT_ASSERT(testBlob.size() < NOlap::TCompactionLimits::MAX_BLOB_SIZE);
@@ -489,7 +489,7 @@ void TestWriteOverload(const TestTableDescription& table) {
UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS);
}
- UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema)); // OK after overload
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema)); // OK after overload
}
// TODO: Improve test. It does not catch KIKIMR-14890
@@ -514,7 +514,7 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
SetupSchema(runtime, sender, tableId);
constexpr ui32 numRows = 10;
- std::pair<ui64, ui64> portion = {10, 10 + numRows};
+ std::pair<ui64, ui64> portion = { 10, 10 + numRows };
auto testData = MakeTestBlob(portion, ydbSchema);
TAutoPtr<IEventHandle> handle;
@@ -533,11 +533,11 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
// read
if (planStep != initPlanStep) {
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>()));
- reader.SetReplyColumns({"timestamp"});
+ reader.SetReplyColumns({ "timestamp" });
auto rb = reader.ReadAll();
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion, true));
+ UNIT_ASSERT(DataHas({ rb }, portion, true));
}
}
}
@@ -561,7 +561,7 @@ void TestWriteReadLongTxDup() {
SetupSchema(runtime, sender, tableId);
constexpr ui32 numRows = 10;
- std::pair<ui64, ui64> portion = {10, 10 + numRows};
+ std::pair<ui64, ui64> portion = { 10, 10 + numRows };
NLongTxService::TLongTxId longTxId;
UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
@@ -573,7 +573,7 @@ void TestWriteReadLongTxDup() {
// Only the first blob with dedup pair {longTx, dedupId} should be inserted
// Others should return OK (write retries emulation)
for (ui32 i = 0; i < 4; ++i) {
- auto data = MakeTestBlob({portion.first + i, portion.second + i}, ydbSchema);
+ auto data = MakeTestBlob({ portion.first + i, portion.second + i }, ydbSchema);
UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, ydbSchema);
@@ -584,8 +584,8 @@ void TestWriteReadLongTxDup() {
UNIT_ASSERT_EQUAL(*writeIdOpt, *writeId);
}
- ProposeCommit(runtime, sender, ++txId, {*writeId});
- TSet<ui64> txIds = {txId};
+ ProposeCommit(runtime, sender, ++txId, { *writeId });
+ TSet<ui64> txIds = { txId };
PlanCommit(runtime, sender, planStep, txIds);
// read
@@ -600,8 +600,8 @@ void TestWriteReadLongTxDup() {
Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema)));
UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion, true));
- UNIT_ASSERT(DataHasOnly({rb}, portion));
+ UNIT_ASSERT(DataHas({ rb }, portion, true));
+ UNIT_ASSERT(DataHasOnly({ rb }, portion));
}
}
@@ -622,8 +622,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId,
- const TString& data, const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>& intWriteIds) {
+ auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, const TString& data,
+ const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>& intWriteIds) {
bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &intWriteIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -631,8 +631,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
return ok;
};
- auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId,
- const std::vector<ui64>& writeIds) {
+ auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) {
ProposeCommit(runtime, sender, txId, writeIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -660,12 +659,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
// -----xx..
// xx----
// -xxxxx
- std::vector<std::pair<ui64, ui64>> portion = {
- {200, 300},
- {250, 250 + 80 * 1000}, // committed -> index
- {0, 100},
- {50, 300}
- };
+ std::vector<std::pair<ui64, ui64>> portion = { { 200, 300 }, { 250, 250 + 80 * 1000 }, // committed -> index
+ { 0, 100 }, { 50, 300 } };
// write 1: ins:1, cmt:0, idx:0
@@ -678,7 +673,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 1);
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 1));
- reader.SetReplyColumns({"resource_type"});
+ reader.SetReplyColumns({ "resource_type" });
auto rb = reader.ReadAll();
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT_EQUAL(rb, nullptr);
@@ -695,7 +690,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 2);
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 1));
- reader.SetReplyColumns({"resource_type"});
+ reader.SetReplyColumns({ "resource_type" });
auto rb = reader.ReadAll();
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT_EQUAL(rb, nullptr);
@@ -713,14 +708,14 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion[0]));
+ UNIT_ASSERT(DataHas({ rb }, portion[0]));
}
// read 4 (column by id)
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 4);
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
- reader.SetReplyColumnIds({1});
+ reader.SetReplyColumnIds({ 1 });
auto rb = reader.ReadAll();
UNIT_ASSERT(rb);
Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector<TString>({ "timestamp" })));
@@ -728,14 +723,14 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion[0]));
+ UNIT_ASSERT(DataHas({ rb }, portion[0]));
}
// read 5 (2 columns by name)
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 5);
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
- reader.SetReplyColumns({"timestamp", "message"});
+ reader.SetReplyColumns({ "timestamp", "message" });
auto rb = reader.ReadAll();
UNIT_ASSERT(rb);
Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector<TString>({ "timestamp", "message" })));
@@ -743,7 +738,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion[0]));
+ UNIT_ASSERT(DataHas({ rb }, portion[0]));
}
// write 2 (big portion of data): ins:1, cmt:1, idx:0
@@ -773,7 +768,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 6);
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 1));
- reader.SetReplyColumns({"timestamp", "message"});
+ reader.SetReplyColumns({ "timestamp", "message" });
auto rb = reader.ReadAll();
UNIT_ASSERT(!rb);
UNIT_ASSERT(reader.IsCorrectlyFinished());
@@ -791,9 +786,9 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion[0]));
- UNIT_ASSERT(!DataHas({rb}, portion[1]));
- UNIT_ASSERT(!DataHas({rb}, portion[2]));
+ UNIT_ASSERT(DataHas({ rb }, portion[0]));
+ UNIT_ASSERT(!DataHas({ rb }, portion[1]));
+ UNIT_ASSERT(!DataHas({ rb }, portion[2]));
}
// read 8, planstep 22 (full index)
@@ -808,9 +803,9 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion[0]));
- UNIT_ASSERT(DataHas({rb}, portion[1]));
- UNIT_ASSERT(!DataHas({rb}, portion[2]));
+ UNIT_ASSERT(DataHas({ rb }, portion[0]));
+ UNIT_ASSERT(DataHas({ rb }, portion[1]));
+ UNIT_ASSERT(!DataHas({ rb }, portion[2]));
}
// commit 3: ins:0, cmt:1, idx:1
@@ -838,10 +833,10 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion[0]));
- UNIT_ASSERT(DataHas({rb}, portion[1]));
- UNIT_ASSERT(DataHas({rb}, portion[2]));
- UNIT_ASSERT(!DataHas({rb}, portion[3]));
+ UNIT_ASSERT(DataHas({ rb }, portion[0]));
+ UNIT_ASSERT(DataHas({ rb }, portion[1]));
+ UNIT_ASSERT(DataHas({ rb }, portion[2]));
+ UNIT_ASSERT(!DataHas({ rb }, portion[3]));
}
// commit 4: ins:0, cmt:2, idx:1 (with duplicates in PK)
@@ -863,11 +858,11 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, portion[0]));
- UNIT_ASSERT(DataHas({rb}, portion[1]));
- UNIT_ASSERT(DataHas({rb}, portion[2]));
- UNIT_ASSERT(DataHas({rb}, portion[3]));
- UNIT_ASSERT(DataHas({rb}, {0, 500}, true));
+ UNIT_ASSERT(DataHas({ rb }, portion[0]));
+ UNIT_ASSERT(DataHas({ rb }, portion[1]));
+ UNIT_ASSERT(DataHas({ rb }, portion[2]));
+ UNIT_ASSERT(DataHas({ rb }, portion[3]));
+ UNIT_ASSERT(DataHas({ rb }, { 0, 500 }, true));
const ui64 compactedBytes = reader.GetReadStat("compacted_bytes");
const ui64 insertedBytes = reader.GetReadStat("inserted_bytes");
@@ -896,13 +891,12 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
}
}
-
// read 11 (range predicate: closed interval)
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11);
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId));
reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
- reader.AddRange(MakeTestRange({10, 42}, true, true, testYdbPk));
+ reader.AddRange(MakeTestRange({ 10, 42 }, true, true, testYdbPk));
auto rb = reader.ReadAll();
UNIT_ASSERT(rb);
UNIT_ASSERT(reader.IsCorrectlyFinished());
@@ -910,8 +904,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, {10, 42 + 1}));
- UNIT_ASSERT(DataHasOnly({rb}, {10, 42 + 1}));
+ UNIT_ASSERT(DataHas({ rb }, { 10, 42 + 1 }));
+ UNIT_ASSERT(DataHasOnly({ rb }, { 10, 42 + 1 }));
}
// read 12 (range predicate: open interval)
@@ -919,7 +913,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11);
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId));
reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
- reader.AddRange(MakeTestRange({10, 42}, false, false, testYdbPk));
+ reader.AddRange(MakeTestRange({ 10, 42 }, false, false, testYdbPk));
auto rb = reader.ReadAll();
UNIT_ASSERT(rb);
UNIT_ASSERT(reader.IsCorrectlyFinished());
@@ -927,8 +921,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({rb}, {10 + 1, 41 + 1}));
- UNIT_ASSERT(DataHasOnly({rb}, {10 + 1, 41 + 1}));
+ UNIT_ASSERT(DataHas({ rb }, { 10 + 1, 41 + 1 }));
+ UNIT_ASSERT(DataHasOnly({ rb }, { 10 + 1, 41 + 1 }));
}
}
@@ -944,8 +938,8 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId,
- const TString& data, const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>& writeIds) {
+ auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, const TString& data,
+ const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>& writeIds) {
bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &writeIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -953,8 +947,7 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
return ok;
};
- auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId,
- const std::vector<ui64>& writeIds) {
+ auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) {
ProposeCommit(runtime, sender, txId, writeIds);
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -983,14 +976,14 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
// Write same keys: merge on compaction
static const ui32 triggerPortionSize = 75 * 1000;
- std::pair<ui64, ui64> triggerPortion = {0, triggerPortionSize};
+ std::pair<ui64, ui64> triggerPortion = { 0, triggerPortionSize };
TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize());
static const ui32 portionSize = 1;
- ui32 numWrites = NColumnShard::TLimits::MIN_SMALL_BLOBS_TO_INSERT; // trigger InsertTable -> Index
+ ui32 numWrites = NColumnShard::TLimits::MIN_SMALL_BLOBS_TO_INSERT; // trigger InsertTable -> Index
// inserts triggered by count
ui32 pos = triggerPortionSize;
@@ -998,7 +991,7 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
std::vector<ui64> ids;
ids.reserve(numWrites);
for (ui32 w = 0; w < numWrites; ++w, ++writeId, pos += portionSize) {
- std::pair<ui64, ui64> portion = {pos, pos + portionSize};
+ std::pair<ui64, ui64> portion = { pos, pos + portionSize };
TString data = MakeTestBlob(portion, ydbSchema);
UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &ids));
@@ -1011,7 +1004,7 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
proposeCommit(runtime, sender, txId, ids);
planCommit(runtime, sender, planStep, txId);
}
- std::pair<ui64, ui64> smallWrites = {triggerPortionSize, pos};
+ std::pair<ui64, ui64> smallWrites = { triggerPortionSize, pos };
// inserts triggered by size
NOlap::TCompactionLimits engineLimits;
@@ -1031,17 +1024,17 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
for (ui32 i = 0; i < 2; ++i) {
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
- reader.SetReplyColumns({"timestamp", "message"});
+ reader.SetReplyColumns({ "timestamp", "message" });
auto rb = reader.ReadAll();
UNIT_ASSERT(rb);
UNIT_ASSERT(reader.IsCorrectlyFinished());
if (ydbPk[0].GetType() == TTypeInfo(NTypeIds::String) || ydbPk[0].GetType() == TTypeInfo(NTypeIds::Utf8)) {
- UNIT_ASSERT(DataHas<std::string>({rb}, triggerPortion, true));
- UNIT_ASSERT(DataHas<std::string>({rb}, smallWrites, true));
+ UNIT_ASSERT(DataHas<std::string>({ rb }, triggerPortion, true));
+ UNIT_ASSERT(DataHas<std::string>({ rb }, smallWrites, true));
} else {
- UNIT_ASSERT(DataHas({rb}, triggerPortion, true));
- UNIT_ASSERT(DataHas({rb}, smallWrites, true));
+ UNIT_ASSERT(DataHas({ rb }, triggerPortion, true));
+ UNIT_ASSERT(DataHas({ rb }, smallWrites, true));
}
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
@@ -1054,7 +1047,7 @@ using TAggAssignment = NKikimrSSA::TProgram::TAggregateAssignment;
static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssignment::FUNC_CMP_EQUAL) {
NKikimrSSA::TProgram ssa;
- std::vector<ui32> columnIds = {1, 9, 5};
+ std::vector<ui32> columnIds = { 1, 9, 5 };
ui32 tmpColumnId = 100;
auto* line1 = ssa.AddCommand();
@@ -1078,7 +1071,7 @@ static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssig
static NKikimrSSA::TProgram MakeSelectLike(TAssignment::EFunction likeId, const TString& pattern) {
NKikimrSSA::TProgram ssa;
- std::vector<ui32> columnIds = {6}; // message
+ std::vector<ui32> columnIds = { 6 }; // message
auto* line1 = ssa.AddCommand();
auto* l1_assign = line1->MutableAssign();
@@ -1102,9 +1095,7 @@ static NKikimrSSA::TProgram MakeSelectLike(TAssignment::EFunction likeId, const
}
// SELECT min(x), max(x), some(x), count(x) FROM t [GROUP BY key[0], key[1], ...]
-NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector<ui32>& keys = {},
- bool addProjection = true)
-{
+NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector<ui32>& keys = {}, bool addProjection = true) {
NKikimrSSA::TProgram ssa;
auto* line1 = ssa.AddCommand();
@@ -1150,10 +1141,8 @@ NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector<ui32>
}
// SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1 [GROUP BY key[0], key[1], ...]
-NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId,
- const std::vector<ui32>& keys = {},
- bool addProjection = true)
-{
+NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(
+ ui32 columnId, ui32 filterColumnId, const std::vector<ui32>& keys = {}, bool addProjection = true) {
NKikimrSSA::TProgram ssa;
auto* line1 = ssa.AddCommand();
@@ -1218,8 +1207,7 @@ NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterCo
return ssa;
}
-void TestReadWithProgram(const TestTableDescription& table = {})
-{
+void TestReadWithProgram(const TestTableDescription& table = {}) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
@@ -1238,9 +1226,9 @@ void TestReadWithProgram(const TestTableDescription& table = {})
SetupSchema(runtime, sender, tableId, table);
- { // write some data
+ { // write some data
std::vector<ui64> writeIds;
- bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds);
+ bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
ProposeCommit(runtime, sender, txId, writeIds);
@@ -1290,7 +1278,7 @@ void TestReadWithProgram(const TestTableDescription& table = {})
UNIT_ASSERT(rb->num_rows());
Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector<TString>({ "level", "timestamp" })));
UNIT_ASSERT(rb->num_columns() == 2);
- UNIT_ASSERT(DataHas({rb}, {0, 100}, true));
+ UNIT_ASSERT(DataHas({ rb }, { 0, 100 }, true));
break;
case 2:
UNIT_ASSERT(!rb || !rb->num_rows());
@@ -1309,8 +1297,7 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
TActorId sender = runtime.AllocateEdgeActor();
- CreateTestBootstrapper(runtime,
- CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
@@ -1323,9 +1310,9 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
SetupSchema(runtime, sender, tableId, table);
- { // write some data
+ { // write some data
std::vector<ui64> writeIds;
- bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds);
+ bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
ProposeCommit(runtime, sender, txId, writeIds);
@@ -1333,14 +1320,10 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
}
TString pattern = "1";
- std::vector<NKikimrSSA::TProgram> ssas = {
- MakeSelectLike(TAssignment::FUNC_STR_MATCH, pattern),
- MakeSelectLike(TAssignment::FUNC_STR_MATCH_IGNORE_CASE, pattern),
- MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH, pattern),
- MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH_IGNORE_CASE, pattern),
- MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH, pattern),
- MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH_IGNORE_CASE, pattern)
- };
+ std::vector<NKikimrSSA::TProgram> ssas = { MakeSelectLike(TAssignment::FUNC_STR_MATCH, pattern),
+ MakeSelectLike(TAssignment::FUNC_STR_MATCH_IGNORE_CASE, pattern), MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH, pattern),
+ MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH_IGNORE_CASE, pattern), MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH, pattern),
+ MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH_IGNORE_CASE, pattern) };
ui32 i = 0;
for (auto& ssa : ssas) {
@@ -1355,15 +1338,15 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
switch (i) {
case 0:
case 1:
- UNIT_ASSERT(CheckColumns(rb, {"message"}, 19));
+ UNIT_ASSERT(CheckColumns(rb, { "message" }, 19));
break;
case 2:
case 3:
- UNIT_ASSERT(CheckColumns(rb, {"message"}, 11));
+ UNIT_ASSERT(CheckColumns(rb, { "message" }, 11));
break;
case 4:
case 5:
- UNIT_ASSERT(CheckColumns(rb, {"message"}, 10));
+ UNIT_ASSERT(CheckColumns(rb, { "message" }, 10));
break;
default:
break;
@@ -1391,9 +1374,9 @@ void TestSomePrograms(const TestTableDescription& table) {
SetupSchema(runtime, sender, tableId, table);
- { // write some data
+ { // write some data
std::vector<ui64> writeIds;
- bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds);
+ bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
ProposeCommit(runtime, sender, txId, writeIds);
@@ -1426,15 +1409,14 @@ void TestSomePrograms(const TestTableDescription& table) {
struct TReadAggregateResult {
ui32 NumRows = 1;
- std::vector<int64_t> MinValues = {0};
- std::vector<int64_t> MaxValues = {99};
- std::vector<int64_t> Counts = {100};
+ std::vector<int64_t> MinValues = { 0 };
+ std::vector<int64_t> MaxValues = { 99 };
+ std::vector<int64_t> Counts = { 100 };
};
-void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, const TString& testDataBlob,
- bool addProjection, const std::vector<ui32>& aggKeys = {},
- const TReadAggregateResult& expectedResult = {},
- const TReadAggregateResult& expectedFiltered = {1, {1}, {1}, {1}}) {
+void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, const TString& testDataBlob, bool addProjection,
+ const std::vector<ui32>& aggKeys = {}, const TReadAggregateResult& expectedResult = {},
+ const TReadAggregateResult& expectedFiltered = { 1, { 1 }, { 1 }, { 1 } }) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
@@ -1452,10 +1434,10 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
ui64 txId = 100;
auto pk = NArrow::NTest::TTestColumn::CropSchema(ydbSchema, 4);
- TestTableDescription table{.Schema = ydbSchema, .Pk = pk};
+ TestTableDescription table{ .Schema = ydbSchema, .Pk = pk };
SetupSchema(runtime, sender, tableId, table);
- { // write some data
+ { // write some data
std::vector<ui64> writeIds;
bool ok = WriteData(runtime, sender, writeId, tableId, testDataBlob, table.Schema, true, &writeIds);
UNIT_ASSERT(ok);
@@ -1469,11 +1451,9 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
std::vector<TString> programs;
THashSet<ui32> isFiltered;
THashSet<ui32> checkResult;
- THashSet<NScheme::TTypeId> intTypes = {
- NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64,
- NTypeIds::Uint8, NTypeIds::Uint16, NTypeIds::Uint32, NTypeIds::Uint64,
- NTypeIds::Timestamp, NTypeIds::Date32, NTypeIds::Datetime64, NTypeIds::Timestamp64, NTypeIds::Interval64
- };
+ THashSet<NScheme::TTypeId> intTypes = { NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64, NTypeIds::Uint8, NTypeIds::Uint16,
+ NTypeIds::Uint32, NTypeIds::Uint64, NTypeIds::Timestamp, NTypeIds::Date32, NTypeIds::Datetime64, NTypeIds::Timestamp64,
+ NTypeIds::Interval64 };
THashSet<NScheme::TTypeId> strTypes = {
NTypeIds::Utf8, NTypeIds::String
//NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument
@@ -1481,8 +1461,7 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
ui32 prog = 0;
for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) {
- if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) ||
- strTypes.contains(ydbSchema[i].GetType().GetTypeId())) {
+ if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) || strTypes.contains(ydbSchema[i].GetType().GetTypeId())) {
checkResult.insert(prog);
}
@@ -1498,8 +1477,7 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) {
isFiltered.insert(prog);
- if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) ||
- strTypes.contains(ydbSchema[i].GetType().GetTypeId())) {
+ if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) || strTypes.contains(ydbSchema[i].GetType().GetTypeId())) {
checkResult.insert(prog);
}
@@ -1513,8 +1491,8 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
UNIT_ASSERT(program.SerializeToString(&programs.back()));
}
- std::vector<TString> namedColumns = {"res_min", "res_max", "res_some", "res_count"};
- std::vector<TString> unnamedColumns = {"100", "101", "102", "103"};
+ std::vector<TString> namedColumns = { "res_min", "res_max", "res_some", "res_count" };
+ std::vector<TString> unnamedColumns = { "100", "101", "102", "103" };
if (!addProjection) {
for (auto& key : aggKeys) {
namedColumns.push_back(ydbSchema[key].GetName());
@@ -1534,7 +1512,7 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
if (checkResult.contains(prog)) {
if (isFiltered.contains(prog)) {
UNIT_ASSERT(CheckColumns(batch, namedColumns, expectedFiltered.NumRows));
- if (aggKeys.empty()) { // TODO: ORDER BY for compare
+ if (aggKeys.empty()) { // TODO: ORDER BY for compare
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_min"), expectedFiltered.MinValues));
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_max"), expectedFiltered.MaxValues));
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_some"), expectedFiltered.MinValues));
@@ -1542,7 +1520,7 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_count"), expectedFiltered.Counts));
} else {
UNIT_ASSERT(CheckColumns(batch, unnamedColumns, expectedResult.NumRows));
- if (aggKeys.empty()) { // TODO: ORDER BY for compare
+ if (aggKeys.empty()) { // TODO: ORDER BY for compare
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), expectedResult.MinValues));
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), expectedResult.MaxValues));
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), expectedResult.MinValues));
@@ -1555,10 +1533,9 @@ void TestReadAggregate(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
}
}
-}
+} // namespace
Y_UNIT_TEST_SUITE(EvWrite) {
-
Y_UNIT_TEST(WriteInTransaction) {
using namespace NArrow;
@@ -1566,48 +1543,37 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TTester::Setup(runtime);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
- const ui64 ownerId = 0;
const ui64 tableId = 1;
- const ui64 schemaVersion = 1;
- const std::vector<NArrow::NTest::TTestColumn> schema = {
- NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)),
- NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8))
- };
- const std::vector<ui32> columnsIds = {1, 2};
+ const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)),
+ NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
+ const std::vector<ui32> columnsIds = { 1, 2 };
PrepareTablet(runtime, tableId, schema);
const ui64 txId = 111;
- NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr keyColumn =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
"field", NConstruction::TStringPoolFiller(8, 100));
auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
- TString blobData = NArrow::SerializeBatchNoCompression(batch);
- UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
-
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_PREPARE);
- evWrite->SetTxId(txId);
- ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
- evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
- TActorId sender = runtime.AllocateEdgeActor();
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
+ NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222);
+ AFL_VERIFY(writer.Write(batch, {1, 2}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+ AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
{
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), txId);
- UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
+ NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 444);
+ AFL_VERIFY(writer.StartCommit(444) == NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
+ }
+ {
auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0);
- PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
+ PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId));
}
- auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot::MaxForPlanStep(11), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048);
}
@@ -1618,46 +1584,26 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TTester::Setup(runtime);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
-
- const ui64 ownerId = 0;
const ui64 tableId = 1;
- const ui64 schemaVersion = 1;
- const std::vector<NArrow::NTest::TTestColumn> schema = {
- NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)),
- NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8))
- };
- const std::vector<ui32> columnsIds = {1, 2};
+ const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)),
+ NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
+ const std::vector<ui32> columnsIds = { 1, 2 };
PrepareTablet(runtime, tableId, schema);
const ui64 txId = 111;
- NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr keyColumn =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
"field", NConstruction::TStringPoolFiller(8, 100));
auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
- TString blobData = NArrow::SerializeBatchNoCompression(batch);
- UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
+ NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222);
+ AFL_VERIFY(writer.Write(batch, {1, 2}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+ AFL_VERIFY(writer.Abort(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_PREPARE);
- evWrite->SetTxId(txId);
- ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
- evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
+ PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(10, txId + 1), false);
- TActorId sender = runtime.AllocateEdgeActor();
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
-
- ui64 outdatedStep = 11;
- {
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
-
- outdatedStep = event->Record.GetMaxStep() + 1;
- PlanWriteTx(runtime, sender, NOlap::TSnapshot(outdatedStep, txId + 1), false);
- }
-
- auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(outdatedStep, txId), schema);
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot::MaxForPlanStep(10), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0);
}
@@ -1668,19 +1614,14 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TTester::Setup(runtime);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
-
- const ui64 ownerId = 0;
const ui64 tableId = 1;
- const ui64 schemaVersion = 1;
- const std::vector<NArrow::NTest::TTestColumn> schema = {
- NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)),
- NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8))
- };
- const std::vector<ui32> columnsIds = {1, 2};
+ const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)),
+ NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
PrepareTablet(runtime, tableId, schema);
const ui64 txId = 111;
- NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr keyColumn =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
"field", NConstruction::TStringPoolFiller(8, TLimits::GetMaxBlobSize() / 1024));
@@ -1688,23 +1629,13 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TString blobData = NArrow::SerializeBatchNoCompression(batch);
UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize());
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_PREPARE);
- evWrite->SetTxId(txId);
- ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
- evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
-
- TActorId sender = runtime.AllocateEdgeActor();
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
+ NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222);
+ AFL_VERIFY(writer.Write(batch, {1, 2}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+ AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
- {
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
- PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
- }
+ PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId));
- auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot::MaxForPlanStep(11), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048);
}
@@ -1715,93 +1646,44 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TTester::Setup(runtime);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
-
- const ui64 ownerId = 0;
const ui64 tableId = 1;
- const ui64 schemaVersion = 1;
- const std::vector<NArrow::NTest::TTestColumn> schema = {
- NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64) ),
- NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8) )
- };
- const std::vector<ui32> columnsIds = {1, 2};
+ const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)),
+ NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
+ const std::vector<ui32> columnIds = { 1, 2 };
PrepareTablet(runtime, tableId, schema);
const ui64 txId = 111;
- const ui64 lockId = 110;
+ NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222);
{
- NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
- NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>("field", NConstruction::TStringPoolFiller(8, 100));
+ NConstruction::IArrayBuilder::TPtr keyColumn =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr column =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(8, 100));
auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
- TString blobData = NArrow::SerializeBatchNoCompression(batch);
- UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
- evWrite->SetLockId(lockId, 1);
-
- ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
- evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
-
- TActorId sender = runtime.AllocateEdgeActor();
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
-
+ AFL_VERIFY(writer.Write(batch, columnIds, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
{
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), lockId);
- UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
-
- auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, lockId), schema);
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0);
}
}
{
- NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key", 2049);
- NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>("field", NConstruction::TStringPoolFiller(8, 100));
+ NConstruction::IArrayBuilder::TPtr keyColumn =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key", 2049);
+ NConstruction::IArrayBuilder::TPtr column =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(8, 100));
auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
- TString blobData = NArrow::SerializeBatchNoCompression(batch);
- UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
- evWrite->SetLockId(lockId, 1);
-
- ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
- evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
-
- TActorId sender = runtime.AllocateEdgeActor();
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
+ AFL_VERIFY(writer.Write(batch, columnIds, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
{
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), lockId);
- UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
-
auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0);
}
}
{
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_PREPARE);
- evWrite->SetTxId(txId);
- evWrite->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
- auto* lock = evWrite->Record.MutableLocks()->AddLocks();
- lock->SetLockId(lockId);
-
- TActorId sender = runtime.AllocateEdgeActor();
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
-
- {
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), txId);
- }
-
- PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
+ AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
+ PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId));
}
auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
@@ -1881,7 +1763,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
{
TSet<ui64> txIds;
std::vector<ui64> writeIds;
- UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update));
+ UNIT_ASSERT(
+ WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update));
ProposeCommit(runtime, sender, ++txId, writeIds);
txIds.insert(txId);
PlanCommit(runtime, sender, planStep, txIds);
@@ -1896,7 +1779,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
{
TSet<ui64> txIds;
std::vector<ui64> writeIds;
- UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert));
+ UNIT_ASSERT(
+ WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert));
ProposeCommit(runtime, sender, ++txId, writeIds);
txIds.insert(txId);
PlanCommit(runtime, sender, planStep, txIds);
@@ -1912,7 +1796,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
{
TSet<ui64> txIds;
std::vector<ui64> writeIds;
- UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Upsert));
+ UNIT_ASSERT(
+ WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Upsert));
ProposeCommit(runtime, sender, ++txId, writeIds);
txIds.insert(txId);
PlanCommit(runtime, sender, planStep, txIds);
@@ -1928,7 +1813,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
{
TSet<ui64> txIds;
std::vector<ui64> writeIds;
- UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update));
+ UNIT_ASSERT(
+ WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update));
ProposeCommit(runtime, sender, ++txId, writeIds);
txIds.insert(txId);
PlanCommit(runtime, sender, planStep, txIds);
@@ -1944,12 +1830,14 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
{
TSet<ui64> txIds;
std::vector<ui64> writeIds;
- UNIT_ASSERT(!WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert));
+ UNIT_ASSERT(
+ !WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert));
}
{
TSet<ui64> txIds;
std::vector<ui64> writeIds;
- UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Delete));
+ UNIT_ASSERT(
+ WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Delete));
ProposeCommit(runtime, sender, ++txId, writeIds);
txIds.insert(txId);
PlanCommit(runtime, sender, planStep, txIds);
@@ -2011,7 +1899,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
schema[0].SetType(TTypeInfo(typeId));
pk[0].SetType(TTypeInfo(typeId));
- TestTableDescription table{.Schema = schema, .Pk = pk};
+ TestTableDescription table{ .Schema = schema, .Pk = pk };
TestCompactionInGranuleImpl(reboot, table);
}
@@ -2047,7 +1935,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TestCompactionInGranule(false, NTypeIds::Datetime);
}
-
Y_UNIT_TEST(CompactionInGranule_PKString_Reboot) {
TestCompactionInGranule(true, NTypeIds::String);
}
@@ -2090,16 +1977,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Y_UNIT_TEST(ReadSomePrograms) {
TestTableDescription table;
- table.Schema = {
- NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp) ),
- NArrow::NTest::TTestColumn("resource_id", TTypeInfo(NTypeIds::Utf8) ),
- NArrow::NTest::TTestColumn("uid", TTypeInfo(NTypeIds::Utf8) ),
- NArrow::NTest::TTestColumn("level", TTypeInfo(NTypeIds::Int32) ),
- NArrow::NTest::TTestColumn("message", TTypeInfo(NTypeIds::Utf8) )
- };
- table.Pk = {
- NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp) )
- };
+ table.Schema = { NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp)),
+ NArrow::NTest::TTestColumn("resource_id", TTypeInfo(NTypeIds::Utf8)), NArrow::NTest::TTestColumn("uid", TTypeInfo(NTypeIds::Utf8)),
+ NArrow::NTest::TTestColumn("level", TTypeInfo(NTypeIds::Int32)), NArrow::NTest::TTestColumn("message", TTypeInfo(NTypeIds::Utf8)) };
+ table.Pk = { NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp)) };
TestSomePrograms(table);
}
@@ -2122,14 +2003,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
counts.push_back(1);
}
- THashSet<NScheme::TTypeId> sameValTypes = {
- NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument
- };
+ THashSet<NScheme::TTypeId> sameValTypes = { NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument };
// TODO: query needs normalization to compare with expected
TReadAggregateResult resDefault = { 100, {}, {}, counts };
- TReadAggregateResult resFiltered = { 1, {}, {}, {1} };
- TReadAggregateResult resGrouped = { 1, {}, {}, {100} };
+ TReadAggregateResult resFiltered = { 1, {}, {}, { 1 } };
+ TReadAggregateResult resGrouped = { 1, {}, {}, { 100 } };
for (ui32 key = 0; key < schema.size(); ++key) {
Cerr << "-- group by key: " << key << "\n";
@@ -2143,8 +2022,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
for (ui32 key = 0; key < schema.size() - 1; ++key) {
Cerr << "-- group by key: " << key << ", " << key + 1 << "\n";
- if (sameValTypes.contains(schema[key].GetType().GetTypeId()) &&
- sameValTypes.contains(schema[key + 1].GetType().GetTypeId())) {
+ if (sameValTypes.contains(schema[key].GetType().GetTypeId()) && sameValTypes.contains(schema[key + 1].GetType().GetTypeId())) {
TestReadAggregate(schema, testBlob, (key % 2), { key, key + 1 }, resGrouped, resFiltered);
} else {
TestReadAggregate(schema, testBlob, (key % 2), { key, key + 1 }, resDefault, resFiltered);
@@ -2152,8 +2030,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
for (ui32 key = 0; key < schema.size() - 2; ++key) {
Cerr << "-- group by key: " << key << ", " << key + 1 << ", " << key + 2 << "\n";
- if (sameValTypes.contains(schema[key].GetType().GetTypeId()) &&
- sameValTypes.contains(schema[key + 1].GetType().GetTypeId()) &&
+ if (sameValTypes.contains(schema[key].GetType().GetTypeId()) && sameValTypes.contains(schema[key + 1].GetType().GetTypeId()) &&
sameValTypes.contains(schema[key + 1].GetType().GetTypeId())) {
TestReadAggregate(schema, testBlob, (key % 2), { key, key + 1, key + 2 }, resGrouped, resFiltered);
} else {
@@ -2170,12 +2047,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
const std::vector<NArrow::NTest::TTestColumn> YdbPk;
public:
- TTabletReadPredicateTest(TTestBasicRuntime& runtime, const ui64 planStep, const ui64 txId, const std::vector<NArrow::NTest::TTestColumn>& ydbPk)
+ TTabletReadPredicateTest(
+ TTestBasicRuntime& runtime, const ui64 planStep, const ui64 txId, const std::vector<NArrow::NTest::TTestColumn>& ydbPk)
: Runtime(runtime)
, PlanStep(planStep)
, TxId(txId)
- , YdbPk(ydbPk)
- {}
+ , YdbPk(ydbPk) {
+ }
class TBorder {
private:
@@ -2185,14 +2063,15 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
public:
TBorder(const std::vector<ui32>& values, const bool include = false)
: Border(values)
- , Include(include)
- {}
+ , Include(include) {
+ }
- bool GetInclude() const noexcept { return Include; }
+ bool GetInclude() const noexcept {
+ return Include;
+ }
- std::vector<TCell> GetCellVec(const std::vector<NArrow::NTest::TTestColumn>& pk,
- std::vector<TString>& mem, bool trailingNulls = false) const
- {
+ std::vector<TCell> GetCellVec(
+ const std::vector<NArrow::NTest::TTestColumn>& pk, std::vector<TString>& mem, bool trailingNulls = false) const {
UNIT_ASSERT(Border.size() <= pk.size());
std::vector<TCell> cells;
size_t i = 0;
@@ -2213,16 +2092,25 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TTestCaseOptions() = default;
- TTestCaseOptions& SetFrom(const TBorder& border) { From = border; return *this; }
- TTestCaseOptions& SetTo(const TBorder& border) { To = border; return *this; }
- TTestCaseOptions& SetExpectedCount(ui32 count) { ExpectedCount = count; return *this; }
+ TTestCaseOptions& SetFrom(const TBorder& border) {
+ From = border;
+ return *this;
+ }
+ TTestCaseOptions& SetTo(const TBorder& border) {
+ To = border;
+ return *this;
+ }
+ TTestCaseOptions& SetExpectedCount(ui32 count) {
+ ExpectedCount = count;
+ return *this;
+ }
TSerializedTableRange MakeRange(const std::vector<NArrow::NTest::TTestColumn>& pk) const {
std::vector<TString> mem;
auto cellsFrom = From ? From->GetCellVec(pk, mem, false) : std::vector<TCell>();
auto cellsTo = To ? To->GetCellVec(pk, mem) : std::vector<TCell>();
return TSerializedTableRange(TConstArrayRef<TCell>(cellsFrom), (From ? From->GetInclude() : false),
- TConstArrayRef<TCell>(cellsTo), (To ? To->GetInclude(): false));
+ TConstArrayRef<TCell>(cellsTo), (To ? To->GetInclude() : false));
}
};
@@ -2233,17 +2121,17 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
void Execute() {
const ui64 tableId = 1;
- std::set<TString> useFields = {"timestamp", "message"};
- { // read with predicate (FROM)
+ std::set<TString> useFields = { "timestamp", "message" };
+ { // read with predicate (FROM)
TShardReader reader(Owner.Runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(Owner.PlanStep, Owner.TxId));
- reader.SetReplyColumns({"timestamp", "message"});
+ reader.SetReplyColumns({ "timestamp", "message" });
reader.AddRange(MakeRange(Owner.YdbPk));
auto rb = reader.ReadAll();
UNIT_ASSERT(reader.IsCorrectlyFinished());
if (ExpectedCount) {
if (*ExpectedCount) {
UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(CheckColumns(rb, {"timestamp", "message"}, ExpectedCount));
+ UNIT_ASSERT(CheckColumns(rb, { "timestamp", "message" }, ExpectedCount));
} else {
UNIT_ASSERT(!rb || !rb->num_rows());
}
@@ -2255,8 +2143,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TTestCase(TTabletReadPredicateTest& owner, const TString& testCaseName, const TTestCaseOptions& opts = {})
: TTestCaseOptions(opts)
, Owner(owner)
- , TestCaseName(testCaseName)
- {
+ , TestCaseName(testCaseName) {
Cerr << "TEST CASE " << TestCaseName << " START..." << Endl;
}
@@ -2331,18 +2218,17 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (ui32 i = 0; i < 2; ++i) {
{
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
- reader.SetReplyColumns({"timestamp", "message"});
+ reader.SetReplyColumns({ "timestamp", "message" });
auto rb = reader.ReadAll();
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT(CheckOrdered(rb));
if (testBlobOptions.SameValueColumns.contains("timestamp")) {
UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message"));
- UNIT_ASSERT(DataHas<std::string>({rb}, {0, fullNumRows}, true, "message"));
+ UNIT_ASSERT(DataHas<std::string>({ rb }, { 0, fullNumRows }, true, "message"));
} else {
- UNIT_ASSERT(isStrPk0
- ? DataHas<std::string>({rb}, {0, fullNumRows}, true, "timestamp")
- : DataHas({rb}, {0, fullNumRows}, true, "timestamp"));
+ UNIT_ASSERT(isStrPk0 ? DataHas<std::string>({ rb }, { 0, fullNumRows }, true, "timestamp")
+ : DataHas({ rb }, { 0, fullNumRows }, true, "timestamp"));
}
}
std::vector<ui32> val0 = { 0 };
@@ -2351,9 +2237,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
std::vector<ui32> val9999 = { 99999 };
std::vector<ui32> val1M = { 1000000000 };
std::vector<ui32> val1M_1 = { 1000000001 };
- std::vector<ui32> valNumRows = {fullNumRows};
- std::vector<ui32> valNumRows_1 = {fullNumRows - 1 };
- std::vector<ui32> valNumRows_2 = {fullNumRows - 2 };
+ std::vector<ui32> valNumRows = { fullNumRows };
+ std::vector<ui32> valNumRows_1 = { fullNumRows - 1 };
+ std::vector<ui32> valNumRows_2 = { fullNumRows - 2 };
{
UNIT_ASSERT(table.Pk.size() >= 2);
@@ -2365,7 +2251,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
val9999 = { sameValue, 99999 };
val1M = { sameValue, 1000000000 };
val1M_1 = { sameValue, 1000000001 };
- valNumRows = { sameValue, fullNumRows};
+ valNumRows = { sameValue, fullNumRows };
valNumRows_1 = { sameValue, fullNumRows - 1 };
valNumRows_2 = { sameValue, fullNumRows - 2 };
}
@@ -2382,8 +2268,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
testAgent.Test("[0:1)").SetFrom(TBorder(val0, true)).SetTo(TBorder(val1, false)).SetExpectedCount(1);
testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0);
testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true)).SetExpectedCount(0);
-// VERIFIED AS INCORRECT INTERVAL (its good)
-// testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0);
+ // VERIFIED AS INCORRECT INTERVAL (its good)
+ // testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0);
if (isStrPk0) {
testAgent.Test("(99990:").SetFrom(TBorder(val9990, false)).SetExpectedCount(109);
@@ -2401,8 +2287,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
const TInstant start = TInstant::Now();
bool success = false;
- while (!success && TInstant::Now() - start < TDuration::Seconds(30)) { // Get index stats
- ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0);
+ while (!success && TInstant::Now() - start < TDuration::Seconds(30)) { // Get index stats
+ ScanIndexStats(runtime, sender, { tableId, 42 }, NOlap::TSnapshot(planStep, txId), 0);
auto scanInited = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanInitActor>(handle);
auto& msg = scanInited->Record;
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
@@ -2440,11 +2326,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
if (!activity) {
continue;
}
- Cerr << "[" << __LINE__ << "] " << activity << " " << table.Pk[0].GetType().GetTypeId() << " "
- << pathId << " " << kindStr << " " << numRows << " " << numBytes << " " << numRawBytes << "\n";
+ Cerr << "[" << __LINE__ << "] " << activity << " " << table.Pk[0].GetType().GetTypeId() << " " << pathId << " " << kindStr
+ << " " << numRows << " " << numBytes << " " << numRawBytes << "\n";
if (pathId == tableId) {
- if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) || kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED) || numBytes > (4LLU << 20)) {
+ if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) ||
+ kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED) || numBytes > (4LLU << 20)) {
sumCompactedBytes += numBytes;
sumCompactedRows += numRows;
//UNIT_ASSERT(numRawBytes > numBytes);
@@ -2483,7 +2370,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
pk[0].SetType(TTypeInfo(typeId));
schema[1].SetType(TTypeInfo(typeId));
pk[1].SetType(TTypeInfo(typeId));
- TestTableDescription table{.Schema = schema, .Pk = pk};
+ TestTableDescription table{ .Schema = schema, .Pk = pk };
TestCompactionSplitGranuleImpl(table, opts);
}
@@ -2542,7 +2429,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Write some test data to advance the time
{
- std::pair<ui64, ui64> triggerPortion = {1, 1000};
+ std::pair<ui64, ui64> triggerPortion = { 1, 1000 };
TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
std::vector<ui64> writeIds;
@@ -2581,11 +2468,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Try to read snapshot that is too old
{
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - staleness.MilliSeconds(), Max<ui64>()));
- reader.SetReplyColumns({"timestamp", "message"});
+ reader.SetReplyColumns({ "timestamp", "message" });
reader.ReadAll();
UNIT_ASSERT(reader.IsError());
}
-
}
void TestCompactionGC() {
@@ -2648,7 +2534,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
++compactionsHappened;
TStringBuilder sb;
sb << "Compaction old portions:";
- ui64 srcPathId{0};
+ ui64 srcPathId{ 0 };
for (const auto& portionInfo : compact->SwitchedPortions) {
const ui64 pathId = portionInfo.GetPathId();
UNIT_ASSERT(!srcPathId || srcPathId == pathId);
@@ -2673,7 +2559,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
} else if (auto* msg = TryGetPrivateEvent<NActors::NLog::TEvLog>(ev)) {
{
- const std::vector<TString> prefixes = {"Delay Delete Blob "};
+ const std::vector<TString> prefixes = { "Delay Delete Blob " };
for (TString prefix : prefixes) {
size_t pos = msg->Line.find(prefix);
if (pos != TString::npos) {
@@ -2716,7 +2602,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Write different keys: grow on compaction
static const ui32 triggerPortionSize = 75 * 1000;
- std::pair<ui64, ui64> triggerPortion = {0, triggerPortionSize};
+ std::pair<ui64, ui64> triggerPortion = { 0, triggerPortionSize };
TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize());
@@ -2736,7 +2622,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Do a small write that is not indexed so that we will get a committed blob in read request
{
- TString smallData = MakeTestBlob({0, 2}, ydbSchema);
+ TString smallData = MakeTestBlob({ 0, 2 }, ydbSchema);
UNIT_ASSERT(smallData.size() < 100 * 1024);
std::vector<ui64> writeIds;
UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, smallData, ydbSchema, true, &writeIds));
@@ -2751,7 +2637,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
--planStep;
--txId;
Cerr << compactionsHappened << Endl;
-// UNIT_ASSERT_GE(compactionsHappened, 3); // we catch it three times per action
+ // UNIT_ASSERT_GE(compactionsHappened, 3); // we catch it three times per action
ui64 previousCompactionsHappened = compactionsHappened;
ui64 previousCleanupsHappened = cleanupsHappened;
@@ -2760,7 +2646,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// This request is expected to read at least 1 committed blob and several index portions
// These committed blob and portions must not be deleted by the BlobManager until the read request finishes
TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>()));
- reader.SetReplyColumns({"timestamp", "message"});
+ reader.SetReplyColumns({ "timestamp", "message" });
auto rb = reader.ReadAll();
UNIT_ASSERT(reader.IsCorrectlyFinished());
UNIT_ASSERT(CheckOrdered(rb));
@@ -2868,4 +2754,4 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
}
-}
+} // namespace NKikimr
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
index 73404795270..1f2312b1bfd 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
@@ -1,16 +1,14 @@
#include <ydb/core/tx/columnshard/columnshard_schema.h>
-#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
-
+#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
-#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
-
#include <ydb/core/tx/columnshard/operations/write_data.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
+#include <ydb/core/tx/columnshard/test_helper/shard_writer.h>
-#include <ydb/library/formats/arrow/simple_builder/filler.h>
#include <ydb/library/formats/arrow/simple_builder/array.h>
#include <ydb/library/formats/arrow/simple_builder/batch.h>
-
+#include <ydb/library/formats/arrow/simple_builder/filler.h>
namespace NKikimr {
@@ -34,17 +32,17 @@ struct TPortionRecord {
ui32 Size = 0;
};
-
class TNormalizerChecker {
public:
- virtual ~TNormalizerChecker() {}
+ virtual ~TNormalizerChecker() {
+ }
virtual ui64 RecordsCountAfterReboot(const ui64 initialRecodsCount) const {
return initialRecodsCount;
}
};
-class TPathIdCleaner : public NYDBTest::ILocalDBModifier {
+class TPathIdCleaner: public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
@@ -83,33 +81,26 @@ public:
UNIT_ASSERT(pathId.has_value());
- for (auto&& [ portionId, key ] : portion2Key) {
- db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx,
- key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete();
+ for (auto&& [portionId, key] : portion2Key) {
+ db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete();
- db.Table<Schema::IndexColumns>().Key(key.Index, 1, key.ColumnIdx,
- key.PlanStep, key.TxId, key.Portion, key.Chunk).Update(
- NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep),
- NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId),
- NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob),
- NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata),
- NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset),
- NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size),
+ db.Table<Schema::IndexColumns>()
+ .Key(key.Index, 1, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk)
+ .Update(NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep), NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId),
+ NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob), NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata),
+ NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset), NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size),
- NIceDb::TNull<Schema::IndexColumns::PathId>()
- );
+ NIceDb::TNull<Schema::IndexColumns::PathId>());
}
- db.Table<Schema::IndexGranules>().Key(0, *pathId, "1").Update(
- NIceDb::TUpdate<Schema::IndexGranules::Granule>(1),
- NIceDb::TUpdate<Schema::IndexGranules::PlanStep>(1),
- NIceDb::TUpdate<Schema::IndexGranules::TxId>(1),
- NIceDb::TUpdate<Schema::IndexGranules::Metadata>("")
- );
+ db.Table<Schema::IndexGranules>()
+ .Key(0, *pathId, "1")
+ .Update(NIceDb::TUpdate<Schema::IndexGranules::Granule>(1), NIceDb::TUpdate<Schema::IndexGranules::PlanStep>(1),
+ NIceDb::TUpdate<Schema::IndexGranules::TxId>(1), NIceDb::TUpdate<Schema::IndexGranules::Metadata>(""));
}
};
-class TColumnChunksCleaner : public NYDBTest::ILocalDBModifier {
+class TColumnChunksCleaner: public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
@@ -148,21 +139,20 @@ public:
UNIT_ASSERT(pathId.has_value());
- for (auto&& key: portion2Key) {
+ for (auto&& key : portion2Key) {
NKikimrTxColumnShard::TIndexColumnMeta metaProto;
UNIT_ASSERT(metaProto.ParseFromArray(key.Metadata.data(), key.Metadata.size()));
metaProto.ClearNumRows();
metaProto.ClearRawBytes();
- db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx,
- key.PlanStep, key.TxId, key.Portion, key.Chunk).Update(
- NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString())
- );
+ db.Table<Schema::IndexColumns>()
+ .Key(key.Index, key.Granule, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk)
+ .Update(NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString()));
}
}
};
-class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
+class TPortionsCleaner: public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
@@ -174,20 +164,20 @@ public:
UNIT_ASSERT(rowset.IsReady());
while (!rowset.EndOfSet()) {
- NOlap::TPortionAddress addr(rowset.GetValue<Schema::IndexPortions::PathId>(), rowset.GetValue<Schema::IndexPortions::PortionId>());
+ NOlap::TPortionAddress addr(
+ rowset.GetValue<Schema::IndexPortions::PathId>(), rowset.GetValue<Schema::IndexPortions::PortionId>());
portions.emplace_back(addr);
UNIT_ASSERT(rowset.Next());
}
}
- for (auto&& key: portions) {
+ for (auto&& key : portions) {
db.Table<Schema::IndexPortions>().Key(key.GetPathId(), key.GetPortionId()).Delete();
}
}
};
-
-class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier {
+class TEmptyPortionsCleaner: public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
@@ -200,8 +190,7 @@ public:
}
};
-
-class TTablesCleaner : public NYDBTest::ILocalDBModifier {
+class TTablesCleaner: public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
@@ -219,7 +208,7 @@ public:
}
}
- for (auto&& key: tables) {
+ for (auto&& key : tables) {
db.Table<Schema::TableInfo>().Key(key).Delete();
}
@@ -244,10 +233,9 @@ public:
}
}
- for (auto&& key: versions) {
+ for (auto&& key : versions) {
db.Table<Schema::TableVersionInfo>().Key(key.PathId, key.Step, key.TxId).Delete();
}
-
}
};
@@ -255,6 +243,7 @@ template <class TLocalDBModifier>
class TPrepareLocalDBController: public NKikimr::NYDBTest::NColumnShard::TController {
private:
using TBase = NKikimr::NYDBTest::ICSController;
+
public:
NYDBTest::ILocalDBModifier::TPtr BuildLocalBaseModifier() const override {
return std::make_shared<TLocalDBModifier>();
@@ -262,7 +251,6 @@ public:
};
Y_UNIT_TEST_SUITE(Normalizers) {
-
template <class TLocalDBModifier>
void TestNormalizerImpl(const TNormalizerChecker& checker = TNormalizerChecker()) {
using namespace NArrow;
@@ -271,52 +259,36 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
- const ui64 ownerId = 0;
const ui64 tableId = 1;
- const ui64 schemaVersion = 1;
- const std::vector<NArrow::NTest::TTestColumn> schema = {
- NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)),
- NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)),
- NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8) )
- };
- const std::vector<ui32> columnsIds = { 1, 2, 3};
+ const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)),
+ NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
+ const std::vector<ui32> columnsIds = { 1, 2, 3 };
PrepareTablet(runtime, tableId, schema, 2);
const ui64 txId = 111;
- NConstruction::IArrayBuilder::TPtr key1Column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key1");
- NConstruction::IArrayBuilder::TPtr key2Column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key2");
+ NConstruction::IArrayBuilder::TPtr key1Column =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key1");
+ NConstruction::IArrayBuilder::TPtr key2Column =
+ std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key2");
NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
"field", NConstruction::TStringPoolFiller(8, 100));
auto batch = NConstruction::TRecordBatchConstructor({ key1Column, key2Column, column }).BuildBatch(20048);
- TString blobData = NArrow::SerializeBatchNoCompression(batch);
-
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_PREPARE);
- evWrite->SetTxId(txId);
- ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
- evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
-
- TActorId sender = runtime.AllocateEdgeActor();
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
- {
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
-
- PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
- }
+ NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222);
+ AFL_VERIFY(writer.Write(batch, {1, 2, 3}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+ AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
+ PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId));
{
auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 20048);
while (!csControllerGuard->GetInsertFinishedCounter().Val()) {
Cerr << csControllerGuard->GetInsertStartedCounter().Val() << Endl;
- Wakeup(runtime, sender, TTestTxConfig::TxTablet0);
+ Wakeup(runtime, writer.GetSender(), TTestTxConfig::TxTablet0);
runtime.SimulateSleep(TDuration::Seconds(1));
}
}
- RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
+ RebootTablet(runtime, TTestTxConfig::TxTablet0, writer.GetSender());
{
auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
@@ -341,7 +313,7 @@ Y_UNIT_TEST_SUITE(Normalizers) {
}
Y_UNIT_TEST(EmptyTablesNormalizer) {
- class TLocalNormalizerChecker : public TNormalizerChecker {
+ class TLocalNormalizerChecker: public TNormalizerChecker {
public:
ui64 RecordsCountAfterReboot(const ui64) const override {
return 0;
@@ -352,4 +324,4 @@ Y_UNIT_TEST_SUITE(Normalizers) {
}
}
-} // namespace NKikimr
+} // namespace NKikimr