aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-02-10 21:19:43 +0300
committereivanov89 <eivanov89@ydb.tech>2023-02-10 21:19:43 +0300
commit000ba3aa252e9317251b1f7555b9262ed6f6526f (patch)
tree5e852e4b15d22777487e96f3fceb9ef4523ec0cf
parentfd8f1779399d48fb3dfe771c80459441c300dc32 (diff)
downloadydb-000ba3aa252e9317251b1f7555b9262ed6f6526f.tar.gz
fix issue with static base request to mkql, read iterator and kqp select consume less memory and both use random keys
-rw-r--r--ydb/core/load_test/ut_ycsb.cpp61
-rw-r--r--ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp31
-rw-r--r--ydb/core/load_test/ycsb/kqp_select.cpp24
-rw-r--r--ydb/core/load_test/ycsb/test_load_read_iterator.cpp50
4 files changed, 120 insertions, 46 deletions
diff --git a/ydb/core/load_test/ut_ycsb.cpp b/ydb/core/load_test/ut_ycsb.cpp
index 228d73cd91a..3342f0bf4a7 100644
--- a/ydb/core/load_test/ut_ycsb.cpp
+++ b/ydb/core/load_test/ut_ycsb.cpp
@@ -703,10 +703,42 @@ Y_UNIT_TEST_SUITE(ReadLoad) {
helper.CheckKeys(0, expectedRowCount);
}
+ Y_UNIT_TEST(ShouldReadIterateMoreThanRows) {
+ // 10 rows, but ask to read 1000
+ TTestHelper helper;
+
+ const ui64 expectedRowCount = 10;
+ const ui64 expectedReadCount = 1000;
+
+ std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableReadIteratorStart();
+
+ command.AddChunks(0);
+ command.AddChunks(1);
+ command.AddChunks(10);
+
+ command.AddInflights(1);
+ command.SetRowCount(expectedRowCount);
+ command.SetReadCount(expectedReadCount);
+
+ auto& setupTable = *record.MutableTableSetup();
+ setupTable.SetWorkingDir("/Root");
+ setupTable.SetTableName("usertable");
+
+ auto result = helper.RunTestLoad(std::move(request));
+
+ UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["subtests"].GetInteger(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["oks"].GetInteger(), (3 * expectedRowCount + expectedReadCount));
+
+ // sanity check that there was data in table
+ helper.CheckKeys(0, expectedRowCount);
+ }
+
Y_UNIT_TEST(ShouldReadKqp) {
TTestHelper helper;
- const ui64 expectedRowCount = 1000;
+ const ui64 expectedRowCount = 100;
std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest());
auto& record = request->Record;
@@ -727,6 +759,33 @@ Y_UNIT_TEST_SUITE(ReadLoad) {
helper.CheckKeys(0, expectedRowCount);
}
+ Y_UNIT_TEST(ShouldReadKqpMoreThanRows) {
+ // 10 rows, but ask to read 100
+ TTestHelper helper;
+
+ const ui64 expectedRowCount = 10;
+ const ui64 expectedReadCount = 100;
+
+ std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableReadKqpStart();
+
+ command.AddInflights(10);
+ command.SetRowCount(expectedRowCount);
+ command.SetReadCount(expectedReadCount);
+
+ auto& setupTable = *record.MutableTableSetup();
+ setupTable.SetWorkingDir("/Root");
+ setupTable.SetTableName("usertable");
+
+ auto result = helper.RunTestLoad(std::move(request));
+
+ UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["oks"].GetInteger(), (10 * expectedReadCount));
+
+ // sanity check that there was data in table
+ helper.CheckKeys(0, expectedRowCount);
+ }
+
} // Y_UNIT_TEST_SUITE(ReadLoad)
} // namespace NKikimr
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
index ca95d2a1725..e9e617c4a46 100644
--- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
@@ -60,24 +60,22 @@ TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyStart, ui64 n) {
}
TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum, const TString& table) {
- static TString programWithoutKey;
+ TString programWithoutKey;
- if (!programWithoutKey) {
- TString fields;
- for (size_t i = 0; i < 10; ++i) {
- fields += Sprintf("'('field%lu (String '%s))", i, Value.data());
- }
- TString rowUpd = "(let upd_ '(" + fields + "))";
+ TString fields;
+ for (size_t i = 0; i < 10; ++i) {
+ fields += Sprintf("'('field%lu (String '%s))", i, Value.data());
+ }
+ TString rowUpd = "(let upd_ '(" + fields + "))";
- programWithoutKey = rowUpd;
+ programWithoutKey = rowUpd;
- programWithoutKey += Sprintf(R"(
- (let ret_ (AsList
- (UpdateRow '__user__%s row1_ upd_
- )))
- (return ret_)
- ))", table.c_str());
- }
+ programWithoutKey += Sprintf(R"(
+ (let ret_ (AsList
+ (UpdateRow '__user__%s row1_ upd_
+ )))
+ (return ret_)
+ ))", table.c_str());
TString key = GetKey(keyNum);
@@ -154,7 +152,8 @@ public:
void Bootstrap(const TActorContext& ctx) {
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
- << " TUpsertActor Bootstrap called: " << ConfingString << " with type# " << int(RequestType));
+ << " TUpsertActor Bootstrap called: " << ConfingString << " with type# " << int(RequestType)
+ << ", target# " << Target.DebugString());
Become(&TUpsertActor::StateFunc);
Connect(ctx);
diff --git a/ydb/core/load_test/ycsb/kqp_select.cpp b/ydb/core/load_test/ycsb/kqp_select.cpp
index ac4613c2d92..f9ad6128a77 100644
--- a/ydb/core/load_test/ycsb/kqp_select.cpp
+++ b/ydb/core/load_test/ycsb/kqp_select.cpp
@@ -17,6 +17,8 @@
#include <google/protobuf/text_format.h>
+#include <random>
+
// * Scheme is hardcoded and it is like default YCSB setup:
// 1 Text "id" column, 10 Bytes "field0" - "field9" columns
// * row is ~ 1 KB, keys are like user1000385178204227360
@@ -88,8 +90,6 @@ class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> {
const TString Database;
const TString TableName;
const TVector<TString>& Keys;
- const size_t FromKey;
- size_t CurrentKey = 0;
const size_t ReadCount;
const bool Infinite;
@@ -98,6 +98,8 @@ class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> {
size_t KeysRead = 0;
+ std::default_random_engine Rng;
+
TInstant StartTs;
TInstant EndTs;
@@ -109,7 +111,6 @@ public:
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const TSubLoadId& id,
const TVector<TString>& keys,
- size_t fromKey,
size_t readCount,
bool infinite)
: Target(target)
@@ -118,8 +119,6 @@ public:
, Database(Target.GetWorkingDir())
, TableName(Target.GetTableName())
, Keys(keys)
- , FromKey(fromKey)
- , CurrentKey(fromKey)
, ReadCount(readCount)
, Infinite(infinite)
{
@@ -130,6 +129,7 @@ public:
LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
<< " Bootstrap called");
+ Rng.seed(SelfId().Hash());
KqpProxyId = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
Become(&TKqpSelectActor::StateFunc);
@@ -159,26 +159,25 @@ private:
}
void ReadRow(const TActorContext &ctx) {
- auto request = GenerateSelectRequest(Database, TableName, Keys[CurrentKey]);
+ auto index = Rng() % Keys.size();
+ const auto& key = Keys[index];
+
+ auto request = GenerateSelectRequest(Database, TableName, key);
request->Record.MutableRequest()->SetSessionId(Session);
LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
- << " send request# " << CurrentKey
+ << " send request# " << KeysRead
<< " to proxy# " << KqpProxyId << ": " << request->ToString());
ctx.Send(KqpProxyId, request.release());
- ++CurrentKey;
++KeysRead;
}
void OnRequestDone(const TActorContext& ctx) {
if (Infinite && KeysRead == ReadCount) {
KeysRead = 0;
- CurrentKey = FromKey;
}
- CurrentKey = CurrentKey % Keys.size();
-
if (KeysRead < ReadCount) {
ReadRow(ctx);
} else {
@@ -242,7 +241,7 @@ private:
TStringStream str;
HTML(str) {
str << "TKqpSelectActor# " << Id << " started on " << StartTs
- << " sent " << CurrentKey << " out of " << ReadCount;
+ << " sent " << KeysRead << " out of " << ReadCount;
TInstant ts = EndTs ? EndTs : TInstant::Now();
auto delta = ts - StartTs;
auto throughput = ReadCount * 1000 / (delta.MilliSeconds() ? delta.MilliSeconds() : 1);
@@ -437,7 +436,6 @@ private:
Counters,
subId,
Keys,
- 0, // keyFrom
ReadCount,
Config.GetInfinite());
Actors.emplace_back(ctx.Register(kqpActor));
diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
index 4d467bbe8d1..a3803dcbd5c 100644
--- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
+++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
@@ -15,7 +15,6 @@
#include <google/protobuf/text_format.h>
-#include <algorithm>
#include <random>
// * Scheme is hardcoded and it is like default YCSB setup:
@@ -37,16 +36,20 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
const TActorId Parent;
const TSubLoadId Id;
+ const TVector<TOwnedCellVec>& Points;
+ const ui64 ReadCount = 0;
+ const bool Infinite;
+
+ ui64 PointsRead = 0;
+
+ std::default_random_engine Rng;
+
TActorId Pipe;
bool WasConnected = false;
ui64 ReconnectLimit = RECONNECT_LIMIT;
TInstant StartTs; // actor started to send requests
- TVector<TOwnedCellVec> Points;
- ui64 ReadCount = 0;
- const bool Infinite;
- size_t CurrentPoint = 0;
THPTimer RequestTimer;
TVector<TDuration> RequestTimes;
@@ -77,11 +80,7 @@ public:
Become(&TReadIteratorPoints::StateFunc);
- auto rng = std::default_random_engine {};
- rng.seed(SelfId().Hash());
-
- std::shuffle(Points.begin(), Points.end(), rng);
- Points.resize(ReadCount);
+ Rng.seed(SelfId().Hash());
Connect(ctx);
}
@@ -139,20 +138,36 @@ private:
}
void SendRead(const TActorContext &ctx) {
- Y_VERIFY(CurrentPoint < Points.size());
+ auto index = Rng() % Points.size();
+
+ const auto& currentKeyCells = Points[index];
+
+ if (currentKeyCells.size() != 1) {
+ TStringStream ss;
+ ss << "Wrong keyNum: " << PointsRead << " with cells count: " << currentKeyCells.size();
+ return StopWithError(ctx, ss.Str());
+ }
auto request = std::make_unique<TEvDataShard::TEvRead>();
request->Record = BaseRequest->Record;
- AddKeyQuery(*request, Points[CurrentPoint++]);
+ AddKeyQuery(*request, currentKeyCells);
+
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " sends request# " << PointsRead << ": " << request->ToString());
RequestTimer.Reset();
NTabletPipe::SendData(ctx, Pipe, request.release());
+
+ ++PointsRead;
}
void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
const auto& record = msg->Record;
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " received from " << ev->Sender << ": " << msg->ToString());
+
if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
TStringStream ss;
ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode();
@@ -170,16 +185,19 @@ private:
}
if (msg->GetRowsCount() != 1) {
- return StopWithError(ctx, "Empty reply with data");
+ TStringStream ss;
+ ss << "Wrong reply with data, rows: " << msg->GetRowsCount();
+
+ return StopWithError(ctx, ss.Str());
}
RequestTimes.push_back(TDuration::Seconds(RequestTimer.Passed()));
- if (Infinite && CurrentPoint >= Points.size()) {
- CurrentPoint = 0;
+ if (Infinite && PointsRead >= ReadCount) {
+ PointsRead = 0;
}
- if (CurrentPoint < Points.size()) {
+ if (PointsRead < ReadCount) {
SendRead(ctx);
return;
}