diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-02-08 12:04:46 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-02-08 12:04:46 +0300 |
commit | a7f00ce360da75ca0fd9974c4bcd8c5ab91194e6 (patch) | |
tree | 64af346fa1884079058969249131b84ef7732afe | |
parent | 556d24d5a68509194d4901875a44e745b0a5de38 (diff) | |
download | ydb-a7f00ce360da75ca0fd9974c4bcd8c5ab91194e6.tar.gz |
add ycsb kqp select load actor
-rw-r--r-- | ydb/core/load_test/CMakeLists.darwin.txt | 2 | ||||
-rw-r--r-- | ydb/core/load_test/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | ydb/core/load_test/CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/core/load_test/ut_ycsb.cpp | 30 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/actors.h | 7 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp | 6 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/common.cpp | 235 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/common.h | 58 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/defs.h | 4 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/kqp_select.cpp | 534 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/kqp_upsert.cpp | 6 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_actor.cpp | 19 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_read_iterator.cpp | 234 | ||||
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 4 |
14 files changed, 893 insertions, 250 deletions
diff --git a/ydb/core/load_test/CMakeLists.darwin.txt b/ydb/core/load_test/CMakeLists.darwin.txt index 7022fd47dbb..53c7051ec60 100644 --- a/ydb/core/load_test/CMakeLists.darwin.txt +++ b/ydb/core/load_test/CMakeLists.darwin.txt @@ -39,7 +39,9 @@ target_sources(ydb-core-load_test PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/load_test/group_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/vdisk_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/info_collector.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_select.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_upsert.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_read_iterator.cpp diff --git a/ydb/core/load_test/CMakeLists.linux-aarch64.txt b/ydb/core/load_test/CMakeLists.linux-aarch64.txt index a7ece616e2b..edf344a5c59 100644 --- a/ydb/core/load_test/CMakeLists.linux-aarch64.txt +++ b/ydb/core/load_test/CMakeLists.linux-aarch64.txt @@ -40,7 +40,9 @@ target_sources(ydb-core-load_test PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/load_test/group_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/vdisk_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/info_collector.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_select.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_upsert.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_read_iterator.cpp diff --git a/ydb/core/load_test/CMakeLists.linux.txt b/ydb/core/load_test/CMakeLists.linux.txt index a7ece616e2b..edf344a5c59 100644 --- a/ydb/core/load_test/CMakeLists.linux.txt +++ b/ydb/core/load_test/CMakeLists.linux.txt @@ -40,7 +40,9 @@ target_sources(ydb-core-load_test PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/load_test/group_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/vdisk_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/info_collector.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_select.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/kqp_upsert.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/load_test/ycsb/test_load_read_iterator.cpp diff --git a/ydb/core/load_test/ut_ycsb.cpp b/ydb/core/load_test/ut_ycsb.cpp index b7c0d557e5e..228d73cd91a 100644 --- a/ydb/core/load_test/ut_ycsb.cpp +++ b/ydb/core/load_test/ut_ycsb.cpp @@ -1,5 +1,6 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD), Q_ #include <ydb/core/load_test/events.h> +#include <ydb/core/load_test/ycsb/common.h> #include <ydb/core/load_test/ycsb/test_load_actor.h> #include <ydb/core/scheme/scheme_types_defs.h> #include <ydb/core/scheme/scheme_types_proto.h> @@ -22,11 +23,6 @@ const TString DefaultTableName = "usertable"; const TString FieldPrefix = "field"; const size_t ValueColumnsCount = 10; -TString GetKey(size_t n) { - // user1000385178204227360 - return Sprintf("user%.19lu", n); -} - void InitRoot(Tests::TServer::TPtr server, TActorId sender) { server->SetupRootStoragePools(sender); @@ -707,6 +703,30 @@ Y_UNIT_TEST_SUITE(ReadLoad) { helper.CheckKeys(0, expectedRowCount); } + Y_UNIT_TEST(ShouldReadKqp) { + TTestHelper helper; + + const ui64 expectedRowCount = 1000; + + std::unique_ptr<TEvDataShardLoad::TEvYCSBTestLoadRequest> request(new TEvDataShardLoad::TEvYCSBTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableReadKqpStart(); + + command.AddInflights(10); + command.SetRowCount(expectedRowCount); + + auto& setupTable = *record.MutableTableSetup(); + setupTable.SetWorkingDir("/Root"); + setupTable.SetTableName("usertable"); + + auto result = helper.RunTestLoad(std::move(request)); + + UNIT_ASSERT_VALUES_EQUAL(result->JsonResult["oks"].GetInteger(), (10 * expectedRowCount)); + + // sanity check that there was data in table + helper.CheckKeys(0, expectedRowCount); + } + } // Y_UNIT_TEST_SUITE(ReadLoad) } // namespace NKikimr diff --git a/ydb/core/load_test/ycsb/actors.h b/ydb/core/load_test/ycsb/actors.h index 759bc60f05a..36a160f1bba 100644 --- a/ydb/core/load_test/ycsb/actors.h +++ b/ydb/core/load_test/ycsb/actors.h @@ -42,6 +42,13 @@ IActor *CreateReadIteratorActor( TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TSubLoadId& id); +IActor *CreateKqpSelectActor( + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd, + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, + const TActorId& parent, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TSubLoadId& id); + class TLoadManagerException : public yexception { }; diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp index c10da88adea..574553c5148 100644 --- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp +++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp @@ -1,4 +1,5 @@ #include "actors.h" +#include "common.h" #include <ydb/core/base/tablet.h> #include <ydb/core/base/tablet_pipe.h> @@ -16,11 +17,6 @@ namespace NKikimr::NDataShardLoad { -TString GetKey(size_t n) { - // user1000385178204227360 - return Sprintf("user%.19lu", n); -} - using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>; namespace { diff --git a/ydb/core/load_test/ycsb/common.cpp b/ydb/core/load_test/ycsb/common.cpp new file mode 100644 index 00000000000..8f408a95581 --- /dev/null +++ b/ydb/core/load_test/ycsb/common.cpp @@ -0,0 +1,235 @@ +#include "actors.h" +#include "common.h" + +#include <ydb/core/base/tablet.h> +#include <ydb/core/base/tablet_pipe.h> + +#include <library/cpp/monlib/service/pages/templates.h> + +#include <google/protobuf/text_format.h> + +namespace NKikimr::NDataShardLoad { + +namespace { + +// TReadIteratorScan + +class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> { + std::unique_ptr<TEvDataShard::TEvRead> Request; + const NKikimrTxDataShard::EScanDataFormat Format; + const ui64 TabletId; + const TActorId Parent; + const TSubLoadId Id; + const ui64 SampleKeyCount; + + TActorId Pipe; + bool WasConnected = false; + ui64 ReconnectLimit = 10; + + TInstant StartTs; + size_t Oks = 0; + + TVector<TOwnedCellVec> SampledKeys; + +public: + TReadIteratorScan(TEvDataShard::TEvRead* request, + ui64 tablet, + const TActorId& parent, + const TSubLoadId& id, + ui64 sample) + : Request(request) + , Format(Request->Record.GetResultFormat()) + , TabletId(tablet) + , Parent(parent) + , Id(id) + , SampleKeyCount(sample) + { + } + + void Bootstrap(const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Bootstrap called, sample# " << SampleKeyCount); + + Become(&TReadIteratorScan::StateFunc); + Connect(ctx); + } + +private: + void Connect(const TActorContext &ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Connect to# " << TabletId << " called"); + --ReconnectLimit; + if (ReconnectLimit == 0) { + TStringStream ss; + ss << "Failed to set pipe to " << TabletId; + return StopWithError(ctx, ss.Str()); + } + Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { + TEvTabletPipe::TEvClientConnected *msg = ev->Get(); + + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Handle TEvClientConnected called, Status# " << msg->Status); + + if (msg->Status != NKikimrProto::OK) { + return Connect(ctx); + } + + StartTs = TInstant::Now(); + WasConnected = true; + NTabletPipe::SendData(ctx, Pipe, Request.release()); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Handle TEvClientDestroyed called"); + + // sanity check + if (!WasConnected) { + return Connect(ctx); + } + + return StopWithError(ctx, "broken pipe"); + } + + void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) { + return StopWithError(ctx, "delivery failed"); + } + + void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + const auto& record = msg->Record; + + if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { + TStringStream ss; + ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode(); + if (record.GetStatus().IssuesSize()) { + for (const auto& issue: record.GetStatus().GetIssues()) { + ss << ", issue: " << issue; + } + } + + return StopWithError(ctx, ss.Str()); + } + + if (Format != NKikimrTxDataShard::CELLVEC) { + return StopWithError(ctx, "Unsupported format"); + } + + Oks += msg->GetRowsCount(); + + auto ts = TInstant::Now(); + auto delta = ts - StartTs; + + if (SampleKeyCount) { + for (size_t i = 0; i < msg->GetRowsCount() && SampledKeys.size() < SampleKeyCount; ++i) { + SampledKeys.emplace_back(msg->GetCells(i)); + } + + if (record.GetFinished() || SampledKeys.size() >= SampleKeyCount) { + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " finished in " << delta + << ", sampled# " << SampledKeys.size() + << ", iter finished# " << record.GetFinished() + << ", oks# " << Oks); + + ctx.Send(Parent, new TEvPrivate::TEvKeys(std::move(SampledKeys))); + return Die(ctx); + } + + return; + } else if (record.GetFinished()) { + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " finished in " << delta + << ", read# " << Oks); + + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0); + auto& report = *response->Record.MutableReport(); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Oks); + report.SetOperationsError(0); + ctx.Send(Parent, response.release()); + + return Die(ctx); + } + } + + void StopWithError(const TActorContext& ctx, const TString& reason) { + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << ", stopped with error: " << reason); + + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason)); + NTabletPipe::CloseClient(SelfId(), Pipe); + return Die(ctx); + } + + void HandlePoison(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " tablet recieved PoisonPill, going to die"); + + // TODO: cancel iterator + return Die(ctx); + } + + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoison) + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvTabletPipe::TEvClientConnected, Handle) + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle) + HFunc(TEvDataShard::TEvReadResult, Handle) + ) +}; + +} // anonymous + +TString GetKey(size_t n) { + // user1000385178204227360 + return Sprintf("user%.19lu", n); +} + +TVector<TCell> ToCells(const std::vector<TString> &keys) { + TVector<TCell> cells; + for (auto &key : keys) { + cells.emplace_back(TCell(key.data(), key.size())); + } + return cells; +} + +void AddRangeQuery( + TEvDataShard::TEvRead &request, + const std::vector<TString> &from, + bool fromInclusive, + const std::vector<TString> &to, + bool toInclusive) +{ + auto fromCells = ToCells(from); + auto toCells = ToCells(to); + + // convertion is ugly, but for tests is OK + auto fromBuf = TSerializedCellVec::Serialize(fromCells); + auto toBuf = TSerializedCellVec::Serialize(toCells); + + request.Ranges.emplace_back(fromBuf, toBuf, fromInclusive, toInclusive); +} + +void AddKeyQuery( + TEvDataShard::TEvRead &request, + const TOwnedCellVec &key) +{ + auto buf = TSerializedCellVec::Serialize(key); + request.Keys.emplace_back(buf); +} + +IActor *CreateReadIteratorScan( + TEvDataShard::TEvRead* request, + ui64 tablet, + const TActorId& parent, + const TSubLoadId& id, + ui64 sample) +{ + return new TReadIteratorScan(request, tablet, parent, id, sample); +} + +} // NKikimr::NDataShardLoad diff --git a/ydb/core/load_test/ycsb/common.h b/ydb/core/load_test/ycsb/common.h new file mode 100644 index 00000000000..d531eb6fe18 --- /dev/null +++ b/ydb/core/load_test/ycsb/common.h @@ -0,0 +1,58 @@ +#pragma once + +#include "defs.h" +#include "test_load_actor.h" + +#include <ydb/core/base/events.h> +#include <ydb/core/tx/datashard/datashard.h> + +namespace NKikimr::NDataShardLoad { + +TString GetKey(size_t n); + +static const TString Value = TString(100, 'x'); + +struct TEvPrivate { + enum EEv { + EvKeys = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvPointTimes, + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)); + + struct TEvKeys : public TEventLocal<TEvKeys, EvKeys> { + TVector<TOwnedCellVec> Keys; + + TEvKeys(TVector<TOwnedCellVec>&& keys) + : Keys(std::move(keys)) + { + } + }; + + struct TEvPointTimes : public TEventLocal<TEvPointTimes, EvPointTimes> { + TVector<TDuration> RequestTimes; + + TEvPointTimes(TVector<TDuration>&& requestTime) + : RequestTimes(std::move(requestTime)) + { + } + }; +}; + +TVector<TCell> ToCells(const std::vector<TString> &keys); + +void AddRangeQuery(TEvDataShard::TEvRead &request, + const std::vector<TString> &from, bool fromInclusive, + const std::vector<TString> &to, bool toInclusive); + +void AddKeyQuery(TEvDataShard::TEvRead &request, const TOwnedCellVec &key); + +IActor *CreateReadIteratorScan( + TEvDataShard::TEvRead* request, + ui64 tablet, + const TActorId& parent, + const TSubLoadId& id, + ui64 sample); + +} // NKikimr::NDataShardLoad diff --git a/ydb/core/load_test/ycsb/defs.h b/ydb/core/load_test/ycsb/defs.h index 56b6952963f..fc4889ec7c9 100644 --- a/ydb/core/load_test/ycsb/defs.h +++ b/ydb/core/load_test/ycsb/defs.h @@ -11,8 +11,4 @@ namespace NKikimr::NDataShardLoad { using TUploadRequest = std::unique_ptr<IEventBase>; using TRequestsVector = std::vector<TUploadRequest>; -TString GetKey(size_t n); - -static const TString Value = TString(100, 'x'); - } // NKikimr::NDataShardLoad diff --git a/ydb/core/load_test/ycsb/kqp_select.cpp b/ydb/core/load_test/ycsb/kqp_select.cpp new file mode 100644 index 00000000000..42589476c16 --- /dev/null +++ b/ydb/core/load_test/ycsb/kqp_select.cpp @@ -0,0 +1,534 @@ +#include "actors.h" +#include "common.h" + +#include <ydb/core/base/tablet.h> +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/ydb_convert/ydb_convert.h> + +#include <ydb/public/lib/operation_id/operation_id.h> +#include <ydb/public/sdk/cpp/client/ydb_params/params.h> +#include <ydb/public/sdk/cpp/client/ydb_params/params.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> + +#include <library/cpp/monlib/service/pages/templates.h> + +#include <google/protobuf/text_format.h> + +// * Scheme is hardcoded and it is like default YCSB setup: +// 1 Text "id" column, 10 Bytes "field0" - "field9" columns +// * row is ~ 1 KB, keys are like user1000385178204227360 + +namespace NKikimr::NDataShardLoad { + +namespace { + +struct TQueryInfo { + TQueryInfo() + : Query("") + , Params(NYdb::TParamsBuilder().Build()) + {} + + TQueryInfo(const std::string& query, const NYdb::TParams&& params) + : Query(query) + , Params(std::move(params)) + {} + + TString Query; + NYdb::TParams Params; +}; + +TQueryInfo GenerateSelect(const TString& table, const TString& key) { + TStringStream str; + + str << Sprintf(R"__( + --!syntax_v1 + + DECLARE $key AS Text; + + SELECT * FROM `%s` WHERE id == '$key'; + )__", table.c_str()); + + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam("$key").Utf8(key).Build(); + auto params = paramsBuilder.Build(); + + return TQueryInfo(str.Str(), std::move(params)); +} + +// it's a partial copy-paste from TUpsertActor: logic slightly differs, so that +// it seems better to have copy-paste rather if/else for different loads +class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> { + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target; + const TActorId Parent; + const TSubLoadId Id; + const TString Database; + + TString Session; + TRequestsVector Requests; + bool Infinite; + + size_t CurrentRequest = 0; + + TInstant StartTs; + TInstant EndTs; + + size_t Errors = 0; + +public: + TKqpSelectActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, + const TActorId& parent, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TSubLoadId& id, + TRequestsVector requests, + bool infinite) + : Target(target) + , Parent(parent) + , Id(id) + , Database(Target.GetWorkingDir()) + , Requests(std::move(requests)) + , Infinite(infinite) + { + Y_UNUSED(counters); + } + + void Bootstrap(const TActorContext& ctx) { + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + << " Bootstrap called"); + + Become(&TKqpSelectActor::StateFunc); + CreateSession(ctx); + } + +private: + void CreateSession(const TActorContext& ctx) { + auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + << " sends event for session creation to proxy: " << kqpProxy.ToString()); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); + ev->Record.MutableRequest()->SetDatabase(Database); + Send(kqpProxy, ev.Release()); + } + + void CloseSession(const TActorContext& ctx) { + if (!Session) + return; + + auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + << " sends session close query to proxy: " << kqpProxy); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(Session); + ctx.Send(kqpProxy, ev.Release()); + } + + void ReadRow(const TActorContext &ctx) { + auto* request = static_cast<NKqp::TEvKqp::TEvQueryRequest*>(Requests[CurrentRequest].get()); + request->Record.MutableRequest()->SetSessionId(Session); + + auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + << " send request# " << CurrentRequest + << " to proxy# " << kqpProxy << ": " << request->ToString()); + + if (!Infinite) { + ctx.Send(kqpProxy, Requests[CurrentRequest].release()); + } else { + auto requestCopy = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); + requestCopy->Record = request->Record; + ctx.Send(kqpProxy, requestCopy.release()); + } + + ++CurrentRequest; + } + + void OnRequestDone(const TActorContext& ctx) { + if (Infinite && CurrentRequest >= Requests.size()) { + CurrentRequest = 0; + } + + if (CurrentRequest < Requests.size()) { + ReadRow(ctx); + } else { + EndTs = TInstant::Now(); + auto delta = EndTs - StartTs; + + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag); + auto& report = *response->Record.MutableReport(); + report.SetTag(Id.SubTag); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Requests.size() - Errors); + report.SetOperationsError(Errors); + + ctx.Send(Parent, response.release()); + + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + << " finished in " << delta << ", errors=" << Errors); + Die(ctx); + } + } + + void HandlePoison(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + << " tablet recieved PoisonPill, going to die"); + CloseSession(ctx); + Die(ctx); + } + + void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record; + + if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { + Session = response.GetResponse().GetSessionId(); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id << " session: " << Session); + ReadRow(ctx); + } else { + StopWithError(ctx, "failed to create session: " + ev->Get()->ToString()); + } + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + << " received from " << ev->Sender << ": " << ev->Get()->Record.DebugString()); + + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds_StatusCode_SUCCESS) { + ++Errors; + } + + OnRequestDone(ctx); + } + + void StopWithError(const TActorContext& ctx, const TString& reason) { + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); + Die(ctx); + } + + void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) { + TStringStream str; + HTML(str) { + str << "TKqpSelectActor# " << Id << " started on " << StartTs + << " sent " << CurrentRequest << " out of " << Requests.size(); + TInstant ts = EndTs ? EndTs : TInstant::Now(); + auto delta = ts - StartTs; + auto throughput = Requests.size() / delta.Seconds(); + str << " in " << delta << " (" << throughput << " op/s)" + << " errors=" << Errors; + } + + ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str())); + } + + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoison) + HFunc(TEvDataShardLoad::TEvTestLoadInfoRequest, Handle) + HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle) + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle) + ) +}; + +// creates multiple TKqpSelectActor for inflight > 1 and waits completion +class TKqpSelectActorMultiSession : public TActorBootstrapped<TKqpSelectActorMultiSession> { + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart Config; + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target; + const TActorId Parent; + const TSubLoadId Id; + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + const TString Database; + + TString ConfingString; + + ui64 ReadCount = 0; + + ui64 TabletId = 0; + ui64 TableId = 0; + ui64 OwnerId = 0; + + TVector<ui32> KeyColumnIds; + TVector<ui32> AllColumnIds; + + TVector<TOwnedCellVec> Keys; + + ui64 LastReadId = 0; + ui64 LastSubTag = 0; + TVector<TActorId> Actors; + + size_t Inflight = 0; + + TInstant StartTs; + TInstant EndTs; + + size_t Oks = 0; + size_t Errors = 0; + +public: + TKqpSelectActorMultiSession(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd, + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, + const TActorId& parent, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TSubLoadId& id) + : Config(cmd) + , Target(target) + , Parent(parent) + , Id(id) + , Counters(counters) + , Database(target.GetWorkingDir()) + { + Y_UNUSED(counters); + google::protobuf::TextFormat::PrintToString(cmd, &ConfingString); + + if (Config.HasReadCount()) { + ReadCount = Config.GetReadCount(); + } else { + ReadCount = Config.GetRowCount(); + } + } + + void Bootstrap(const TActorContext& ctx) { + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " Bootstrap called: " << ConfingString); + + Become(&TKqpSelectActorMultiSession::StateFunc); + DescribePath(ctx); + } + +private: + void DescribePath(const TActorContext& ctx) { + TString path = Target.GetWorkingDir() + "/" + Target.GetTableName(); + auto request = std::make_unique<TEvTxUserProxy::TEvNavigate>(); + request->Record.MutableDescribePath()->SetPath(path); + ctx.Send(MakeTxProxyID(), request.release()); + } + + void Handle(const NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->GetRecord(); + OwnerId = record.GetPathOwnerId(); + TableId = record.GetPathId(); + + const auto& description = record.GetPathDescription(); + if (description.TablePartitionsSize() != 1) { + return StopWithError( + ctx, + TStringBuilder() << "Path must have exactly 1 part, has: " << description.TablePartitionsSize()); + } + const auto& partition = description.GetTablePartitions(0); + TabletId = partition.GetDatashardId(); + + const auto& table = description.GetTable(); + + KeyColumnIds.reserve(table.KeyColumnIdsSize()); + for (const auto& id: table.GetKeyColumnIds()) { + KeyColumnIds.push_back(id); + } + + AllColumnIds.reserve(table.ColumnsSize()); + for (const auto& column: table.GetColumns()) { + AllColumnIds.push_back(column.GetId()); + } + + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " will work with tablet# " << TabletId << " with ownerId# " << OwnerId + << " with tableId# " << TableId << " resolved for path# " + << Target.GetWorkingDir() << "/" << Target.GetTableName() + << " with columnsCount# " << AllColumnIds.size() << ", keyColumnCount# " << KeyColumnIds.size()); + + RunFullScan(ctx, Config.GetRowCount()); + } + + void RunFullScan(const TActorContext& ctx, ui64 sampleKeys) { + auto request = std::make_unique<TEvDataShard::TEvRead>(); + auto& record = request->Record; + + record.SetReadId(++LastReadId); + record.MutableTableId()->SetOwnerId(OwnerId); + record.MutableTableId()->SetTableId(TableId); + + if (sampleKeys) { + for (const auto& id: KeyColumnIds) { + record.AddColumns(id); + } + } else { + for (const auto& id: AllColumnIds) { + record.AddColumns(id); + } + } + + TVector<TString> from = {TString("user")}; + TVector<TString> to = {TString("zzz")}; + AddRangeQuery(*request, from, true, to, true); + + record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); + + TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag); + auto* actor = CreateReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys); + Actors.emplace_back(ctx.Register(actor)); + + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " started fullscan actor# " << Actors.back()); + } + + void Handle(TEvPrivate::TEvKeys::TPtr& ev, const TActorContext& ctx) { + Keys = std::move(ev->Get()->Keys); + if (Keys.size() == 0) { + return StopWithError(ctx, "Failed to read keys"); + } + + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " received keyCount# " << Keys.size()); + + StartActors(ctx); + } + + void StartActors(const TActorContext& ctx) { + Inflight = Config.GetInflights(0); + + TVector<TRequestsVector> perActorRequests; + perActorRequests.reserve(Inflight); + + size_t keyCounter = 0; + for (size_t i = 0; i < Inflight; ++i) { + TRequestsVector requests; + + const auto& keyCell = Keys[keyCounter++ % Keys.size()][0]; + TStringBuf keyBuf = keyCell.AsBuf(); + TString key(keyBuf.data(), keyBuf.size()); + + requests.reserve(ReadCount); + for (size_t i = 0; i < ReadCount; ++i) { + auto queryInfo = GenerateSelect(Target.GetTableName(), key); + + auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); + request->Record.MutableRequest()->SetKeepSession(true); + request->Record.MutableRequest()->SetDatabase(Database); + + request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + request->Record.MutableRequest()->SetQuery(queryInfo.Query); + + request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + const auto& params = NYdb::TProtoAccessor::GetProtoMap(queryInfo.Params); + request->Record.MutableRequest()->MutableYdbParameters()->insert(params.begin(), params.end()); + + requests.emplace_back(std::move(request)); + } + perActorRequests.emplace_back(std::move(requests)); + } + + StartTs = TInstant::Now(); + + Actors.reserve(Inflight); + for (size_t i = 0; i < Inflight; ++i) { + TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag); + + auto* kqpActor = new TKqpSelectActor( + Target, + SelfId(), + Counters, + subId, + std::move(perActorRequests[i]), + Config.GetInfinite()); + Actors.emplace_back(ctx.Register(kqpActor)); + } + + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " started# " << Inflight << " actors each with inflight# 1"); + } + + void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; + if (record.HasErrorReason() || !record.HasReport()) { + TStringStream ss; + ss << "kqp actor# " << record.GetTag() << " finished with error: " << record.GetErrorReason(); + if (record.HasReport()) + ss << ", report: " << ev->Get()->ToString(); + + StopWithError(ctx, ss.Str()); + return; + } + + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " finished: " << ev->Get()->ToString()); + + Errors += record.GetReport().GetOperationsError(); + Oks += record.GetReport().GetOperationsOK(); + + --Inflight; + if (Inflight == 0) { + EndTs = TInstant::Now(); + auto delta = EndTs - StartTs; + + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag); + auto& report = *response->Record.MutableReport(); + report.SetTag(Id.SubTag); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Oks); + report.SetOperationsError(Errors); + ctx.Send(Parent, response.release()); + + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " finished in " << delta << ", oks# " << Oks << ", errors# " << Errors); + + Stop(ctx); + } + } + + void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) { + TStringStream str; + HTML(str) { + str << "TKqpSelectActorMultiSession# " << Id << " started on " << StartTs; + } + ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str())); + } + + void HandlePoison(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " tablet recieved PoisonPill, going to die"); + Stop(ctx); + } + + void StopWithError(const TActorContext& ctx, const TString& reason) { + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id + << " stopped with error: " << reason); + + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); + Stop(ctx); + } + + void Stop(const TActorContext& ctx) { + for (const auto& actorId: Actors) { + ctx.Send(actorId, new TEvents::TEvPoison()); + } + + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoison) + HFunc(TEvDataShardLoad::TEvTestLoadInfoRequest, Handle) + HFunc(TEvDataShardLoad::TEvTestLoadFinished, Handle); + HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle) + HFunc(TEvPrivate::TEvKeys, Handle) + ) +}; + +} // anonymous + +IActor *CreateKqpSelectActor( + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd, + const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, + const TActorId& parent, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TSubLoadId& id) +{ + return new TKqpSelectActorMultiSession(cmd, target, parent, std::move(counters), id); +} + +} // NKikimr::NDataShardLoad diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp index 7b32495a03d..25dd625c99d 100644 --- a/ydb/core/load_test/ycsb/kqp_upsert.cpp +++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp @@ -1,4 +1,5 @@ #include "actors.h" +#include "common.h" #include <ydb/core/base/tablet.h> #include <ydb/core/kqp/common/kqp.h> @@ -39,8 +40,6 @@ struct TQueryInfo { TQueryInfo GenerateUpsert(size_t n, const TString& table) { TStringStream str; - NYdb::TParamsBuilder paramsBuilder; - str << Sprintf(R"__( --!syntax_v1 @@ -60,8 +59,9 @@ TQueryInfo GenerateUpsert(size_t n, const TString& table) { VALUES ( $key, $field0, $field1, $field2, $field3, $field4, $field5, $field6, $field7, $field8, $field9 ); )__", table.c_str()); - paramsBuilder.AddParam("$key").Utf8(GetKey(n)).Build(); + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam("$key").Utf8(GetKey(n)).Build(); for (size_t i = 0; i < 10; ++i) { TString name = "$field" + ToString(i); paramsBuilder.AddParam(name).String(Value).Build(); diff --git a/ydb/core/load_test/ycsb/test_load_actor.cpp b/ydb/core/load_test/ycsb/test_load_actor.cpp index 4ae6a2994bc..b06494c758e 100644 --- a/ydb/core/load_test/ycsb/test_load_actor.cpp +++ b/ydb/core/load_test/ycsb/test_load_actor.cpp @@ -303,6 +303,9 @@ public: // i.e. shard which calculates stats, compacts, etc if (!Request.HasTableSetup() || Request.GetTableSetup().GetSkipWarmup()) { + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + << " skipped warmup"); + State = EState::RunLoad; PrepareTable(ctx); return; @@ -325,10 +328,16 @@ public: case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadIteratorStart: cmd.SetRowCount(Request.GetReadIteratorStart().GetRowCount()); break; - default: + case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadKqpStart: + cmd.SetRowCount(Request.GetReadKqpStart().GetRowCount()); + break; + default: { + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + << " skipped warmup"); State = EState::RunLoad; return PrepareTable(ctx); } + } const auto& target = Request.GetTargetShard(); LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag @@ -395,6 +404,14 @@ public: counters, TSubLoadId(Tag, ctx.SelfID, ++LastTag))); break; + case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadKqpStart: + actor.reset(CreateKqpSelectActor( + Request.GetReadKqpStart(), + Request.GetTargetShard(), + ctx.SelfID, + counters, + TSubLoadId(Tag, ctx.SelfID, ++LastTag))); + break; default: { TStringStream ss; ss << "TLoad: unexpected command case# " << Request.Command_case() diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp index 4502f6314ca..93c307c0aa5 100644 --- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp +++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp @@ -1,4 +1,5 @@ #include "actors.h" +#include "common.h" #include <ydb/core/base/tablet.h> #include <ydb/core/base/tablet_pipe.h> @@ -25,67 +26,6 @@ namespace NKikimr::NDataShardLoad { namespace { -struct TEvPrivate { - enum EEv { - EvKeys = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), - EvPointTimes, - EvEnd, - }; - - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)); - - struct TEvKeys : public TEventLocal<TEvKeys, EvKeys> { - TVector<TOwnedCellVec> Keys; - - TEvKeys(TVector<TOwnedCellVec>&& keys) - : Keys(std::move(keys)) - { - } - }; - - struct TEvPointTimes : public TEventLocal<TEvPointTimes, EvPointTimes> { - TVector<TDuration> RequestTimes; - - TEvPointTimes(TVector<TDuration>&& requestTime) - : RequestTimes(std::move(requestTime)) - { - } - }; -}; - -TVector<TCell> ToCells(const std::vector<TString>& keys) { - TVector<TCell> cells; - for (auto& key: keys) { - cells.emplace_back(TCell(key.data(), key.size())); - } - return cells; -} - -void AddRangeQuery( - TEvDataShard::TEvRead& request, - const std::vector<TString>& from, - bool fromInclusive, - const std::vector<TString>& to, - bool toInclusive) -{ - auto fromCells = ToCells(from); - auto toCells = ToCells(to); - - // convertion is ugly, but for tests is OK - auto fromBuf = TSerializedCellVec::Serialize(fromCells); - auto toBuf = TSerializedCellVec::Serialize(toCells); - - request.Ranges.emplace_back(fromBuf, toBuf, fromInclusive, toInclusive); -} - -void AddKeyQuery( - TEvDataShard::TEvRead& request, - const TOwnedCellVec& key) -{ - auto buf = TSerializedCellVec::Serialize(key); - request.Keys.emplace_back(buf); -} - // TReadIteratorPoints class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { @@ -266,176 +206,6 @@ private: ) }; -// TReadIteratorScan - -class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> { - std::unique_ptr<TEvDataShard::TEvRead> Request; - const NKikimrTxDataShard::EScanDataFormat Format; - const ui64 TabletId; - const TActorId Parent; - const TSubLoadId Id; - const ui64 SampleKeyCount; - - TActorId Pipe; - bool WasConnected = false; - ui64 ReconnectLimit = 10; - - TInstant StartTs; - size_t Oks = 0; - - TVector<TOwnedCellVec> SampledKeys; - -public: - TReadIteratorScan(TEvDataShard::TEvRead* request, - ui64 tablet, - const TActorId& parent, - const TSubLoadId& id, - ui64 sample) - : Request(request) - , Format(Request->Record.GetResultFormat()) - , TabletId(tablet) - , Parent(parent) - , Id(id) - , SampleKeyCount(sample) - { - } - - void Bootstrap(const TActorContext& ctx) { - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << " Bootstrap called, sample# " << SampleKeyCount); - - Become(&TReadIteratorScan::StateFunc); - Connect(ctx); - } - -private: - void Connect(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << " Connect to# " << TabletId << " called"); - --ReconnectLimit; - if (ReconnectLimit == 0) { - TStringStream ss; - ss << "Failed to set pipe to " << TabletId; - return StopWithError(ctx, ss.Str()); - } - Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); - } - - void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { - TEvTabletPipe::TEvClientConnected *msg = ev->Get(); - - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << " Handle TEvClientConnected called, Status# " << msg->Status); - - if (msg->Status != NKikimrProto::OK) { - return Connect(ctx); - } - - StartTs = TInstant::Now(); - WasConnected = true; - NTabletPipe::SendData(ctx, Pipe, Request.release()); - } - - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << " Handle TEvClientDestroyed called"); - - // sanity check - if (!WasConnected) { - return Connect(ctx); - } - - return StopWithError(ctx, "broken pipe"); - } - - void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) { - return StopWithError(ctx, "delivery failed"); - } - - void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) { - const auto* msg = ev->Get(); - const auto& record = msg->Record; - - if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { - TStringStream ss; - ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode(); - if (record.GetStatus().IssuesSize()) { - for (const auto& issue: record.GetStatus().GetIssues()) { - ss << ", issue: " << issue; - } - } - - return StopWithError(ctx, ss.Str()); - } - - if (Format != NKikimrTxDataShard::CELLVEC) { - return StopWithError(ctx, "Unsupported format"); - } - - Oks += msg->GetRowsCount(); - - auto ts = TInstant::Now(); - auto delta = ts - StartTs; - - if (SampleKeyCount) { - for (size_t i = 0; i < msg->GetRowsCount() && SampledKeys.size() < SampleKeyCount; ++i) { - SampledKeys.emplace_back(msg->GetCells(i)); - } - - if (record.GetFinished() || SampledKeys.size() >= SampleKeyCount) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << " finished in " << delta - << ", sampled# " << SampledKeys.size() - << ", iter finished# " << record.GetFinished() - << ", oks# " << Oks); - - ctx.Send(Parent, new TEvPrivate::TEvKeys(std::move(SampledKeys))); - return Die(ctx); - } - - return; - } else if (record.GetFinished()) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << " finished in " << delta - << ", read# " << Oks); - - auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0); - auto& report = *response->Record.MutableReport(); - report.SetDurationMs(delta.MilliSeconds()); - report.SetOperationsOK(Oks); - report.SetOperationsError(0); - ctx.Send(Parent, response.release()); - - return Die(ctx); - } - } - - void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << ", stopped with error: " << reason); - - ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason)); - NTabletPipe::CloseClient(SelfId(), Pipe); - return Die(ctx); - } - - void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id - << " tablet recieved PoisonPill, going to die"); - - // TODO: cancel iterator - return Die(ctx); - } - - STRICT_STFUNC(StateFunc, - CFunc(TEvents::TSystem::PoisonPill, HandlePoison) - HFunc(TEvents::TEvUndelivered, Handle) - HFunc(TEvTabletPipe::TEvClientConnected, Handle) - HFunc(TEvTabletPipe::TEvClientDestroyed, Handle) - HFunc(TEvDataShard::TEvReadResult, Handle) - ) -}; - // TReadIteratorLoadScenario enum class EState { @@ -634,7 +404,7 @@ private: record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag); - auto* actor = new TReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys); + auto* actor = CreateReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys); StartedActors.emplace_back(ctx.Register(actor)); LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back()); diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index 1991a020c00..8be193bae64 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -40,7 +40,10 @@ message TEvYCSBTestLoadRequest { // number of random rows to be read (point reads) optional uint64 ReadCount = 2; + // note that KQP supports only 1 inflight repeated uint32 Inflights = 3; + + // not used in KQP repeated uint32 Chunks = 4; // Specifies the format for result data in TEvReadResult @@ -87,6 +90,7 @@ message TEvYCSBTestLoadRequest { TUpdateStart UpsertProposeStart = 24; TReadStart ReadIteratorStart = 25; + TReadStart ReadKqpStart = 26; } } |