diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2023-12-27 17:38:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-27 15:38:50 +0100 |
commit | 75470f709400ae7c2fac349fbc8d1aa7efc92dfa (patch) | |
tree | f98e79a39adfa7ce701bc827c32946ae2c3f9525 | |
parent | f59afadcbb8cec4f6a50cabf0ee4cf8507dcf3c0 (diff) | |
download | ydb-75470f709400ae7c2fac349fbc8d1aa7efc92dfa.tar.gz |
ut_read_iterator + EvWrite (#742)
ut_read_iterator + EvWrite
4 files changed, 84 insertions, 38 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 635f5d422a..a34be8bfc2 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -10,6 +10,9 @@ #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/tx_proxy/read_table.h> +#include <ydb/core/tx/data_events/events.h> +#include <ydb/core/tx/data_events/payload_helper.h> + #include <ydb/public/sdk/cpp/client/ydb_result/result.h> #include <algorithm> @@ -25,46 +28,52 @@ namespace { using TCellVec = std::vector<TCell>; -void CreateTable(Tests::TServer::TPtr server, +TVector<TShardedTableOptions::TColumn> GetColumns() { + TVector<TShardedTableOptions::TColumn> columns = { + {"key1", "Uint32", true, false}, + {"key2", "Uint32", true, false}, + {"key3", "Uint32", true, false}, + {"value", "Uint32", false, false}}; + + return columns; +} + +TVector<TShardedTableOptions::TColumn> GetMoviesColumns() { + TVector<TShardedTableOptions::TColumn> columns = { + {"id", "Uint32", true, false}, + {"title", "String", false, false}, + {"rating", "Uint32", false, false}}; + + return columns; +} + +std::tuple<TVector<ui64>, ui64> CreateTable(Tests::TServer::TPtr server, TActorId sender, const TString &root, const TString &name, bool withFollower = false, ui64 shardCount = 1) { - TVector<TShardedTableOptions::TColumn> columns = { - {"key1", "Uint32", true, false}, - {"key2", "Uint32", true, false}, - {"key3", "Uint32", true, false}, - {"value", "Uint32", false, false} - }; - auto opts = TShardedTableOptions() .Shards(shardCount) - .Columns(columns); + .Columns(GetColumns()); if (withFollower) opts.Followers(1); - CreateShardedTable(server, sender, root, name, opts); + return CreateShardedTable(server, sender, root, name, opts); } -void CreateMoviesTable(Tests::TServer::TPtr server, +std::tuple<TVector<ui64>, ui64> CreateMoviesTable(Tests::TServer::TPtr server, TActorId sender, const TString &root, const TString &name) { - TVector<TShardedTableOptions::TColumn> columns = { - {"id", "Uint32", true, false}, - {"title", "String", false, false}, - {"rating", "Uint32", false, false} - }; - auto opts = TShardedTableOptions() .Shards(1) - .Columns(columns); + .Columns(GetMoviesColumns()); - CreateShardedTable(server, sender, root, name, opts); + return CreateShardedTable(server, sender, root, name, opts); } struct TRowWriter : public NArrow::IRowWriter { @@ -308,11 +317,14 @@ void AddRangeQuery( struct TTableInfo { TString Name; + ui64 TableId; ui64 TabletId; ui64 OwnerId; NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable; TActorId ClientId; + + TVector<TShardedTableOptions::TColumn> Columns; }; struct TTestHelper { @@ -345,7 +357,7 @@ struct TTestHelper { { auto& table1 = Tables["table-1"]; table1.Name = "table-1"; - CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount); + auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount); ExecSQL(Server, Sender, R"( UPSERT INTO `/Root/table-1` (key1, key2, key3, value) @@ -360,7 +372,7 @@ struct TTestHelper { (11, 11, 11, 1111); )"); - auto shards = GetTableShards(Server, Sender, "/Root/table-1"); + table1.TableId = tableId; table1.TabletId = shards.at(0); auto [tables, ownerId] = GetTables(Server, table1.TabletId); @@ -368,12 +380,14 @@ struct TTestHelper { table1.UserTable = tables["table-1"]; table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig()); + + table1.Columns = GetColumns(); } { auto& table2 = Tables["movies"]; table2.Name = "movies"; - CreateMoviesTable(Server, Sender, "/Root", "movies"); + auto [shards, tableId] = CreateMoviesTable(Server, Sender, "/Root", "movies"); ExecSQL(Server, Sender, R"( UPSERT INTO `/Root/movies` (id, title, rating) @@ -383,7 +397,7 @@ struct TTestHelper { (3, "Hard die", 8); )"); - auto shards = GetTableShards(Server, Sender, "/Root/movies"); + table2.TableId = tableId; table2.TabletId = shards.at(0); auto [tables, ownerId] = GetTables(Server, table2.TabletId); @@ -391,14 +405,16 @@ struct TTestHelper { table2.UserTable = tables["movies"]; table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig()); + + table2.Columns = GetMoviesColumns(); } { auto& table3 = Tables["table-1-many"]; table3.Name = "table-1-many"; - CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount); + auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount); - auto shards = GetTableShards(Server, Sender, "/Root/table-1-many"); + table3.TableId = tableId; table3.TabletId = shards.at(0); auto [tables, ownerId] = GetTables(Server, table3.TabletId); @@ -406,6 +422,8 @@ struct TTestHelper { table3.UserTable = tables["table-1-many"]; table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig()); + + table3.Columns = GetColumns(); } } @@ -717,6 +735,30 @@ struct TTestHelper { UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit)); } + NKikimrDataEvents::TEvWriteResult WriteRow(const TString& tableName, ui64 txId, const TVector<ui32>& values, NKikimrDataEvents::TEvWrite::ETxMode txMode = NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) { + const auto& table = Tables[tableName]; + + auto opts = TShardedTableOptions().Columns(table.Columns); + size_t columnCount = table.Columns.size(); + + std::vector<ui32> columnIds(columnCount); + std::iota(columnIds.begin(), columnIds.end(), 1); + + Y_ABORT_UNLESS(values.size() == columnCount); + + TVector<TCell> cells; + for (ui32 col = 0; col < columnCount; ++col) + cells.emplace_back(TCell((const char*)&values[col], sizeof(ui32))); + + TSerializedCellMatrix matrix(cells, 1, columnCount); + + auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode); + ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer()); + evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC); + + return Write(*Server->GetRuntime(), Sender, table.TabletId, std::move(evWrite)); + } + struct THangedReturn { ui64 LastPlanStep = 0; TVector<THolder<IEventHandle>> ReadSets; diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp index 9e95182b4f..a209980ad8 100644 --- a/ydb/core/tx/datashard/datashard_ut_write.cpp +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { const ui32 rowCount = 3; ui64 txId = 100; - Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); @@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { const ui32 rowCount = 3; ui64 txId = 100; - Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); @@ -77,11 +77,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer()); evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC); - runtime.SendToPipe(shards[0], sender, evWrite.release(), 0, GetPipeConfigWithRetries()); - auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender); + const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST); - const auto& record = ev->Get()->Record; - UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST); UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1); UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes")); } @@ -94,7 +91,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { const ui32 rowCount = 3; ui64 txId = 100; - Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE); + Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE); auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 190a197a3e..06d56c4fff 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1843,13 +1843,13 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik return evWrite; } -NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus) +NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus) { - auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount); + auto txMode = request->Record.GetTxMode(); runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries()); auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender); - auto status = ev->Get()->Record.GetStatus(); + auto resultRecord = ev->Get()->Record; if (expectedStatus == NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED) { switch (txMode) { @@ -1866,9 +1866,15 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId break; } } - UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues()); + UNIT_ASSERT_C(resultRecord.GetStatus() == expectedStatus, "Status: " << resultRecord.GetStatus() << " Issues: " << resultRecord.GetIssues()); - return ev->Get()->Record; + return resultRecord; +} + +NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus) +{ + auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount); + return Write(runtime, sender, shardId, std::move(request), expectedStatus); } void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values) diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 613b7a86e4..2f659a87e1 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -710,7 +710,8 @@ void ExecSQL(Tests::TServer::TPtr server, bool dml = true, Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS); -NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED); +NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED); +NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED); void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values); |