aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2023-12-27 17:38:50 +0300
committerGitHub <noreply@github.com>2023-12-27 15:38:50 +0100
commit75470f709400ae7c2fac349fbc8d1aa7efc92dfa (patch)
treef98e79a39adfa7ce701bc827c32946ae2c3f9525
parentf59afadcbb8cec4f6a50cabf0ee4cf8507dcf3c0 (diff)
downloadydb-75470f709400ae7c2fac349fbc8d1aa7efc92dfa.tar.gz
ut_read_iterator + EvWrite (#742)
ut_read_iterator + EvWrite
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp92
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp11
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp16
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.h3
4 files changed, 84 insertions, 38 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 635f5d422a..a34be8bfc2 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -10,6 +10,9 @@
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/tx_proxy/read_table.h>
+#include <ydb/core/tx/data_events/events.h>
+#include <ydb/core/tx/data_events/payload_helper.h>
+
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
#include <algorithm>
@@ -25,46 +28,52 @@ namespace {
using TCellVec = std::vector<TCell>;
-void CreateTable(Tests::TServer::TPtr server,
+TVector<TShardedTableOptions::TColumn> GetColumns() {
+ TVector<TShardedTableOptions::TColumn> columns = {
+ {"key1", "Uint32", true, false},
+ {"key2", "Uint32", true, false},
+ {"key3", "Uint32", true, false},
+ {"value", "Uint32", false, false}};
+
+ return columns;
+}
+
+TVector<TShardedTableOptions::TColumn> GetMoviesColumns() {
+ TVector<TShardedTableOptions::TColumn> columns = {
+ {"id", "Uint32", true, false},
+ {"title", "String", false, false},
+ {"rating", "Uint32", false, false}};
+
+ return columns;
+}
+
+std::tuple<TVector<ui64>, ui64> CreateTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
bool withFollower = false,
ui64 shardCount = 1)
{
- TVector<TShardedTableOptions::TColumn> columns = {
- {"key1", "Uint32", true, false},
- {"key2", "Uint32", true, false},
- {"key3", "Uint32", true, false},
- {"value", "Uint32", false, false}
- };
-
auto opts = TShardedTableOptions()
.Shards(shardCount)
- .Columns(columns);
+ .Columns(GetColumns());
if (withFollower)
opts.Followers(1);
- CreateShardedTable(server, sender, root, name, opts);
+ return CreateShardedTable(server, sender, root, name, opts);
}
-void CreateMoviesTable(Tests::TServer::TPtr server,
+std::tuple<TVector<ui64>, ui64> CreateMoviesTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name)
{
- TVector<TShardedTableOptions::TColumn> columns = {
- {"id", "Uint32", true, false},
- {"title", "String", false, false},
- {"rating", "Uint32", false, false}
- };
-
auto opts = TShardedTableOptions()
.Shards(1)
- .Columns(columns);
+ .Columns(GetMoviesColumns());
- CreateShardedTable(server, sender, root, name, opts);
+ return CreateShardedTable(server, sender, root, name, opts);
}
struct TRowWriter : public NArrow::IRowWriter {
@@ -308,11 +317,14 @@ void AddRangeQuery(
struct TTableInfo {
TString Name;
+ ui64 TableId;
ui64 TabletId;
ui64 OwnerId;
NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable;
TActorId ClientId;
+
+ TVector<TShardedTableOptions::TColumn> Columns;
};
struct TTestHelper {
@@ -345,7 +357,7 @@ struct TTestHelper {
{
auto& table1 = Tables["table-1"];
table1.Name = "table-1";
- CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
+ auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
ExecSQL(Server, Sender, R"(
UPSERT INTO `/Root/table-1`
(key1, key2, key3, value)
@@ -360,7 +372,7 @@ struct TTestHelper {
(11, 11, 11, 1111);
)");
- auto shards = GetTableShards(Server, Sender, "/Root/table-1");
+ table1.TableId = tableId;
table1.TabletId = shards.at(0);
auto [tables, ownerId] = GetTables(Server, table1.TabletId);
@@ -368,12 +380,14 @@ struct TTestHelper {
table1.UserTable = tables["table-1"];
table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig());
+
+ table1.Columns = GetColumns();
}
{
auto& table2 = Tables["movies"];
table2.Name = "movies";
- CreateMoviesTable(Server, Sender, "/Root", "movies");
+ auto [shards, tableId] = CreateMoviesTable(Server, Sender, "/Root", "movies");
ExecSQL(Server, Sender, R"(
UPSERT INTO `/Root/movies`
(id, title, rating)
@@ -383,7 +397,7 @@ struct TTestHelper {
(3, "Hard die", 8);
)");
- auto shards = GetTableShards(Server, Sender, "/Root/movies");
+ table2.TableId = tableId;
table2.TabletId = shards.at(0);
auto [tables, ownerId] = GetTables(Server, table2.TabletId);
@@ -391,14 +405,16 @@ struct TTestHelper {
table2.UserTable = tables["movies"];
table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig());
+
+ table2.Columns = GetMoviesColumns();
}
{
auto& table3 = Tables["table-1-many"];
table3.Name = "table-1-many";
- CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
+ auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
- auto shards = GetTableShards(Server, Sender, "/Root/table-1-many");
+ table3.TableId = tableId;
table3.TabletId = shards.at(0);
auto [tables, ownerId] = GetTables(Server, table3.TabletId);
@@ -406,6 +422,8 @@ struct TTestHelper {
table3.UserTable = tables["table-1-many"];
table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig());
+
+ table3.Columns = GetColumns();
}
}
@@ -717,6 +735,30 @@ struct TTestHelper {
UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit));
}
+ NKikimrDataEvents::TEvWriteResult WriteRow(const TString& tableName, ui64 txId, const TVector<ui32>& values, NKikimrDataEvents::TEvWrite::ETxMode txMode = NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
+ const auto& table = Tables[tableName];
+
+ auto opts = TShardedTableOptions().Columns(table.Columns);
+ size_t columnCount = table.Columns.size();
+
+ std::vector<ui32> columnIds(columnCount);
+ std::iota(columnIds.begin(), columnIds.end(), 1);
+
+ Y_ABORT_UNLESS(values.size() == columnCount);
+
+ TVector<TCell> cells;
+ for (ui32 col = 0; col < columnCount; ++col)
+ cells.emplace_back(TCell((const char*)&values[col], sizeof(ui32)));
+
+ TSerializedCellMatrix matrix(cells, 1, columnCount);
+
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
+ ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
+ evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
+
+ return Write(*Server->GetRuntime(), Sender, table.TabletId, std::move(evWrite));
+ }
+
struct THangedReturn {
ui64 LastPlanStep = 0;
TVector<THolder<IEventHandle>> ReadSets;
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
index 9e95182b4f..a209980ad8 100644
--- a/ydb/core/tx/datashard/datashard_ut_write.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
const ui32 rowCount = 3;
ui64 txId = 100;
- Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+ Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
const ui32 rowCount = 3;
ui64 txId = 100;
- Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+ Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
@@ -77,11 +77,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
- runtime.SendToPipe(shards[0], sender, evWrite.release(), 0, GetPipeConfigWithRetries());
- auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
+ const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
- const auto& record = ev->Get()->Record;
- UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1);
UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes"));
}
@@ -94,7 +91,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
const ui32 rowCount = 3;
ui64 txId = 100;
- Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
+ Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
index 190a197a3e..06d56c4fff 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
@@ -1843,13 +1843,13 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik
return evWrite;
}
-NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
{
- auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
+ auto txMode = request->Record.GetTxMode();
runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries());
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
- auto status = ev->Get()->Record.GetStatus();
+ auto resultRecord = ev->Get()->Record;
if (expectedStatus == NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED) {
switch (txMode) {
@@ -1866,9 +1866,15 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId
break;
}
}
- UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues());
+ UNIT_ASSERT_C(resultRecord.GetStatus() == expectedStatus, "Status: " << resultRecord.GetStatus() << " Issues: " << resultRecord.GetIssues());
- return ev->Get()->Record;
+ return resultRecord;
+}
+
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
+{
+ auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
+ return Write(runtime, sender, shardId, std::move(request), expectedStatus);
}
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
index 613b7a86e4..2f659a87e1 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
@@ -710,7 +710,8 @@ void ExecSQL(Tests::TServer::TPtr server,
bool dml = true,
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
-NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);