diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-02-10 21:19:43 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-02-10 21:19:43 +0300 |
commit | 000ba3aa252e9317251b1f7555b9262ed6f6526f (patch) | |
tree | 5e852e4b15d22777487e96f3fceb9ef4523ec0cf | |
parent | fd8f1779399d48fb3dfe771c80459441c300dc32 (diff) | |
download | ydb-000ba3aa252e9317251b1f7555b9262ed6f6526f.tar.gz |
fix issue with static base request to mkql, read iterator and kqp select consume less memory and both use random keys
-rw-r--r-- | ydb/core/load_test/ut_ycsb.cpp | 61 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp | 31 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/kqp_select.cpp | 24 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_read_iterator.cpp | 50 |
4 files changed, 120 insertions, 46 deletions
diff --git a/ydb/core/load_test/ut_ycsb.cpp b/ydb/core/load_test/ut_ycsb.cpp index 228d73cd91a..3342f0bf4a7 100644 --- a/ydb/core/load_test/ut_ycsb.cpp +++ b/ydb/core/load_test/ut_ycsb.cpp @@ -703,10 +703,42 @@ Y_UNIT_TEST_SUITE(ReadLoad) { helper.CheckKeys(0, expectedRowCount); } + Y_UNIT_TEST(ShouldReadIterateMoreThanRows) { + // 10 rows, but ask to read 1000 + TTestHelper helper; + + const ui64 expectedRowCount = 10; + const ui64 expectedReadCount = 1000; + + std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableReadIteratorStart(); + + command.AddChunks(0); + command.AddChunks(1); + command.AddChunks(10); + + command.AddInflights(1); + command.SetRowCount(expectedRowCount); + command.SetReadCount(expectedReadCount); + + auto& setupTable = *record.MutableTableSetup(); + setupTable.SetWorkingDir("/Root"); + setupTable.SetTableName("usertable"); + + auto result = helper.RunTestLoad(std::move(request)); + + UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["subtests"].GetInteger(), 4); + UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["oks"].GetInteger(), (3 * expectedRowCount + expectedReadCount)); + + // sanity check that there was data in table + helper.CheckKeys(0, expectedRowCount); + } + Y_UNIT_TEST(ShouldReadKqp) { TTestHelper helper; - const ui64 expectedRowCount = 1000; + const ui64 expectedRowCount = 100; std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest()); auto& record = request->Record; @@ -727,6 +759,33 @@ Y_UNIT_TEST_SUITE(ReadLoad) { helper.CheckKeys(0, expectedRowCount); } + Y_UNIT_TEST(ShouldReadKqpMoreThanRows) { + // 10 rows, but ask to read 100 + TTestHelper helper; + + const ui64 expectedRowCount = 10; + const ui64 expectedReadCount = 100; + + std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableReadKqpStart(); + + command.AddInflights(10); + command.SetRowCount(expectedRowCount); + command.SetReadCount(expectedReadCount); + + auto& setupTable = *record.MutableTableSetup(); + setupTable.SetWorkingDir("/Root"); + setupTable.SetTableName("usertable"); + + auto result = helper.RunTestLoad(std::move(request)); + + UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["oks"].GetInteger(), (10 * expectedReadCount)); + + // sanity check that there was data in table + helper.CheckKeys(0, expectedRowCount); + } + } // Y_UNIT_TEST_SUITE(ReadLoad) } // namespace NKikimr diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp index ca95d2a1725..e9e617c4a46 100644 --- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp +++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp @@ -60,24 +60,22 @@ TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyStart, ui64 n) { } TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum, const TString& table) { - static TString programWithoutKey; + TString programWithoutKey; - if (!programWithoutKey) { - TString fields; - for (size_t i = 0; i < 10; ++i) { - fields += Sprintf("'('field%lu (String '%s))", i, Value.data()); - } - TString rowUpd = "(let upd_ '(" + fields + "))"; + TString fields; + for (size_t i = 0; i < 10; ++i) { + fields += Sprintf("'('field%lu (String '%s))", i, Value.data()); + } + TString rowUpd = "(let upd_ '(" + fields + "))"; - programWithoutKey = rowUpd; + programWithoutKey = rowUpd; - programWithoutKey += Sprintf(R"( - (let ret_ (AsList - (UpdateRow '__user__%s row1_ upd_ - ))) - (return ret_) - ))", table.c_str()); - } + programWithoutKey += Sprintf(R"( + (let ret_ (AsList + (UpdateRow '__user__%s row1_ upd_ + ))) + (return ret_) + ))", table.c_str()); TString key = GetKey(keyNum); @@ -154,7 +152,8 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id - << " TUpsertActor Bootstrap called: " << ConfingString << " with type# " << int(RequestType)); + << " TUpsertActor Bootstrap called: " << ConfingString << " with type# " << int(RequestType) + << ", target# " << Target.DebugString()); Become(&TUpsertActor::StateFunc); Connect(ctx); diff --git a/ydb/core/load_test/ycsb/kqp_select.cpp b/ydb/core/load_test/ycsb/kqp_select.cpp index ac4613c2d92..f9ad6128a77 100644 --- a/ydb/core/load_test/ycsb/kqp_select.cpp +++ b/ydb/core/load_test/ycsb/kqp_select.cpp @@ -17,6 +17,8 @@ #include <google/protobuf/text_format.h> +#include <random> + // * Scheme is hardcoded and it is like default YCSB setup: // 1 Text "id" column, 10 Bytes "field0" - "field9" columns // * row is ~ 1 KB, keys are like user1000385178204227360 @@ -88,8 +90,6 @@ class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> { const TString Database; const TString TableName; const TVector<TString>& Keys; - const size_t FromKey; - size_t CurrentKey = 0; const size_t ReadCount; const bool Infinite; @@ -98,6 +98,8 @@ class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> { size_t KeysRead = 0; + std::default_random_engine Rng; + TInstant StartTs; TInstant EndTs; @@ -109,7 +111,6 @@ public: TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TSubLoadId& id, const TVector<TString>& keys, - size_t fromKey, size_t readCount, bool infinite) : Target(target) @@ -118,8 +119,6 @@ public: , Database(Target.GetWorkingDir()) , TableName(Target.GetTableName()) , Keys(keys) - , FromKey(fromKey) - , CurrentKey(fromKey) , ReadCount(readCount) , Infinite(infinite) { @@ -130,6 +129,7 @@ public: LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id << " Bootstrap called"); + Rng.seed(SelfId().Hash()); KqpProxyId = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); Become(&TKqpSelectActor::StateFunc); @@ -159,26 +159,25 @@ private: } void ReadRow(const TActorContext &ctx) { - auto request = GenerateSelectRequest(Database, TableName, Keys[CurrentKey]); + auto index = Rng() % Keys.size(); + const auto& key = Keys[index]; + + auto request = GenerateSelectRequest(Database, TableName, key); request->Record.MutableRequest()->SetSessionId(Session); LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id - << " send request# " << CurrentKey + << " send request# " << KeysRead << " to proxy# " << KqpProxyId << ": " << request->ToString()); ctx.Send(KqpProxyId, request.release()); - ++CurrentKey; ++KeysRead; } void OnRequestDone(const TActorContext& ctx) { if (Infinite && KeysRead == ReadCount) { KeysRead = 0; - CurrentKey = FromKey; } - CurrentKey = CurrentKey % Keys.size(); - if (KeysRead < ReadCount) { ReadRow(ctx); } else { @@ -242,7 +241,7 @@ private: TStringStream str; HTML(str) { str << "TKqpSelectActor# " << Id << " started on " << StartTs - << " sent " << CurrentKey << " out of " << ReadCount; + << " sent " << KeysRead << " out of " << ReadCount; TInstant ts = EndTs ? EndTs : TInstant::Now(); auto delta = ts - StartTs; auto throughput = ReadCount * 1000 / (delta.MilliSeconds() ? delta.MilliSeconds() : 1); @@ -437,7 +436,6 @@ private: Counters, subId, Keys, - 0, // keyFrom ReadCount, Config.GetInfinite()); Actors.emplace_back(ctx.Register(kqpActor)); diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp index 4d467bbe8d1..a3803dcbd5c 100644 --- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp +++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp @@ -15,7 +15,6 @@ #include <google/protobuf/text_format.h> -#include <algorithm> #include <random> // * Scheme is hardcoded and it is like default YCSB setup: @@ -37,16 +36,20 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { const TActorId Parent; const TSubLoadId Id; + const TVector<TOwnedCellVec>& Points; + const ui64 ReadCount = 0; + const bool Infinite; + + ui64 PointsRead = 0; + + std::default_random_engine Rng; + TActorId Pipe; bool WasConnected = false; ui64 ReconnectLimit = RECONNECT_LIMIT; TInstant StartTs; // actor started to send requests - TVector<TOwnedCellVec> Points; - ui64 ReadCount = 0; - const bool Infinite; - size_t CurrentPoint = 0; THPTimer RequestTimer; TVector<TDuration> RequestTimes; @@ -77,11 +80,7 @@ public: Become(&TReadIteratorPoints::StateFunc); - auto rng = std::default_random_engine {}; - rng.seed(SelfId().Hash()); - - std::shuffle(Points.begin(), Points.end(), rng); - Points.resize(ReadCount); + Rng.seed(SelfId().Hash()); Connect(ctx); } @@ -139,20 +138,36 @@ private: } void SendRead(const TActorContext &ctx) { - Y_VERIFY(CurrentPoint < Points.size()); + auto index = Rng() % Points.size(); + + const auto& currentKeyCells = Points[index]; + + if (currentKeyCells.size() != 1) { + TStringStream ss; + ss << "Wrong keyNum: " << PointsRead << " with cells count: " << currentKeyCells.size(); + return StopWithError(ctx, ss.Str()); + } auto request = std::make_unique<TEvDataShard::TEvRead>(); request->Record = BaseRequest->Record; - AddKeyQuery(*request, Points[CurrentPoint++]); + AddKeyQuery(*request, currentKeyCells); + + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " sends request# " << PointsRead << ": " << request->ToString()); RequestTimer.Reset(); NTabletPipe::SendData(ctx, Pipe, request.release()); + + ++PointsRead; } void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) { const auto* msg = ev->Get(); const auto& record = msg->Record; + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " received from " << ev->Sender << ": " << msg->ToString()); + if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { TStringStream ss; ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode(); @@ -170,16 +185,19 @@ private: } if (msg->GetRowsCount() != 1) { - return StopWithError(ctx, "Empty reply with data"); + TStringStream ss; + ss << "Wrong reply with data, rows: " << msg->GetRowsCount(); + + return StopWithError(ctx, ss.Str()); } RequestTimes.push_back(TDuration::Seconds(RequestTimer.Passed())); - if (Infinite && CurrentPoint >= Points.size()) { - CurrentPoint = 0; + if (Infinite && PointsRead >= ReadCount) { + PointsRead = 0; } - if (CurrentPoint < Points.size()) { + if (PointsRead < ReadCount) { SendRead(ctx); return; } |