diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-01-12 09:08:33 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-12 09:08:33 +0300 |
commit | db4e90915f31cde567461a77d468b4b4a6a0c15e (patch) | |
tree | 2cb1664c3f5841bfe6a2db670470f2f8a268c68f | |
parent | 3a4b1a7c0264efa0fd0cdb7f43e8b640b2dda9fc (diff) | |
download | ydb-db4e90915f31cde567461a77d468b4b4a6a0c15e.tar.gz |
Tests: Replace EvProposeTransaction with EvWrite (#932)
-rw-r--r-- | ydb/core/tx/datashard/datashard__write.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_write.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp | 107 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_common/datashard_ut_common.h | 39 | ||||
-rw-r--r-- | ydb/library/actors/testlib/test_runtime.h | 2 |
8 files changed, 193 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp index 028846ea02..63ff4a55c1 100644 --- a/ydb/core/tx/datashard/datashard__write.cpp +++ b/ydb/core/tx/datashard/datashard__write.cpp @@ -254,4 +254,27 @@ ui64 EvWrite::Convertor::GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode tx Y_FAIL_S("Unexpected tx mode " << txMode); } } + +NKikimrDataEvents::TEvWrite::ETxMode EvWrite::Convertor::GetTxMode(ui64 flags) { + if ((flags & TTxFlags::Immediate) && !(flags & TTxFlags::ForceOnline)) { + return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_IMMEDIATE; + } + else if (flags & TTxFlags::VolatilePrepare) { + return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_VOLATILE_PREPARE; + } + else { + return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_PREPARE; + } +} + +NKikimrTxDataShard::TEvProposeTransactionResult::EStatus EvWrite::Convertor::GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status) { + switch (status) { + case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED: + return NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE; + case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED: + return NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED; + default: + return NKikimrTxDataShard::TEvProposeTransactionResult::ERROR; + } +} }
\ No newline at end of file diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index fbdc8728b0..8e9f325d3e 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1570,7 +1570,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& Y_ABORT_UNLESS(writeTx); auto badRequest = [&](const TString& error) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << "at tablet# " << Self->TabletID(), Self->TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID(), Self->TabletID()); LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); }; diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp index 82ee700ff2..870295488a 100644 --- a/ydb/core/tx/datashard/datashard_ut_write.cpp +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -29,6 +29,27 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { return {runtime, server, sender}; } + Y_UNIT_TEST_TWIN(Upsert, EvWrite) { + auto [runtime, server, sender] = TestCreateServer(); + + auto opts = TShardedTableOptions(); + auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); + + auto rows = EvWrite ? TEvWriteRows{{{0, 1}}, {{2, 3}}, {{4, 5}}} : TEvWriteRows{}; + auto upsertObserver = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); + auto upsertResultObserver = ReplaceEvProposeTransactionResultWithEvWrite(runtime, rows); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 5);")); + + auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); + + UNIT_ASSERT_VALUES_EQUAL(table1state, "key = 0, value = 1\n" + "key = 2, value = 3\n" + "key = 4, value = 5\n"); + } + Y_UNIT_TEST(WriteImmediateOnShard) { auto [runtime, server, sender] = TestCreateServer(); diff --git a/ydb/core/tx/datashard/datashard_write.h b/ydb/core/tx/datashard/datashard_write.h index f58bd947b8..af19b37c22 100644 --- a/ydb/core/tx/datashard/datashard_write.h +++ b/ydb/core/tx/datashard/datashard_write.h @@ -2,6 +2,7 @@ #include <ydb/library/actors/core/event.h> #include <ydb/core/protos/data_events.pb.h> +#include <ydb/core/protos/tx_datashard.pb.h> #include <util/generic/ptr.h> @@ -13,5 +14,7 @@ class Convertor { public: static ui64 GetTxId(const TAutoPtr<IEventHandle>& ev); static ui64 GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode); + static NKikimrDataEvents::TEvWrite::ETxMode GetTxMode(ui64 flags); + static NKikimrTxDataShard::TEvProposeTransactionResult::EStatus GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status); }; }
\ No newline at end of file diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index aa25e3359d..07979ae3a4 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -67,7 +67,7 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) { auto tableInfoPtr = tableInfos.FindPtr(tableIdRecord.GetTableId()); if (!tableInfoPtr) { ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; - ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist"; + ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist."; return false; } TableInfo = tableInfoPtr->Get(); diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 7b0ff48f22..b02e2fb303 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1875,6 +1875,103 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen return Write(runtime, sender, shardId, std::move(request), expectedStatus, std::move(traceId)); } + + +TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) { + if (rows.empty()) + return {}; + + return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) { + if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction) + return; + + const auto& record = event->Get<TEvDataShard::TEvProposeTransaction>()->Record; + + if (record.GetTxKind() != NKikimrTxDataShard::TX_KIND_DATA) + return; + + // Parse original TEvProposeTransaction + const TString& txBody = record.GetTxBody(); + NKikimrTxDataShard::TDataTransaction tx; + Y_VERIFY(tx.ParseFromArray(txBody.data(), txBody.size())); + + // Construct new EvWrite + TVector<TCell> cells; + ui64 tableId = 0; + ui16 colCount = 0; + for (const auto& task : tx.GetKqpTransaction().GetTasks()) { + NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; + Y_VERIFY(task.GetMeta().UnpackTo(&meta)); + if (!meta.HasWrites()) + continue; + + const auto& tableMeta = meta.GetTable(); + Y_VERIFY_S(tableId == 0 || tableId == tableMeta.GetTableId().GetTableId(), "Only writes to one table is supported now"); + tableId = tableMeta.GetTableId().GetTableId(); + const auto& writes = meta.GetWrites(); + Y_VERIFY_S(colCount == 0 || colCount == writes.GetColumns().size(), "Only equal column count is supported now."); + colCount = writes.GetColumns().size(); + + const auto& row = rows.ProcessNextRow(); + Y_VERIFY(row.Cells.size() == colCount); + std::copy(row.Cells.begin(), row.Cells.end(), std::back_inserter(cells)); + } + + if (cells.empty()) { + Cerr << "TEvProposeTransaction TX_KIND_DATA has no writes.\n"; + return; + } + + Cerr << "TEvProposeTransaction TX_KIND_DATA event is observed and will be replaced with EvWrite: " << record.ShortDebugString() << Endl; + + TSerializedCellMatrix matrix(cells, cells.size() / colCount, colCount); + TString blobData = matrix.ReleaseBuffer(); + + UNIT_ASSERT(blobData.size() < 8_MB); + + ui64 txId = record.GetTxId(); + auto txMode = NKikimr::NDataShard::EvWrite::Convertor::GetTxMode(record.GetFlags()); + std::vector<ui32> columnIds(colCount); + std::iota(columnIds.begin(), columnIds.end(), 1); + + auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode); + ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)); + evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC); + + // Replace event + auto handle = new IEventHandle(event->Recipient, event->Sender, evWrite.release(), 0, event->Cookie); + handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite()); + event.Reset(handle); + }); +} + +TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) { + if (rows.empty()) + return {}; + + return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) { + if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult) + return; + + rows.CompleteNextRow(); + + const auto& record = event->Get<NEvents::TDataEvents::TEvWriteResult>()->Record; + Cerr << "EvWriteResult event is observed and will be replaced with EvProposeTransactionResult: " << record.ShortDebugString() << Endl; + + // Construct new EvProposeTransactionResult + ui64 txId = record.GetTxId(); + ui64 origin = record.GetOrigin(); + auto status = NKikimr::NDataShard::EvWrite::Convertor::GetStatus(record.GetStatus()); + + auto evResult = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, status); + + // Replace event + auto handle = new IEventHandle(event->Recipient, event->Sender, evResult.release(), 0, event->Cookie); + handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite()); + event.Reset(handle); + }); +} + void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values) { auto txTypes = std::make_shared<NTxProxy::TUploadTypes>(); @@ -1903,9 +2000,10 @@ void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId) struct IsShardStateChange { IsShardStateChange(ui64 tabletId) - : TabletId(tabletId) - { - } + : + TabletId(tabletId) + { + } bool operator()(IEventHandle& ev) { @@ -1959,7 +2057,8 @@ namespace { , Snapshot(snapshot) , Ordered(ordered) , State(pause ? EState::PauseWait : EState::Normal) - { } + { + } void Bootstrap(const TActorContext& ctx) { auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 5a0b6e7bcb..6c0831e0db 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -713,6 +713,45 @@ void ExecSQL(Tests::TServer::TPtr server, NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {}); NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {}); +struct TEvWriteRow { + TEvWriteRow(std::initializer_list<ui32> init) { + for (ui32 value : init) { + Cells.emplace_back(TCell((const char*)&value, sizeof(ui32))); + } + } + + std::vector<TCell> Cells; + + enum EStatus { + Init, + Processing, + Completed + } Status = Init; +}; +class TEvWriteRows : public std::vector<TEvWriteRow> { + public: + TEvWriteRows() = default; + TEvWriteRows(std::initializer_list<TEvWriteRow> init) : + std::vector<TEvWriteRow>(init) { } + + const TEvWriteRow& ProcessNextRow() { + auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Init; }); + Y_VERIFY_S(processedRow != end(), "There should be at least one EvWrite row to process."); + processedRow->Status = TEvWriteRow::EStatus::Processing; + Cerr << "Processing next EvWrite row\n"; + return *processedRow; + } + void CompleteNextRow() { + auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Processing; }); + Y_VERIFY_S(processedRow != end(), "There should be at lest one EvWrite row processing."); + processedRow->Status = TEvWriteRow::EStatus::Completed; + Cerr << "Completed next EvWrite row\n"; + } +}; + +TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows); +TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows); + void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values); struct IsTxResultComplete { diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index 8d66190383..4fc308812d 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -319,6 +319,8 @@ namespace NActors { TEventObserverHolder& operator=(TEventObserverHolder&& other) noexcept { if (this != &other) { + Remove(); + List = std::move(other.List); Iter = std::move(other.Iter); |