aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-02-08 12:04:46 +0300
committereivanov89 <eivanov89@ydb.tech>2023-02-08 12:04:46 +0300
commita7f00ce360da75ca0fd9974c4bcd8c5ab91194e6 (patch)
tree64af346fa1884079058969249131b84ef7732afe
parent556d24d5a68509194d4901875a44e745b0a5de38 (diff)
downloadydb-a7f00ce360da75ca0fd9974c4bcd8c5ab91194e6.tar.gz
add ycsb kqp select load actor
-rw-r--r--ydb/core/load_test/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/load_test/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/load_test/CMakeLists.linux.txt2
-rw-r--r--ydb/core/load_test/ut_ycsb.cpp30
-rw-r--r--ydb/core/load_test/ycsb/actors.h7
-rw-r--r--ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp6
-rw-r--r--ydb/core/load_test/ycsb/common.cpp235
-rw-r--r--ydb/core/load_test/ycsb/common.h58
-rw-r--r--ydb/core/load_test/ycsb/defs.h4
-rw-r--r--ydb/core/load_test/ycsb/kqp_select.cpp534
-rw-r--r--ydb/core/load_test/ycsb/kqp_upsert.cpp6
-rw-r--r--ydb/core/load_test/ycsb/test_load_actor.cpp19
-rw-r--r--ydb/core/load_test/ycsb/test_load_read_iterator.cpp234
-rw-r--r--ydb/core/protos/datashard_load.proto4
14 files changed, 893 insertions, 250 deletions
diff --git a/ydb/core/load_test/CMakeLists.darwin.txt b/ydb/core/load_test/CMakeLists.darwin.txt
index 7022fd47dbb..53c7051ec60 100644
--- a/ydb/core/load_test/CMakeLists.darwin.txt
+++ b/ydb/core/load_test/CMakeLists.darwin.txt
@@ -39,7 +39,9 @@ target_sources(ydb-core-load_test PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/load_test/group_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/vdisk_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/info_collector.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_select.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_upsert.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
diff --git a/ydb/core/load_test/CMakeLists.linux-aarch64.txt b/ydb/core/load_test/CMakeLists.linux-aarch64.txt
index a7ece616e2b..edf344a5c59 100644
--- a/ydb/core/load_test/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/load_test/CMakeLists.linux-aarch64.txt
@@ -40,7 +40,9 @@ target_sources(ydb-core-load_test PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/load_test/group_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/vdisk_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/info_collector.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_select.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_upsert.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
diff --git a/ydb/core/load_test/CMakeLists.linux.txt b/ydb/core/load_test/CMakeLists.linux.txt
index a7ece616e2b..edf344a5c59 100644
--- a/ydb/core/load_test/CMakeLists.linux.txt
+++ b/ydb/core/load_test/CMakeLists.linux.txt
@@ -40,7 +40,9 @@ target_sources(ydb-core-load_test PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/load_test/group_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/vdisk_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/info_collector.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_select.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_upsert.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
diff --git a/ydb/core/load_test/ut_ycsb.cpp b/ydb/core/load_test/ut_ycsb.cpp
index b7c0d557e5e..228d73cd91a 100644
--- a/ydb/core/load_test/ut_ycsb.cpp
+++ b/ydb/core/load_test/ut_ycsb.cpp
@@ -1,5 +1,6 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD), Q_
#include <ydb/core/load_test/events.h>
+#include <ydb/core/load_test/ycsb/common.h>
#include <ydb/core/load_test/ycsb/test_load_actor.h>
#include <ydb/core/scheme/scheme_types_defs.h>
#include <ydb/core/scheme/scheme_types_proto.h>
@@ -22,11 +23,6 @@ const TString DefaultTableName = "usertable";
const TString FieldPrefix = "field";
const size_t ValueColumnsCount = 10;
-TString GetKey(size_t n) {
- // user1000385178204227360
- return Sprintf("user%.19lu", n);
-}
-
void InitRoot(Tests::TServer::TPtr server,
TActorId sender) {
server->SetupRootStoragePools(sender);
@@ -707,6 +703,30 @@ Y_UNIT_TEST_SUITE(ReadLoad) {
helper.CheckKeys(0, expectedRowCount);
}
+ Y_UNIT_TEST(ShouldReadKqp) {
+ TTestHelper helper;
+
+ const ui64 expectedRowCount = 1000;
+
+ std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableReadKqpStart();
+
+ command.AddInflights(10);
+ command.SetRowCount(expectedRowCount);
+
+ auto& setupTable = *record.MutableTableSetup();
+ setupTable.SetWorkingDir("/Root");
+ setupTable.SetTableName("usertable");
+
+ auto result = helper.RunTestLoad(std::move(request));
+
+ UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["oks"].GetInteger(), (10 * expectedRowCount));
+
+ // sanity check that there was data in table
+ helper.CheckKeys(0, expectedRowCount);
+ }
+
} // Y_UNIT_TEST_SUITE(ReadLoad)
} // namespace NKikimr
diff --git a/ydb/core/load_test/ycsb/actors.h b/ydb/core/load_test/ycsb/actors.h
index 759bc60f05a..36a160f1bba 100644
--- a/ydb/core/load_test/ycsb/actors.h
+++ b/ydb/core/load_test/ycsb/actors.h
@@ -42,6 +42,13 @@ IActor *CreateReadIteratorActor(
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const TSubLoadId& id);
+IActor *CreateKqpSelectActor(
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd,
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
+ const TActorId& parent,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TSubLoadId& id);
+
class TLoadManagerException : public yexception {
};
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
index c10da88adea..574553c5148 100644
--- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
@@ -1,4 +1,5 @@
#include "actors.h"
+#include "common.h"
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_pipe.h>
@@ -16,11 +17,6 @@
namespace NKikimr::NDataShardLoad {
-TString GetKey(size_t n) {
- // user1000385178204227360
- return Sprintf("user%.19lu", n);
-}
-
using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>;
namespace {
diff --git a/ydb/core/load_test/ycsb/common.cpp b/ydb/core/load_test/ycsb/common.cpp
new file mode 100644
index 00000000000..8f408a95581
--- /dev/null
+++ b/ydb/core/load_test/ycsb/common.cpp
@@ -0,0 +1,235 @@
+#include "actors.h"
+#include "common.h"
+
+#include <ydb/core/base/tablet.h>
+#include <ydb/core/base/tablet_pipe.h>
+
+#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <google/protobuf/text_format.h>
+
+namespace NKikimr::NDataShardLoad {
+
+namespace {
+
+// TReadIteratorScan
+
+class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> {
+ std::unique_ptr<TEvDataShard::TEvRead> Request;
+ const NKikimrTxDataShard::EScanDataFormat Format;
+ const ui64 TabletId;
+ const TActorId Parent;
+ const TSubLoadId Id;
+ const ui64 SampleKeyCount;
+
+ TActorId Pipe;
+ bool WasConnected = false;
+ ui64 ReconnectLimit = 10;
+
+ TInstant StartTs;
+ size_t Oks = 0;
+
+ TVector<TOwnedCellVec> SampledKeys;
+
+public:
+ TReadIteratorScan(TEvDataShard::TEvRead* request,
+ ui64 tablet,
+ const TActorId& parent,
+ const TSubLoadId& id,
+ ui64 sample)
+ : Request(request)
+ , Format(Request->Record.GetResultFormat())
+ , TabletId(tablet)
+ , Parent(parent)
+ , Id(id)
+ , SampleKeyCount(sample)
+ {
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Bootstrap called, sample# " << SampleKeyCount);
+
+ Become(&TReadIteratorScan::StateFunc);
+ Connect(ctx);
+ }
+
+private:
+ void Connect(const TActorContext &ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Connect to# " << TabletId << " called");
+ --ReconnectLimit;
+ if (ReconnectLimit == 0) {
+ TStringStream ss;
+ ss << "Failed to set pipe to " << TabletId;
+ return StopWithError(ctx, ss.Str());
+ }
+ Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId));
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
+ TEvTabletPipe::TEvClientConnected *msg = ev->Get();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Handle TEvClientConnected called, Status# " << msg->Status);
+
+ if (msg->Status != NKikimrProto::OK) {
+ return Connect(ctx);
+ }
+
+ StartTs = TInstant::Now();
+ WasConnected = true;
+ NTabletPipe::SendData(ctx, Pipe, Request.release());
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Handle TEvClientDestroyed called");
+
+ // sanity check
+ if (!WasConnected) {
+ return Connect(ctx);
+ }
+
+ return StopWithError(ctx, "broken pipe");
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) {
+ return StopWithError(ctx, "delivery failed");
+ }
+
+ void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) {
+ const auto* msg = ev->Get();
+ const auto& record = msg->Record;
+
+ if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
+ TStringStream ss;
+ ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode();
+ if (record.GetStatus().IssuesSize()) {
+ for (const auto& issue: record.GetStatus().GetIssues()) {
+ ss << ", issue: " << issue;
+ }
+ }
+
+ return StopWithError(ctx, ss.Str());
+ }
+
+ if (Format != NKikimrTxDataShard::CELLVEC) {
+ return StopWithError(ctx, "Unsupported format");
+ }
+
+ Oks += msg->GetRowsCount();
+
+ auto ts = TInstant::Now();
+ auto delta = ts - StartTs;
+
+ if (SampleKeyCount) {
+ for (size_t i = 0; i < msg->GetRowsCount() && SampledKeys.size() < SampleKeyCount; ++i) {
+ SampledKeys.emplace_back(msg->GetCells(i));
+ }
+
+ if (record.GetFinished() || SampledKeys.size() >= SampleKeyCount) {
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " finished in " << delta
+ << ", sampled# " << SampledKeys.size()
+ << ", iter finished# " << record.GetFinished()
+ << ", oks# " << Oks);
+
+ ctx.Send(Parent, new TEvPrivate::TEvKeys(std::move(SampledKeys)));
+ return Die(ctx);
+ }
+
+ return;
+ } else if (record.GetFinished()) {
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " finished in " << delta
+ << ", read# " << Oks);
+
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0);
+ auto& report = *response->Record.MutableReport();
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Oks);
+ report.SetOperationsError(0);
+ ctx.Send(Parent, response.release());
+
+ return Die(ctx);
+ }
+ }
+
+ void StopWithError(const TActorContext& ctx, const TString& reason) {
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << ", stopped with error: " << reason);
+
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason));
+ NTabletPipe::CloseClient(SelfId(), Pipe);
+ return Die(ctx);
+ }
+
+ void HandlePoison(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " tablet recieved PoisonPill, going to die");
+
+ // TODO: cancel iterator
+ return Die(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoison)
+ HFunc(TEvents::TEvUndelivered, Handle)
+ HFunc(TEvTabletPipe::TEvClientConnected, Handle)
+ HFunc(TEvTabletPipe::TEvClientDestroyed, Handle)
+ HFunc(TEvDataShard::TEvReadResult, Handle)
+ )
+};
+
+} // anonymous
+
+TString GetKey(size_t n) {
+ // user1000385178204227360
+ return Sprintf("user%.19lu", n);
+}
+
+TVector<TCell> ToCells(const std::vector<TString> &keys) {
+ TVector<TCell> cells;
+ for (auto &key : keys) {
+ cells.emplace_back(TCell(key.data(), key.size()));
+ }
+ return cells;
+}
+
+void AddRangeQuery(
+ TEvDataShard::TEvRead &request,
+ const std::vector<TString> &from,
+ bool fromInclusive,
+ const std::vector<TString> &to,
+ bool toInclusive)
+{
+ auto fromCells = ToCells(from);
+ auto toCells = ToCells(to);
+
+ // convertion is ugly, but for tests is OK
+ auto fromBuf = TSerializedCellVec::Serialize(fromCells);
+ auto toBuf = TSerializedCellVec::Serialize(toCells);
+
+ request.Ranges.emplace_back(fromBuf, toBuf, fromInclusive, toInclusive);
+}
+
+void AddKeyQuery(
+ TEvDataShard::TEvRead &request,
+ const TOwnedCellVec &key)
+{
+ auto buf = TSerializedCellVec::Serialize(key);
+ request.Keys.emplace_back(buf);
+}
+
+IActor *CreateReadIteratorScan(
+ TEvDataShard::TEvRead* request,
+ ui64 tablet,
+ const TActorId& parent,
+ const TSubLoadId& id,
+ ui64 sample)
+{
+ return new TReadIteratorScan(request, tablet, parent, id, sample);
+}
+
+} // NKikimr::NDataShardLoad
diff --git a/ydb/core/load_test/ycsb/common.h b/ydb/core/load_test/ycsb/common.h
new file mode 100644
index 00000000000..d531eb6fe18
--- /dev/null
+++ b/ydb/core/load_test/ycsb/common.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include "defs.h"
+#include "test_load_actor.h"
+
+#include <ydb/core/base/events.h>
+#include <ydb/core/tx/datashard/datashard.h>
+
+namespace NKikimr::NDataShardLoad {
+
+TString GetKey(size_t n);
+
+static const TString Value = TString(100, 'x');
+
+struct TEvPrivate {
+ enum EEv {
+ EvKeys = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvPointTimes,
+ EvEnd,
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE));
+
+ struct TEvKeys : public TEventLocal<TEvKeys, EvKeys> {
+ TVector<TOwnedCellVec> Keys;
+
+ TEvKeys(TVector<TOwnedCellVec>&& keys)
+ : Keys(std::move(keys))
+ {
+ }
+ };
+
+ struct TEvPointTimes : public TEventLocal<TEvPointTimes, EvPointTimes> {
+ TVector<TDuration> RequestTimes;
+
+ TEvPointTimes(TVector<TDuration>&& requestTime)
+ : RequestTimes(std::move(requestTime))
+ {
+ }
+ };
+};
+
+TVector<TCell> ToCells(const std::vector<TString> &keys);
+
+void AddRangeQuery(TEvDataShard::TEvRead &request,
+ const std::vector<TString> &from, bool fromInclusive,
+ const std::vector<TString> &to, bool toInclusive);
+
+void AddKeyQuery(TEvDataShard::TEvRead &request, const TOwnedCellVec &key);
+
+IActor *CreateReadIteratorScan(
+ TEvDataShard::TEvRead* request,
+ ui64 tablet,
+ const TActorId& parent,
+ const TSubLoadId& id,
+ ui64 sample);
+
+} // NKikimr::NDataShardLoad
diff --git a/ydb/core/load_test/ycsb/defs.h b/ydb/core/load_test/ycsb/defs.h
index 56b6952963f..fc4889ec7c9 100644
--- a/ydb/core/load_test/ycsb/defs.h
+++ b/ydb/core/load_test/ycsb/defs.h
@@ -11,8 +11,4 @@ namespace NKikimr::NDataShardLoad {
using TUploadRequest = std::unique_ptr<IEventBase>;
using TRequestsVector = std::vector<TUploadRequest>;
-TString GetKey(size_t n);
-
-static const TString Value = TString(100, 'x');
-
} // NKikimr::NDataShardLoad
diff --git a/ydb/core/load_test/ycsb/kqp_select.cpp b/ydb/core/load_test/ycsb/kqp_select.cpp
new file mode 100644
index 00000000000..42589476c16
--- /dev/null
+++ b/ydb/core/load_test/ycsb/kqp_select.cpp
@@ -0,0 +1,534 @@
+#include "actors.h"
+#include "common.h"
+
+#include <ydb/core/base/tablet.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/ydb_convert/ydb_convert.h>
+
+#include <ydb/public/lib/operation_id/operation_id.h>
+#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
+#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+
+#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <google/protobuf/text_format.h>
+
+// * Scheme is hardcoded and it is like default YCSB setup:
+// 1 Text "id" column, 10 Bytes "field0" - "field9" columns
+// * row is ~ 1 KB, keys are like user1000385178204227360
+
+namespace NKikimr::NDataShardLoad {
+
+namespace {
+
+struct TQueryInfo {
+ TQueryInfo()
+ : Query("")
+ , Params(NYdb::TParamsBuilder().Build())
+ {}
+
+ TQueryInfo(const std::string& query, const NYdb::TParams&& params)
+ : Query(query)
+ , Params(std::move(params))
+ {}
+
+ TString Query;
+ NYdb::TParams Params;
+};
+
+TQueryInfo GenerateSelect(const TString& table, const TString& key) {
+ TStringStream str;
+
+ str << Sprintf(R"__(
+ --!syntax_v1
+
+ DECLARE $key AS Text;
+
+ SELECT * FROM `%s` WHERE id == '$key';
+ )__", table.c_str());
+
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder.AddParam("$key").Utf8(key).Build();
+ auto params = paramsBuilder.Build();
+
+ return TQueryInfo(str.Str(), std::move(params));
+}
+
+// it's a partial copy-paste from TUpsertActor: logic slightly differs, so that
+// it seems better to have copy-paste rather if/else for different loads
+class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> {
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target;
+ const TActorId Parent;
+ const TSubLoadId Id;
+ const TString Database;
+
+ TString Session;
+ TRequestsVector Requests;
+ bool Infinite;
+
+ size_t CurrentRequest = 0;
+
+ TInstant StartTs;
+ TInstant EndTs;
+
+ size_t Errors = 0;
+
+public:
+ TKqpSelectActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
+ const TActorId& parent,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TSubLoadId& id,
+ TRequestsVector requests,
+ bool infinite)
+ : Target(target)
+ , Parent(parent)
+ , Id(id)
+ , Database(Target.GetWorkingDir())
+ , Requests(std::move(requests))
+ , Infinite(infinite)
+ {
+ Y_UNUSED(counters);
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ << " Bootstrap called");
+
+ Become(&TKqpSelectActor::StateFunc);
+ CreateSession(ctx);
+ }
+
+private:
+ void CreateSession(const TActorContext& ctx) {
+ auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ << " sends event for session creation to proxy: " << kqpProxy.ToString());
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
+ ev->Record.MutableRequest()->SetDatabase(Database);
+ Send(kqpProxy, ev.Release());
+ }
+
+ void CloseSession(const TActorContext& ctx) {
+ if (!Session)
+ return;
+
+ auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ << " sends session close query to proxy: " << kqpProxy);
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(Session);
+ ctx.Send(kqpProxy, ev.Release());
+ }
+
+ void ReadRow(const TActorContext &ctx) {
+ auto* request = static_cast<NKqp::TEvKqp::TEvQueryRequest*>(Requests[CurrentRequest].get());
+ request->Record.MutableRequest()->SetSessionId(Session);
+
+ auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ << " send request# " << CurrentRequest
+ << " to proxy# " << kqpProxy << ": " << request->ToString());
+
+ if (!Infinite) {
+ ctx.Send(kqpProxy, Requests[CurrentRequest].release());
+ } else {
+ auto requestCopy = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
+ requestCopy->Record = request->Record;
+ ctx.Send(kqpProxy, requestCopy.release());
+ }
+
+ ++CurrentRequest;
+ }
+
+ void OnRequestDone(const TActorContext& ctx) {
+ if (Infinite && CurrentRequest >= Requests.size()) {
+ CurrentRequest = 0;
+ }
+
+ if (CurrentRequest < Requests.size()) {
+ ReadRow(ctx);
+ } else {
+ EndTs = TInstant::Now();
+ auto delta = EndTs - StartTs;
+
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag);
+ auto& report = *response->Record.MutableReport();
+ report.SetTag(Id.SubTag);
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Requests.size() - Errors);
+ report.SetOperationsError(Errors);
+
+ ctx.Send(Parent, response.release());
+
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ << " finished in " << delta << ", errors=" << Errors);
+ Die(ctx);
+ }
+ }
+
+ void HandlePoison(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ << " tablet recieved PoisonPill, going to die");
+ CloseSession(ctx);
+ Die(ctx);
+ }
+
+ void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& response = ev->Get()->Record;
+
+ if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
+ Session = response.GetResponse().GetSessionId();
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id << " session: " << Session);
+ ReadRow(ctx);
+ } else {
+ StopWithError(ctx, "failed to create session: " + ev->Get()->ToString());
+ }
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ << " received from " << ev->Sender << ": " << ev->Get()->Record.DebugString());
+
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds_StatusCode_SUCCESS) {
+ ++Errors;
+ }
+
+ OnRequestDone(ctx);
+ }
+
+ void StopWithError(const TActorContext& ctx, const TString& reason) {
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
+ Die(ctx);
+ }
+
+ void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) {
+ TStringStream str;
+ HTML(str) {
+ str << "TKqpSelectActor# " << Id << " started on " << StartTs
+ << " sent " << CurrentRequest << " out of " << Requests.size();
+ TInstant ts = EndTs ? EndTs : TInstant::Now();
+ auto delta = ts - StartTs;
+ auto throughput = Requests.size() / delta.Seconds();
+ str << " in " << delta << " (" << throughput << " op/s)"
+ << " errors=" << Errors;
+ }
+
+ ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str()));
+ }
+
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoison)
+ HFunc(TEvDataShardLoad::TEvTestLoadInfoRequest, Handle)
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle)
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle)
+ )
+};
+
+// creates multiple TKqpSelectActor for inflight > 1 and waits completion
+class TKqpSelectActorMultiSession : public TActorBootstrapped<TKqpSelectActorMultiSession> {
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart Config;
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target;
+ const TActorId Parent;
+ const TSubLoadId Id;
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+ const TString Database;
+
+ TString ConfingString;
+
+ ui64 ReadCount = 0;
+
+ ui64 TabletId = 0;
+ ui64 TableId = 0;
+ ui64 OwnerId = 0;
+
+ TVector<ui32> KeyColumnIds;
+ TVector<ui32> AllColumnIds;
+
+ TVector<TOwnedCellVec> Keys;
+
+ ui64 LastReadId = 0;
+ ui64 LastSubTag = 0;
+ TVector<TActorId> Actors;
+
+ size_t Inflight = 0;
+
+ TInstant StartTs;
+ TInstant EndTs;
+
+ size_t Oks = 0;
+ size_t Errors = 0;
+
+public:
+ TKqpSelectActorMultiSession(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd,
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
+ const TActorId& parent,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TSubLoadId& id)
+ : Config(cmd)
+ , Target(target)
+ , Parent(parent)
+ , Id(id)
+ , Counters(counters)
+ , Database(target.GetWorkingDir())
+ {
+ Y_UNUSED(counters);
+ google::protobuf::TextFormat::PrintToString(cmd, &ConfingString);
+
+ if (Config.HasReadCount()) {
+ ReadCount = Config.GetReadCount();
+ } else {
+ ReadCount = Config.GetRowCount();
+ }
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " Bootstrap called: " << ConfingString);
+
+ Become(&TKqpSelectActorMultiSession::StateFunc);
+ DescribePath(ctx);
+ }
+
+private:
+ void DescribePath(const TActorContext& ctx) {
+ TString path = Target.GetWorkingDir() + "/" + Target.GetTableName();
+ auto request = std::make_unique<TEvTxUserProxy::TEvNavigate>();
+ request->Record.MutableDescribePath()->SetPath(path);
+ ctx.Send(MakeTxProxyID(), request.release());
+ }
+
+ void Handle(const NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->GetRecord();
+ OwnerId = record.GetPathOwnerId();
+ TableId = record.GetPathId();
+
+ const auto& description = record.GetPathDescription();
+ if (description.TablePartitionsSize() != 1) {
+ return StopWithError(
+ ctx,
+ TStringBuilder() << "Path must have exactly 1 part, has: " << description.TablePartitionsSize());
+ }
+ const auto& partition = description.GetTablePartitions(0);
+ TabletId = partition.GetDatashardId();
+
+ const auto& table = description.GetTable();
+
+ KeyColumnIds.reserve(table.KeyColumnIdsSize());
+ for (const auto& id: table.GetKeyColumnIds()) {
+ KeyColumnIds.push_back(id);
+ }
+
+ AllColumnIds.reserve(table.ColumnsSize());
+ for (const auto& column: table.GetColumns()) {
+ AllColumnIds.push_back(column.GetId());
+ }
+
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " will work with tablet# " << TabletId << " with ownerId# " << OwnerId
+ << " with tableId# " << TableId << " resolved for path# "
+ << Target.GetWorkingDir() << "/" << Target.GetTableName()
+ << " with columnsCount# " << AllColumnIds.size() << ", keyColumnCount# " << KeyColumnIds.size());
+
+ RunFullScan(ctx, Config.GetRowCount());
+ }
+
+ void RunFullScan(const TActorContext& ctx, ui64 sampleKeys) {
+ auto request = std::make_unique<TEvDataShard::TEvRead>();
+ auto& record = request->Record;
+
+ record.SetReadId(++LastReadId);
+ record.MutableTableId()->SetOwnerId(OwnerId);
+ record.MutableTableId()->SetTableId(TableId);
+
+ if (sampleKeys) {
+ for (const auto& id: KeyColumnIds) {
+ record.AddColumns(id);
+ }
+ } else {
+ for (const auto& id: AllColumnIds) {
+ record.AddColumns(id);
+ }
+ }
+
+ TVector<TString> from = {TString("user")};
+ TVector<TString> to = {TString("zzz")};
+ AddRangeQuery(*request, from, true, to, true);
+
+ record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+
+ TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag);
+ auto* actor = CreateReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys);
+ Actors.emplace_back(ctx.Register(actor));
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " started fullscan actor# " << Actors.back());
+ }
+
+ void Handle(TEvPrivate::TEvKeys::TPtr& ev, const TActorContext& ctx) {
+ Keys = std::move(ev->Get()->Keys);
+ if (Keys.size() == 0) {
+ return StopWithError(ctx, "Failed to read keys");
+ }
+
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " received keyCount# " << Keys.size());
+
+ StartActors(ctx);
+ }
+
+ void StartActors(const TActorContext& ctx) {
+ Inflight = Config.GetInflights(0);
+
+ TVector<TRequestsVector> perActorRequests;
+ perActorRequests.reserve(Inflight);
+
+ size_t keyCounter = 0;
+ for (size_t i = 0; i < Inflight; ++i) {
+ TRequestsVector requests;
+
+ const auto& keyCell = Keys[keyCounter++ % Keys.size()][0];
+ TStringBuf keyBuf = keyCell.AsBuf();
+ TString key(keyBuf.data(), keyBuf.size());
+
+ requests.reserve(ReadCount);
+ for (size_t i = 0; i < ReadCount; ++i) {
+ auto queryInfo = GenerateSelect(Target.GetTableName(), key);
+
+ auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
+ request->Record.MutableRequest()->SetKeepSession(true);
+ request->Record.MutableRequest()->SetDatabase(Database);
+
+ request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ request->Record.MutableRequest()->SetQuery(queryInfo.Query);
+
+ request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+ request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ const auto& params = NYdb::TProtoAccessor::GetProtoMap(queryInfo.Params);
+ request->Record.MutableRequest()->MutableYdbParameters()->insert(params.begin(), params.end());
+
+ requests.emplace_back(std::move(request));
+ }
+ perActorRequests.emplace_back(std::move(requests));
+ }
+
+ StartTs = TInstant::Now();
+
+ Actors.reserve(Inflight);
+ for (size_t i = 0; i < Inflight; ++i) {
+ TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag);
+
+ auto* kqpActor = new TKqpSelectActor(
+ Target,
+ SelfId(),
+ Counters,
+ subId,
+ std::move(perActorRequests[i]),
+ Config.GetInfinite());
+ Actors.emplace_back(ctx.Register(kqpActor));
+ }
+
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " started# " << Inflight << " actors each with inflight# 1");
+ }
+
+ void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+ if (record.HasErrorReason() || !record.HasReport()) {
+ TStringStream ss;
+ ss << "kqp actor# " << record.GetTag() << " finished with error: " << record.GetErrorReason();
+ if (record.HasReport())
+ ss << ", report: " << ev->Get()->ToString();
+
+ StopWithError(ctx, ss.Str());
+ return;
+ }
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " finished: " << ev->Get()->ToString());
+
+ Errors += record.GetReport().GetOperationsError();
+ Oks += record.GetReport().GetOperationsOK();
+
+ --Inflight;
+ if (Inflight == 0) {
+ EndTs = TInstant::Now();
+ auto delta = EndTs - StartTs;
+
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag);
+ auto& report = *response->Record.MutableReport();
+ report.SetTag(Id.SubTag);
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Oks);
+ report.SetOperationsError(Errors);
+ ctx.Send(Parent, response.release());
+
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " finished in " << delta << ", oks# " << Oks << ", errors# " << Errors);
+
+ Stop(ctx);
+ }
+ }
+
+ void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) {
+ TStringStream str;
+ HTML(str) {
+ str << "TKqpSelectActorMultiSession# " << Id << " started on " << StartTs;
+ }
+ ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str()));
+ }
+
+ void HandlePoison(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " tablet recieved PoisonPill, going to die");
+ Stop(ctx);
+ }
+
+ void StopWithError(const TActorContext& ctx, const TString& reason) {
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
+ << " stopped with error: " << reason);
+
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
+ Stop(ctx);
+ }
+
+ void Stop(const TActorContext& ctx) {
+ for (const auto& actorId: Actors) {
+ ctx.Send(actorId, new TEvents::TEvPoison());
+ }
+
+ Die(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoison)
+ HFunc(TEvDataShardLoad::TEvTestLoadInfoRequest, Handle)
+ HFunc(TEvDataShardLoad::TEvTestLoadFinished, Handle);
+ HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle)
+ HFunc(TEvPrivate::TEvKeys, Handle)
+ )
+};
+
+} // anonymous
+
+IActor *CreateKqpSelectActor(
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd,
+ const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
+ const TActorId& parent,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TSubLoadId& id)
+{
+ return new TKqpSelectActorMultiSession(cmd, target, parent, std::move(counters), id);
+}
+
+} // NKikimr::NDataShardLoad
diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp
index 7b32495a03d..25dd625c99d 100644
--- a/ydb/core/load_test/ycsb/kqp_upsert.cpp
+++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp
@@ -1,4 +1,5 @@
#include "actors.h"
+#include "common.h"
#include <ydb/core/base/tablet.h>
#include <ydb/core/kqp/common/kqp.h>
@@ -39,8 +40,6 @@ struct TQueryInfo {
TQueryInfo GenerateUpsert(size_t n, const TString& table) {
TStringStream str;
- NYdb::TParamsBuilder paramsBuilder;
-
str << Sprintf(R"__(
--!syntax_v1
@@ -60,8 +59,9 @@ TQueryInfo GenerateUpsert(size_t n, const TString& table) {
VALUES ( $key, $field0, $field1, $field2, $field3, $field4, $field5, $field6, $field7, $field8, $field9 );
)__", table.c_str());
- paramsBuilder.AddParam("$key").Utf8(GetKey(n)).Build();
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder.AddParam("$key").Utf8(GetKey(n)).Build();
for (size_t i = 0; i < 10; ++i) {
TString name = "$field" + ToString(i);
paramsBuilder.AddParam(name).String(Value).Build();
diff --git a/ydb/core/load_test/ycsb/test_load_actor.cpp b/ydb/core/load_test/ycsb/test_load_actor.cpp
index 4ae6a2994bc..b06494c758e 100644
--- a/ydb/core/load_test/ycsb/test_load_actor.cpp
+++ b/ydb/core/load_test/ycsb/test_load_actor.cpp
@@ -303,6 +303,9 @@ public:
// i.e. shard which calculates stats, compacts, etc
if (!Request.HasTableSetup() || Request.GetTableSetup().GetSkipWarmup()) {
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ << " skipped warmup");
+
State = EState::RunLoad;
PrepareTable(ctx);
return;
@@ -325,10 +328,16 @@ public:
case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadIteratorStart:
cmd.SetRowCount(Request.GetReadIteratorStart().GetRowCount());
break;
- default:
+ case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadKqpStart:
+ cmd.SetRowCount(Request.GetReadKqpStart().GetRowCount());
+ break;
+ default: {
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ << " skipped warmup");
State = EState::RunLoad;
return PrepareTable(ctx);
}
+ }
const auto& target = Request.GetTargetShard();
LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
@@ -395,6 +404,14 @@ public:
counters,
TSubLoadId(Tag, ctx.SelfID, ++LastTag)));
break;
+ case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadKqpStart:
+ actor.reset(CreateKqpSelectActor(
+ Request.GetReadKqpStart(),
+ Request.GetTargetShard(),
+ ctx.SelfID,
+ counters,
+ TSubLoadId(Tag, ctx.SelfID, ++LastTag)));
+ break;
default: {
TStringStream ss;
ss << "TLoad: unexpected command case# " << Request.Command_case()
diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
index 4502f6314ca..93c307c0aa5 100644
--- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
+++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
@@ -1,4 +1,5 @@
#include "actors.h"
+#include "common.h"
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_pipe.h>
@@ -25,67 +26,6 @@ namespace NKikimr::NDataShardLoad {
namespace {
-struct TEvPrivate {
- enum EEv {
- EvKeys = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
- EvPointTimes,
- EvEnd,
- };
-
- static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE));
-
- struct TEvKeys : public TEventLocal<TEvKeys, EvKeys> {
- TVector<TOwnedCellVec> Keys;
-
- TEvKeys(TVector<TOwnedCellVec>&& keys)
- : Keys(std::move(keys))
- {
- }
- };
-
- struct TEvPointTimes : public TEventLocal<TEvPointTimes, EvPointTimes> {
- TVector<TDuration> RequestTimes;
-
- TEvPointTimes(TVector<TDuration>&& requestTime)
- : RequestTimes(std::move(requestTime))
- {
- }
- };
-};
-
-TVector<TCell> ToCells(const std::vector<TString>& keys) {
- TVector<TCell> cells;
- for (auto& key: keys) {
- cells.emplace_back(TCell(key.data(), key.size()));
- }
- return cells;
-}
-
-void AddRangeQuery(
- TEvDataShard::TEvRead& request,
- const std::vector<TString>& from,
- bool fromInclusive,
- const std::vector<TString>& to,
- bool toInclusive)
-{
- auto fromCells = ToCells(from);
- auto toCells = ToCells(to);
-
- // convertion is ugly, but for tests is OK
- auto fromBuf = TSerializedCellVec::Serialize(fromCells);
- auto toBuf = TSerializedCellVec::Serialize(toCells);
-
- request.Ranges.emplace_back(fromBuf, toBuf, fromInclusive, toInclusive);
-}
-
-void AddKeyQuery(
- TEvDataShard::TEvRead& request,
- const TOwnedCellVec& key)
-{
- auto buf = TSerializedCellVec::Serialize(key);
- request.Keys.emplace_back(buf);
-}
-
// TReadIteratorPoints
class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
@@ -266,176 +206,6 @@ private:
)
};
-// TReadIteratorScan
-
-class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> {
- std::unique_ptr<TEvDataShard::TEvRead> Request;
- const NKikimrTxDataShard::EScanDataFormat Format;
- const ui64 TabletId;
- const TActorId Parent;
- const TSubLoadId Id;
- const ui64 SampleKeyCount;
-
- TActorId Pipe;
- bool WasConnected = false;
- ui64 ReconnectLimit = 10;
-
- TInstant StartTs;
- size_t Oks = 0;
-
- TVector<TOwnedCellVec> SampledKeys;
-
-public:
- TReadIteratorScan(TEvDataShard::TEvRead* request,
- ui64 tablet,
- const TActorId& parent,
- const TSubLoadId& id,
- ui64 sample)
- : Request(request)
- , Format(Request->Record.GetResultFormat())
- , TabletId(tablet)
- , Parent(parent)
- , Id(id)
- , SampleKeyCount(sample)
- {
- }
-
- void Bootstrap(const TActorContext& ctx) {
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << " Bootstrap called, sample# " << SampleKeyCount);
-
- Become(&TReadIteratorScan::StateFunc);
- Connect(ctx);
- }
-
-private:
- void Connect(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << " Connect to# " << TabletId << " called");
- --ReconnectLimit;
- if (ReconnectLimit == 0) {
- TStringStream ss;
- ss << "Failed to set pipe to " << TabletId;
- return StopWithError(ctx, ss.Str());
- }
- Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId));
- }
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
- TEvTabletPipe::TEvClientConnected *msg = ev->Get();
-
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << " Handle TEvClientConnected called, Status# " << msg->Status);
-
- if (msg->Status != NKikimrProto::OK) {
- return Connect(ctx);
- }
-
- StartTs = TInstant::Now();
- WasConnected = true;
- NTabletPipe::SendData(ctx, Pipe, Request.release());
- }
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << " Handle TEvClientDestroyed called");
-
- // sanity check
- if (!WasConnected) {
- return Connect(ctx);
- }
-
- return StopWithError(ctx, "broken pipe");
- }
-
- void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) {
- return StopWithError(ctx, "delivery failed");
- }
-
- void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) {
- const auto* msg = ev->Get();
- const auto& record = msg->Record;
-
- if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
- TStringStream ss;
- ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode();
- if (record.GetStatus().IssuesSize()) {
- for (const auto& issue: record.GetStatus().GetIssues()) {
- ss << ", issue: " << issue;
- }
- }
-
- return StopWithError(ctx, ss.Str());
- }
-
- if (Format != NKikimrTxDataShard::CELLVEC) {
- return StopWithError(ctx, "Unsupported format");
- }
-
- Oks += msg->GetRowsCount();
-
- auto ts = TInstant::Now();
- auto delta = ts - StartTs;
-
- if (SampleKeyCount) {
- for (size_t i = 0; i < msg->GetRowsCount() && SampledKeys.size() < SampleKeyCount; ++i) {
- SampledKeys.emplace_back(msg->GetCells(i));
- }
-
- if (record.GetFinished() || SampledKeys.size() >= SampleKeyCount) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << " finished in " << delta
- << ", sampled# " << SampledKeys.size()
- << ", iter finished# " << record.GetFinished()
- << ", oks# " << Oks);
-
- ctx.Send(Parent, new TEvPrivate::TEvKeys(std::move(SampledKeys)));
- return Die(ctx);
- }
-
- return;
- } else if (record.GetFinished()) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << " finished in " << delta
- << ", read# " << Oks);
-
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0);
- auto& report = *response->Record.MutableReport();
- report.SetDurationMs(delta.MilliSeconds());
- report.SetOperationsOK(Oks);
- report.SetOperationsError(0);
- ctx.Send(Parent, response.release());
-
- return Die(ctx);
- }
- }
-
- void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << ", stopped with error: " << reason);
-
- ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason));
- NTabletPipe::CloseClient(SelfId(), Pipe);
- return Die(ctx);
- }
-
- void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
- << " tablet recieved PoisonPill, going to die");
-
- // TODO: cancel iterator
- return Die(ctx);
- }
-
- STRICT_STFUNC(StateFunc,
- CFunc(TEvents::TSystem::PoisonPill, HandlePoison)
- HFunc(TEvents::TEvUndelivered, Handle)
- HFunc(TEvTabletPipe::TEvClientConnected, Handle)
- HFunc(TEvTabletPipe::TEvClientDestroyed, Handle)
- HFunc(TEvDataShard::TEvReadResult, Handle)
- )
-};
-
// TReadIteratorLoadScenario
enum class EState {
@@ -634,7 +404,7 @@ private:
record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC);
TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag);
- auto* actor = new TReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys);
+ auto* actor = CreateReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys);
StartedActors.emplace_back(ctx.Register(actor));
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back());
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index 1991a020c00..8be193bae64 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -40,7 +40,10 @@ message TEvYCSBTestLoadRequest {
// number of random rows to be read (point reads)
optional uint64 ReadCount = 2;
+ // note that KQP supports only 1 inflight
repeated uint32 Inflights = 3;
+
+ // not used in KQP
repeated uint32 Chunks = 4;
// Specifies the format for result data in TEvReadResult
@@ -87,6 +90,7 @@ message TEvYCSBTestLoadRequest {
TUpdateStart UpsertProposeStart = 24;
TReadStart ReadIteratorStart = 25;
+ TReadStart ReadKqpStart = 26;
}
}