aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2024-01-12 09:08:33 +0300
committerGitHub <noreply@github.com>2024-01-12 09:08:33 +0300
commitdb4e90915f31cde567461a77d468b4b4a6a0c15e (patch)
tree2cb1664c3f5841bfe6a2db670470f2f8a268c68f
parent3a4b1a7c0264efa0fd0cdb7f43e8b640b2dda9fc (diff)
downloadydb-db4e90915f31cde567461a77d468b4b4a6a0c15e.tar.gz
Tests: Replace EvProposeTransaction with EvWrite (#932)
-rw-r--r--ydb/core/tx/datashard/datashard__write.cpp23
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp21
-rw-r--r--ydb/core/tx/datashard/datashard_write.h3
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.cpp2
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp107
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.h39
-rw-r--r--ydb/library/actors/testlib/test_runtime.h2
8 files changed, 193 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp
index 028846ea02..63ff4a55c1 100644
--- a/ydb/core/tx/datashard/datashard__write.cpp
+++ b/ydb/core/tx/datashard/datashard__write.cpp
@@ -254,4 +254,27 @@ ui64 EvWrite::Convertor::GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode tx
Y_FAIL_S("Unexpected tx mode " << txMode);
}
}
+
+NKikimrDataEvents::TEvWrite::ETxMode EvWrite::Convertor::GetTxMode(ui64 flags) {
+ if ((flags & TTxFlags::Immediate) && !(flags & TTxFlags::ForceOnline)) {
+ return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_IMMEDIATE;
+ }
+ else if (flags & TTxFlags::VolatilePrepare) {
+ return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_VOLATILE_PREPARE;
+ }
+ else {
+ return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_PREPARE;
+ }
+}
+
+NKikimrTxDataShard::TEvProposeTransactionResult::EStatus EvWrite::Convertor::GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status) {
+ switch (status) {
+ case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED:
+ return NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE;
+ case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
+ return NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED;
+ default:
+ return NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
+ }
+}
} \ No newline at end of file
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index fbdc8728b0..8e9f325d3e 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1570,7 +1570,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
Y_ABORT_UNLESS(writeTx);
auto badRequest = [&](const TString& error) {
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << "at tablet# " << Self->TabletID(), Self->TabletID());
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID(), Self->TabletID());
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
};
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
index 82ee700ff2..870295488a 100644
--- a/ydb/core/tx/datashard/datashard_ut_write.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -29,6 +29,27 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
return {runtime, server, sender};
}
+ Y_UNIT_TEST_TWIN(Upsert, EvWrite) {
+ auto [runtime, server, sender] = TestCreateServer();
+
+ auto opts = TShardedTableOptions();
+ auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
+
+ auto rows = EvWrite ? TEvWriteRows{{{0, 1}}, {{2, 3}}, {{4, 5}}} : TEvWriteRows{};
+ auto upsertObserver = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
+ auto upsertResultObserver = ReplaceEvProposeTransactionResultWithEvWrite(runtime, rows);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3);"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 5);"));
+
+ auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+
+ UNIT_ASSERT_VALUES_EQUAL(table1state, "key = 0, value = 1\n"
+ "key = 2, value = 3\n"
+ "key = 4, value = 5\n");
+ }
+
Y_UNIT_TEST(WriteImmediateOnShard) {
auto [runtime, server, sender] = TestCreateServer();
diff --git a/ydb/core/tx/datashard/datashard_write.h b/ydb/core/tx/datashard/datashard_write.h
index f58bd947b8..af19b37c22 100644
--- a/ydb/core/tx/datashard/datashard_write.h
+++ b/ydb/core/tx/datashard/datashard_write.h
@@ -2,6 +2,7 @@
#include <ydb/library/actors/core/event.h>
#include <ydb/core/protos/data_events.pb.h>
+#include <ydb/core/protos/tx_datashard.pb.h>
#include <util/generic/ptr.h>
@@ -13,5 +14,7 @@ class Convertor {
public:
static ui64 GetTxId(const TAutoPtr<IEventHandle>& ev);
static ui64 GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode);
+ static NKikimrDataEvents::TEvWrite::ETxMode GetTxMode(ui64 flags);
+ static NKikimrTxDataShard::TEvProposeTransactionResult::EStatus GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status);
};
} \ No newline at end of file
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index aa25e3359d..07979ae3a4 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -67,7 +67,7 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) {
auto tableInfoPtr = tableInfos.FindPtr(tableIdRecord.GetTableId());
if (!tableInfoPtr) {
ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR;
- ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist";
+ ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist.";
return false;
}
TableInfo = tableInfoPtr->Get();
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
index 7b0ff48f22..b02e2fb303 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
@@ -1875,6 +1875,103 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen
return Write(runtime, sender, shardId, std::move(request), expectedStatus, std::move(traceId));
}
+
+
+TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
+ if (rows.empty())
+ return {};
+
+ return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
+ if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction)
+ return;
+
+ const auto& record = event->Get<TEvDataShard::TEvProposeTransaction>()->Record;
+
+ if (record.GetTxKind() != NKikimrTxDataShard::TX_KIND_DATA)
+ return;
+
+ // Parse original TEvProposeTransaction
+ const TString& txBody = record.GetTxBody();
+ NKikimrTxDataShard::TDataTransaction tx;
+ Y_VERIFY(tx.ParseFromArray(txBody.data(), txBody.size()));
+
+ // Construct new EvWrite
+ TVector<TCell> cells;
+ ui64 tableId = 0;
+ ui16 colCount = 0;
+ for (const auto& task : tx.GetKqpTransaction().GetTasks()) {
+ NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
+ Y_VERIFY(task.GetMeta().UnpackTo(&meta));
+ if (!meta.HasWrites())
+ continue;
+
+ const auto& tableMeta = meta.GetTable();
+ Y_VERIFY_S(tableId == 0 || tableId == tableMeta.GetTableId().GetTableId(), "Only writes to one table is supported now");
+ tableId = tableMeta.GetTableId().GetTableId();
+ const auto& writes = meta.GetWrites();
+ Y_VERIFY_S(colCount == 0 || colCount == writes.GetColumns().size(), "Only equal column count is supported now.");
+ colCount = writes.GetColumns().size();
+
+ const auto& row = rows.ProcessNextRow();
+ Y_VERIFY(row.Cells.size() == colCount);
+ std::copy(row.Cells.begin(), row.Cells.end(), std::back_inserter(cells));
+ }
+
+ if (cells.empty()) {
+ Cerr << "TEvProposeTransaction TX_KIND_DATA has no writes.\n";
+ return;
+ }
+
+ Cerr << "TEvProposeTransaction TX_KIND_DATA event is observed and will be replaced with EvWrite: " << record.ShortDebugString() << Endl;
+
+ TSerializedCellMatrix matrix(cells, cells.size() / colCount, colCount);
+ TString blobData = matrix.ReleaseBuffer();
+
+ UNIT_ASSERT(blobData.size() < 8_MB);
+
+ ui64 txId = record.GetTxId();
+ auto txMode = NKikimr::NDataShard::EvWrite::Convertor::GetTxMode(record.GetFlags());
+ std::vector<ui32> columnIds(colCount);
+ std::iota(columnIds.begin(), columnIds.end(), 1);
+
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
+ ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
+ evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
+
+ // Replace event
+ auto handle = new IEventHandle(event->Recipient, event->Sender, evWrite.release(), 0, event->Cookie);
+ handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
+ event.Reset(handle);
+ });
+}
+
+TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
+ if (rows.empty())
+ return {};
+
+ return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
+ if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult)
+ return;
+
+ rows.CompleteNextRow();
+
+ const auto& record = event->Get<NEvents::TDataEvents::TEvWriteResult>()->Record;
+ Cerr << "EvWriteResult event is observed and will be replaced with EvProposeTransactionResult: " << record.ShortDebugString() << Endl;
+
+ // Construct new EvProposeTransactionResult
+ ui64 txId = record.GetTxId();
+ ui64 origin = record.GetOrigin();
+ auto status = NKikimr::NDataShard::EvWrite::Convertor::GetStatus(record.GetStatus());
+
+ auto evResult = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, status);
+
+ // Replace event
+ auto handle = new IEventHandle(event->Recipient, event->Sender, evResult.release(), 0, event->Cookie);
+ handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
+ event.Reset(handle);
+ });
+}
+
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
{
auto txTypes = std::make_shared<NTxProxy::TUploadTypes>();
@@ -1903,9 +2000,10 @@ void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId)
struct IsShardStateChange
{
IsShardStateChange(ui64 tabletId)
- : TabletId(tabletId)
- {
- }
+ :
+ TabletId(tabletId)
+ {
+ }
bool operator()(IEventHandle& ev)
{
@@ -1959,7 +2057,8 @@ namespace {
, Snapshot(snapshot)
, Ordered(ordered)
, State(pause ? EState::PauseWait : EState::Normal)
- { }
+ {
+ }
void Bootstrap(const TActorContext& ctx) {
auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
index 5a0b6e7bcb..6c0831e0db 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
@@ -713,6 +713,45 @@ void ExecSQL(Tests::TServer::TPtr server,
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
+struct TEvWriteRow {
+ TEvWriteRow(std::initializer_list<ui32> init) {
+ for (ui32 value : init) {
+ Cells.emplace_back(TCell((const char*)&value, sizeof(ui32)));
+ }
+ }
+
+ std::vector<TCell> Cells;
+
+ enum EStatus {
+ Init,
+ Processing,
+ Completed
+ } Status = Init;
+};
+class TEvWriteRows : public std::vector<TEvWriteRow> {
+ public:
+ TEvWriteRows() = default;
+ TEvWriteRows(std::initializer_list<TEvWriteRow> init) :
+ std::vector<TEvWriteRow>(init) { }
+
+ const TEvWriteRow& ProcessNextRow() {
+ auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Init; });
+ Y_VERIFY_S(processedRow != end(), "There should be at least one EvWrite row to process.");
+ processedRow->Status = TEvWriteRow::EStatus::Processing;
+ Cerr << "Processing next EvWrite row\n";
+ return *processedRow;
+ }
+ void CompleteNextRow() {
+ auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Processing; });
+ Y_VERIFY_S(processedRow != end(), "There should be at lest one EvWrite row processing.");
+ processedRow->Status = TEvWriteRow::EStatus::Completed;
+ Cerr << "Completed next EvWrite row\n";
+ }
+};
+
+TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
+TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
+
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);
struct IsTxResultComplete {
diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h
index 8d66190383..4fc308812d 100644
--- a/ydb/library/actors/testlib/test_runtime.h
+++ b/ydb/library/actors/testlib/test_runtime.h
@@ -319,6 +319,8 @@ namespace NActors {
TEventObserverHolder& operator=(TEventObserverHolder&& other) noexcept {
if (this != &other)
{
+ Remove();
+
List = std::move(other.List);
Iter = std::move(other.Iter);