diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-23 15:50:07 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-23 15:50:07 +0300 |
commit | ae5c206d0c9eabe1b14e297b044dfd2becc96f01 (patch) | |
tree | 88eb8ac374a320bed24121e31a1aeb7e4a61ee81 | |
parent | f7362d18e23479d33f86e53dea41c0ef0974bc8d (diff) | |
download | ydb-ae5c206d0c9eabe1b14e297b044dfd2becc96f01.tar.gz |
basic read iterator load actor
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 68 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/kqp_upsert.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_read_iterator.cpp | 677 |
6 files changed, 742 insertions, 26 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index 5700539266b..edb71cf82d0 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -22,8 +22,10 @@ message TEvTestLoadRequest { } message TReadStart { - optional uint64 TabletId = 1; - optional uint64 TableId = 2; + optional string Path = 1; + + // defines dataset size, normally must be withing 2 GiB + optional uint64 RowCount = 2; // Specifies the format for result data in TEvReadResult optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 3; diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index a13da3e9595..eebc1f4dec9 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -159,7 +159,27 @@ struct TTestHelper { return WaitReadResult(); } - void RunTestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request) { + void CheckKeysCount(size_t expectedRowCount) { + TVector<TString> from = {TString("user")}; + TVector<TString> to = {TString("zzz")}; + + auto request = GetBaseReadRequest(); + AddRangeQuery( + *request, + from, + true, + to, + true + ); + + auto readResult = SendRead(request.release()); + UNIT_ASSERT(readResult); + UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), expectedRowCount); + } + + std::unique_ptr<TEvDataShardLoad::TEvTestLoadFinished> RunTestLoad( + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request) + { request->Record.SetNotifyWhenFinished(true); auto &runtime = *Server->GetRuntime(); TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters()); @@ -185,28 +205,14 @@ struct TTestHelper { auto response = handle->Release<TEvDataShardLoad::TEvTestLoadFinished>(); UNIT_ASSERT(response->Report); UNIT_ASSERT(!response->ErrorReason); + + return std::unique_ptr<TEvDataShardLoad::TEvTestLoadFinished>(response.Release()); } } void RunUpsertTestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> loadRequest, size_t expectedRowCount) { RunTestLoad(std::move(loadRequest)); - - // holds memory for TCell - TVector<TString> from = {TString("user")}; - TVector<TString> to = {TString("zzz")}; - - auto request = GetBaseReadRequest(); - AddRangeQuery( - *request, - from, - true, - to, - true - ); - - auto readResult = SendRead(request.release()); - UNIT_ASSERT(readResult); - UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), expectedRowCount); + CheckKeysCount(expectedRowCount); } public: @@ -274,4 +280,30 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { } // Y_UNIT_TEST_SUITE(UpsertLoad) +Y_UNIT_TEST_SUITE(ReadLoad) { + Y_UNIT_TEST(ShouldReadIterate) { + TTestHelper helper; + + const ui64 expectedRowCount = 20; + + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableReadIteratorStart(); + + command.SetRowCount(expectedRowCount); + command.SetPath("/Root/usertable"); + + auto result = helper.RunTestLoad(std::move(request)); + UNIT_ASSERT(result->Report); + + // fullscans with different chunks: 5 + // read head with inflight 1 + UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 6); + + // sanity check that there was data in table + helper.CheckKeysCount(expectedRowCount); + } + +} // Y_UNIT_TEST_SUITE(ReadLoad) + } // namespace NKikimr diff --git a/ydb/core/tx/datashard/testload/kqp_upsert.cpp b/ydb/core/tx/datashard/testload/kqp_upsert.cpp index 0f24cf4a268..2c7de33c7c7 100644 --- a/ydb/core/tx/datashard/testload/kqp_upsert.cpp +++ b/ydb/core/tx/datashard/testload/kqp_upsert.cpp @@ -1,7 +1,6 @@ #include "actors.h" #include <ydb/core/base/tablet.h> -#include <ydb/core/base/tablet_pipe.h> #include <ydb/core/kqp/kqp.h> #include <ydb/core/ydb_convert/ydb_convert.h> diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp index cec2b3674c9..a300ad506d0 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.cpp +++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp @@ -229,8 +229,10 @@ public: void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { const auto& msg = ev->Get(); auto it = LoadActors.find(msg->Tag); - Y_VERIFY(it != LoadActors.end()); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load actor with tag# " << msg->Tag << " finished"); + Y_VERIFY(it != LoadActors.end(), "%s", (TStringBuilder() << "failed to find actor with tag# " << msg->Tag + << ", TEvTestLoadFinished from actor# " << ev->Sender).c_str()); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load actor# " << ev->Sender + << " with tag# " << msg->Tag << " finished"); if (it->second.Parent) { auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(); diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h index b4a0549c19f..053fe433d7d 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.h +++ b/ydb/core/tx/datashard/testload/test_load_actor.h @@ -38,6 +38,10 @@ struct TEvDataShardLoad { ui64 OperationsOK = 0; ui64 OperationsError = 0; + // info might contain result for multiple subtests + TString Info; + ui64 SubtestCount = 0; + TString ToString() const { TStringStream ss; ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError; @@ -45,6 +49,12 @@ struct TEvDataShardLoad { ui64 throughput = OperationsOK / Duration.Seconds(); ss << ", throughput=" << throughput << " OK_ops/s"; } + if (SubtestCount) { + ss << ", subtests: " << SubtestCount; + } + if (Info) { + ss << ", Info: " << Info; + } return ss.Str(); } }; diff --git a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp index 8531888715d..11752bd8d3b 100644 --- a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp +++ b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp @@ -2,14 +2,21 @@ #include <ydb/core/base/tablet.h> #include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <library/cpp/histogram/hdr/histogram.h> #include <library/cpp/monlib/service/pages/templates.h> #include <util/datetime/cputimer.h> #include <util/random/random.h> +#include <util/system/hp_timer.h> #include <google/protobuf/text_format.h> +#include <algorithm> +#include <random> + // * Scheme is hardcoded and it is like default YCSB setup: // table name is "usertable", 1 utf8 "key" column, 10 utf8 "field0" - "field9" columns // * row is ~ 1 KB, keys are like user1000385178204227360 @@ -18,6 +25,368 @@ namespace NKikimr::NDataShardLoad { namespace { +struct TEvPrivate { + enum EEv { + EvKeys = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvPointTimes, + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)); + + struct TEvKeys : public TEventLocal<TEvKeys, EvKeys> { + TVector<TOwnedCellVec> Keys; + + TEvKeys(TVector<TOwnedCellVec>&& keys) + : Keys(std::move(keys)) + { + } + }; + + struct TEvPointTimes : public TEventLocal<TEvPointTimes, EvPointTimes> { + TVector<TDuration> RequestTimes; + + TEvPointTimes(TVector<TDuration>&& requestTime) + : RequestTimes(std::move(requestTime)) + { + } + }; +}; + +TVector<TCell> ToCells(const std::vector<TString>& keys) { + TVector<TCell> cells; + for (auto& key: keys) { + cells.emplace_back(TCell(key.data(), key.size())); + } + return cells; +} + +void AddRangeQuery( + TEvDataShard::TEvRead& request, + const std::vector<TString>& from, + bool fromInclusive, + const std::vector<TString>& to, + bool toInclusive) +{ + auto fromCells = ToCells(from); + auto toCells = ToCells(to); + + // convertion is ugly, but for tests is OK + auto fromBuf = TSerializedCellVec::Serialize(fromCells); + auto toBuf = TSerializedCellVec::Serialize(toCells); + + request.Ranges.emplace_back(fromBuf, toBuf, fromInclusive, toInclusive); +} + +void AddKeyQuery( + TEvDataShard::TEvRead& request, + const TOwnedCellVec& key) +{ + auto buf = TSerializedCellVec::Serialize(key); + request.Keys.emplace_back(buf); +} + +class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { + const std::unique_ptr<const TEvDataShard::TEvRead> BaseRequest; + const NKikimrTxDataShard::EScanDataFormat Format; + const ui64 TabletId; + const TActorId Parent; + + TActorId Pipe; + + TInstant StartTs; // actor started to send requests + size_t Oks = 0; + + TVector<TOwnedCellVec> Points; + size_t CurrentPoint = 0; + THPTimer RequestTimer; + + TVector<TDuration> RequestTimes; + +public: + TReadIteratorPoints(TEvDataShard::TEvRead* request, + ui64 tablet, + const TActorId& parent, + const TVector<TOwnedCellVec>& points) + : BaseRequest(request) + , Format(BaseRequest->Record.GetResultFormat()) + , TabletId(tablet) + , Parent(parent) + , Points(points) + { + RequestTimes.reserve(Points.size()); + } + + void Bootstrap(const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() + << " with parent# " << Parent << " Bootstrap called"); + + Become(&TReadIteratorPoints::StateFunc); + + auto rng = std::default_random_engine {}; + std::shuffle(Points.begin(), Points.end(), rng); + + Connect(ctx); + } + +private: + void Connect(const TActorContext &ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() + << " with parent# " << Parent << " Connect to# " << TabletId << " called"); + Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { + TEvTabletPipe::TEvClientConnected *msg = ev->Get(); + + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() + << " with parent# " << Parent << " Handle TEvClientConnected called, Status# " << msg->Status); + + if (msg->Status != NKikimrProto::OK) { + TStringStream ss; + ss << "Failed to connect to " << TabletId << ", status: " << msg->Status; + return StopWithError(ctx, ss.Str()); + } + + StartTs = TInstant::Now(); + SendRead(ctx); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() + << " with parent# " << Parent << " Handle TEvClientDestroyed called"); + return StopWithError(ctx, "broken pipe"); + } + + void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) { + return StopWithError(ctx, "delivery failed"); + } + + void SendRead(const TActorContext &ctx) { + Y_VERIFY(CurrentPoint < Points.size()); + + auto request = std::make_unique<TEvDataShard::TEvRead>(); + request->Record = BaseRequest->Record; + AddKeyQuery(*request, Points[CurrentPoint++]); + + RequestTimer.Reset(); + NTabletPipe::SendData(ctx, Pipe, request.release()); + } + + void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + const auto& record = msg->Record; + + if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { + TStringStream ss; + ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode(); + if (record.GetStatus().IssuesSize()) { + for (const auto& issue: record.GetStatus().GetIssues()) { + ss << ", issue: " << issue; + } + } + + return StopWithError(ctx, ss.Str()); + return; + } + + if (Format != NKikimrTxDataShard::CELLVEC) { + return StopWithError(ctx, "Unsupported format"); + } + + Oks += msg->GetRowsCount(); + RequestTimes.push_back(TDuration::Seconds(RequestTimer.Passed())); + + if (CurrentPoint < Points.size()) { + SendRead(ctx); + return; + } + + // finish + ctx.Send(Parent, new TEvPrivate::TEvPointTimes(std::move(RequestTimes))); + Die(ctx); + } + + void StopWithError(const TActorContext& ctx, const TString& reason) { + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() + << " with parent# " << Parent << ", stopped with error: " << reason); + + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason)); + NTabletPipe::CloseClient(SelfId(), Pipe); + return Die(ctx); + } + + void HandlePoison(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() + << " with parent# " << Parent << " tablet recieved PoisonPill, going to die"); + + // TODO: cancel iterator + return Die(ctx); + } + + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoison) + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvTabletPipe::TEvClientConnected, Handle) + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle) + HFunc(TEvDataShard::TEvReadResult, Handle) + ) +}; + +class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> { + std::unique_ptr<TEvDataShard::TEvRead> Request; + const NKikimrTxDataShard::EScanDataFormat Format; + const ui64 TabletId; + const TActorId Parent; + const ui64 SampleKeyCount; + + TActorId Pipe; + + TInstant StartTs; + size_t Oks = 0; + + TVector<TOwnedCellVec> SampledKeys; + +public: + TReadIteratorScan(TEvDataShard::TEvRead* request, ui64 tablet, const TActorId& parent, ui64 sample) + : Request(request) + , Format(Request->Record.GetResultFormat()) + , TabletId(tablet) + , Parent(parent) + , SampleKeyCount(sample) + { + } + + void Bootstrap(const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << " Bootstrap called"); + + Become(&TReadIteratorScan::StateFunc); + Connect(ctx); + } + +private: + void Connect(const TActorContext &ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << " Connect to# " << TabletId << " called"); + Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { + TEvTabletPipe::TEvClientConnected *msg = ev->Get(); + + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << " Handle TEvClientConnected called, Status# " << msg->Status); + + if (msg->Status != NKikimrProto::OK) { + TStringStream ss; + ss << "Failed to connect to " << TabletId << ", status: " << msg->Status; + return StopWithError(ctx, ss.Str()); + } + + StartTs = TInstant::Now(); + NTabletPipe::SendData(ctx, Pipe, Request.release()); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << " Handle TEvClientDestroyed called"); + return StopWithError(ctx, "broken pipe"); + } + + void Handle(TEvents::TEvUndelivered::TPtr, const TActorContext& ctx) { + return StopWithError(ctx, "delivery failed"); + } + + void Handle(const TEvDataShard::TEvReadResult::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + const auto& record = msg->Record; + + if (record.HasStatus() && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { + TStringStream ss; + ss << "Failed to read from ds# " << TabletId << ", code# " << record.GetStatus().GetCode(); + if (record.GetStatus().IssuesSize()) { + for (const auto& issue: record.GetStatus().GetIssues()) { + ss << ", issue: " << issue; + } + } + + return StopWithError(ctx, ss.Str()); + return; + } + + if (Format != NKikimrTxDataShard::CELLVEC) { + return StopWithError(ctx, "Unsupported format"); + } + + Oks += msg->GetRowsCount(); + + auto ts = TInstant::Now(); + auto delta = ts - StartTs; + + if (SampleKeyCount) { + for (size_t i = 0; i < msg->GetRowsCount() && SampledKeys.size() < SampleKeyCount; ++i) { + SampledKeys.emplace_back(msg->GetCells(i)); + } + + if (record.GetFinished() || SampledKeys.size() >= SampleKeyCount) { + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << " finished in " << delta << ", sampled# " << SampledKeys.size()); + + ctx.Send(Parent, new TEvPrivate::TEvKeys(std::move(SampledKeys))); + return Die(ctx); + } + + return; + } else if (record.GetFinished()) { + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << " finished in " << delta); + + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0); + response->Report = TEvDataShardLoad::TLoadReport(); + response->Report->Duration = delta; + response->Report->OperationsOK = Oks; + response->Report->OperationsError = 0; + ctx.Send(Parent, response.release()); + + return Die(ctx); + } + } + + void StopWithError(const TActorContext& ctx, const TString& reason) { + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << ", stopped with error: " << reason); + + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason)); + NTabletPipe::CloseClient(SelfId(), Pipe); + return Die(ctx); + } + + void HandlePoison(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() + << " with parent# " << Parent << " tablet recieved PoisonPill, going to die"); + + // TODO: cancel iterator + return Die(ctx); + } + + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoison) + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvTabletPipe::TEvClientConnected, Handle) + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle) + HFunc(TEvDataShard::TEvReadResult, Handle) + ) +}; + +enum class EState { + DescribePath, + Upsert, + FullScan, + FullScanGetKeys, + ReadHeadPoints, +}; + class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadScenario> { const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart Config; const TActorId Parent; @@ -28,6 +397,33 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce TString ConfingString; + ui64 TabletId = 0; + ui64 TableId = 0; + ui64 OwnerId = 0; + + TVector<ui32> KeyColumnIds; + TVector<ui32> AllColumnIds; + + TVector<TActorId> StartedActors; + ui64 LastReadId = 0; + + TVector<TOwnedCellVec> Keys; + + size_t Oks = 0; + size_t Errors = 0; + TVector<TEvDataShardLoad::TLoadReport> Results; + NHdr::THistogram HeadReadsHist; + + EState State = EState::DescribePath; + + // setup for fullscan + const TVector<ui64> ChunkSizes = {0, 1, 10, 100, 1000}; + size_t ChunkIndex = 0; + + // setup for all load tests + const TVector<ui64> Inflights = {1, 2, 10, 50, 100, 200, 400 }; + size_t InflightIndex = 0; + public: TReadIteratorLoadScenario(const NKikimrDataShardLoad::TEvTestLoadRequest::TReadStart& cmd, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) @@ -35,18 +431,262 @@ public: , Parent(parent) , Counters(std::move(counters)) , Tag(tag) + , HeadReadsHist(1000, 4) { google::protobuf::TextFormat::PrintToString(cmd, &ConfingString); } void Bootstrap(const TActorContext& ctx) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag - << " Bootstrap called: " << ConfingString); + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << SelfId() + << " with tag# " << Tag << " Bootstrap called: " << ConfingString); Become(&TReadIteratorLoadScenario::StateFunc); + Run(ctx); } private: + void ResetIndices() { + ChunkIndex = 0; + InflightIndex = 0; + } + + void Run(const TActorContext& ctx) { + switch (State) { + case EState::DescribePath: + DescribePath(ctx); + return; + case EState::Upsert: + UpsertData(ctx); + return; + case EState::FullScan: + RunFullScan(ctx, 0); + break; + case EState::FullScanGetKeys: + RunFullScan(ctx, Config.GetRowCount()); + break; + case EState::ReadHeadPoints: + RunHeadReads(ctx); + break; + } + } + + void DescribePath(const TActorContext& ctx) { + auto request = std::make_unique<TEvTxUserProxy::TEvNavigate>(); + request->Record.MutableDescribePath()->SetPath(Config.GetPath()); + ctx.Send(MakeTxProxyID(), request.release()); + } + + void Handle(const NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->GetRecord(); + OwnerId = record.GetPathOwnerId(); + TableId = record.GetPathId(); + + const auto& description = record.GetPathDescription(); + if (description.TablePartitionsSize() != 1) { + return StopWithError( + ctx, + TStringBuilder() << "Path must have exactly 1 part, has: " << description.TablePartitionsSize()); + } + const auto& partition = description.GetTablePartitions(0); + TabletId = partition.GetDatashardId(); + + const auto& table = description.GetTable(); + + KeyColumnIds.reserve(table.KeyColumnIdsSize()); + for (const auto& id: table.GetKeyColumnIds()) { + KeyColumnIds.push_back(id); + } + + AllColumnIds.reserve(table.ColumnsSize()); + for (const auto& column: table.GetColumns()) { + AllColumnIds.push_back(column.GetId()); + } + + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " will work with tablet# " << TabletId << " with ownerId# " << OwnerId + << " with tableId# " << TableId << " resolved for path# " << Config.GetPath() + << " with columnsCount# " << AllColumnIds.size() << ", keyColumnCount# " << KeyColumnIds.size()); + + State = EState::Upsert; + Run(ctx); + } + + void UpsertData(const TActorContext& ctx) { + NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart upsertConfig; + upsertConfig.SetRowCount(Config.GetRowCount()); + upsertConfig.SetTabletId(TabletId); + upsertConfig.SetTableId(TableId); + upsertConfig.SetInflight(100); // some good value to upsert fast + + auto* upsertActor = CreateUpsertBulkActor(upsertConfig, SelfId(), Counters, /* meaningless tag */ 1000000); + StartedActors.emplace_back(ctx.Register(upsertActor)); + + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " started upsert actor with id# " << StartedActors.back()); + } + + void RunFullScan(const TActorContext& ctx, ui64 sampleKeys) { + auto request = std::make_unique<TEvDataShard::TEvRead>(); + auto& record = request->Record; + + record.SetReadId(++LastReadId); + record.MutableTableId()->SetOwnerId(OwnerId); + record.MutableTableId()->SetTableId(TableId); + + if (sampleKeys) { + for (const auto& id: KeyColumnIds) { + record.AddColumns(id); + } + } else { + for (const auto& id: AllColumnIds) { + record.AddColumns(id); + } + } + + if (!sampleKeys && ChunkSizes[ChunkIndex]) + record.SetMaxRowsInResult(ChunkSizes[ChunkIndex]); + + TVector<TString> from = {TString("user")}; + TVector<TString> to = {TString("zzz")}; + AddRangeQuery(*request, from, true, to, true); + + record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); + + auto* actor = new TReadIteratorScan(request.release(), TabletId, SelfId(), sampleKeys); + StartedActors.emplace_back(ctx.Register(actor)); + + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back()); + } + + void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + if (msg->ErrorReason || !msg->Report) { + TStringStream ss; + ss << "read iterator actor# " << msg->Tag << " finished with error: " << msg->ErrorReason + << " in State# " << (int)State; + if (msg->Report) + ss << ", report: " << msg->Report->ToString(); + + return StopWithError(ctx, ss.Str()); + } + + switch (State) { + case EState::Upsert: { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "upsert actor# " << ev->Sender + << " finished: " << msg->Report->ToString()); + State = EState::FullScan; + Run(ctx); + return; + } + case EState::FullScan: { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "fullscan actor# " << ev->Sender + << " with chunkSize# " << ChunkSizes[ChunkIndex] + << " finished: " << msg->Report->ToString()); + Errors += msg->Report->OperationsError; + Oks += msg->Report->OperationsOK; + Results.emplace_back(*msg->Report); + + ++ChunkIndex; + if (ChunkIndex == ChunkSizes.size()) + State = EState::FullScanGetKeys; + Run(ctx); + return; + } + case EState::DescribePath: + case EState::FullScanGetKeys: + return StopWithError(ctx, TStringBuilder() << "TEvTestLoadFinished while in " << State); + case EState::ReadHeadPoints: + break; + } + } + + void Handle(TEvPrivate::TEvKeys::TPtr& ev, const TActorContext& ctx) { + Keys = std::move(ev->Get()->Keys); + + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " received keyCount# " << Keys.size()); + + State = EState::ReadHeadPoints; + Run(ctx); + } + + void RunHeadReads(const TActorContext& ctx) { + auto request = std::make_unique<TEvDataShard::TEvRead>(); + auto& record = request->Record; + + record.SetReadId(++LastReadId); + record.MutableTableId()->SetOwnerId(OwnerId); + record.MutableTableId()->SetTableId(TableId); + + for (const auto& id: AllColumnIds) { + record.AddColumns(id); + } + + record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); + + + auto* readActor = new TReadIteratorPoints(request.release(), TabletId, SelfId(), Keys); + StartedActors.emplace_back(ctx.Register(readActor)); + + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " started read actor with id# " << StartedActors.back()); + } + + void Handle(TEvPrivate::TEvPointTimes::TPtr& ev, const TActorContext& ctx) { + const auto& requestTimes = ev->Get()->RequestTimes; + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " received point times# " << requestTimes.size()); + + for (auto t: requestTimes) { + auto ms = t.MilliSeconds(); + if (ms == 0) + ms = 1; // round + HeadReadsHist.RecordValue(ms); + } + + Finish(ctx); + return; + } + + void Finish(const TActorContext& ctx) { + auto ts = TInstant::Now(); + auto delta = ts - StartTs; + + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + response->Report = TEvDataShardLoad::TLoadReport(); + response->Report->Duration = delta; + response->Report->OperationsOK = Oks; + response->Report->OperationsError = 0; + + TStringStream ss; + for (const auto& report: Results) { + ss << report.ToString() << Endl; + } + + { + i64 v50 = HeadReadsHist.GetValueAtPercentile(50.0); + i64 v95 = HeadReadsHist.GetValueAtPercentile(95.00); + i64 v99 = HeadReadsHist.GetValueAtPercentile(99.00); + i64 v999 = HeadReadsHist.GetValueAtPercentile(99.9); + + ss << "HeadReads (ms): " << "50%: " << v50 + << "95%: " << v95 + << "99%: " << v99 + << "99.9%: " << v999 + << Endl; + } + + response->Report->Info = ss.Str(); + response->Report->SubtestCount = Results.size() + 1; + + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + << " finished in " << delta << " with report:\n" << response->Report->Info); + + ctx.Send(Parent, response.release()); + + return Die(ctx); + } + void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { TStringStream str; HTML(str) { @@ -70,12 +710,19 @@ private: } void Stop(const TActorContext& ctx) { - Die(ctx); + for (const auto& actorId: StartedActors) { + ctx.Send(actorId, new TEvents::TEvPoison()); + } + return Die(ctx); } STRICT_STFUNC(StateFunc, CFunc(TEvents::TSystem::PoisonPill, HandlePoison) HFunc(NMon::TEvHttpInfo, Handle) + HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle) + HFunc(TEvDataShardLoad::TEvTestLoadFinished, Handle) + HFunc(TEvPrivate::TEvKeys, Handle) + HFunc(TEvPrivate::TEvPointTimes, Handle) ) }; @@ -88,3 +735,27 @@ NActors::IActor *CreateReadIteratorActor(const NKikimrDataShardLoad::TEvTestLoad } } // NKikimr::NDataShardLoad + +template <> +inline void Out<NKikimr::NDataShardLoad::EState>(IOutputStream& o, NKikimr::NDataShardLoad::EState state) { + switch (state) { + case NKikimr::NDataShardLoad::EState::DescribePath: + o << "describepath"; + break; + case NKikimr::NDataShardLoad::EState::Upsert: + o << "upsert"; + break; + case NKikimr::NDataShardLoad::EState::FullScan: + o << "fullscan"; + break; + case NKikimr::NDataShardLoad::EState::FullScanGetKeys: + o << "fullscangetkeys"; + break; + case NKikimr::NDataShardLoad::EState::ReadHeadPoints: + o << "readheadpoints"; + break; + default: + o << (int)state; + break; + } +} |