diff options
author | alexvru <alexvru@ydb.tech> | 2023-06-30 21:52:45 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-06-30 21:52:45 +0300 |
commit | a6623794d2fcbb37f4ae0e6d2435130e7187cc2e (patch) | |
tree | 8dbf54985b47723698f6fa2b9bfdaacdc25cb0f5 | |
parent | 1dd51935f8e289a4f832773b327671d60cf60913 (diff) | |
download | ydb-a6623794d2fcbb37f4ae0e6d2435130e7187cc2e.tar.gz |
Improve testshard
-rw-r--r-- | ydb/core/test_tablet/load_actor_delete.cpp | 3 | ||||
-rw-r--r-- | ydb/core/test_tablet/load_actor_impl.cpp | 18 | ||||
-rw-r--r-- | ydb/core/test_tablet/load_actor_impl.h | 9 | ||||
-rw-r--r-- | ydb/core/test_tablet/load_actor_mon.cpp | 19 | ||||
-rw-r--r-- | ydb/core/test_tablet/load_actor_read_validate.cpp | 101 |
5 files changed, 119 insertions, 31 deletions
diff --git a/ydb/core/test_tablet/load_actor_delete.cpp b/ydb/core/test_tablet/load_actor_delete.cpp index a06ab6655ac..f5a4f941c04 100644 --- a/ydb/core/test_tablet/load_actor_delete.cpp +++ b/ydb/core/test_tablet/load_actor_delete.cpp @@ -7,7 +7,8 @@ namespace NKikimr::NTestShard { options.reserve(Keys.size()); ui64 accumLen = 0; for (const auto& [key, info] : Keys) { - if (info.ConfirmedState == info.PendingState && info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED) { + if (info.ConfirmedState == info.PendingState && info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED && + !KeysBeingRead.contains(key)) { accumLen += info.Len; options.emplace_back(accumLen, key); } diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp index bd190af76e4..d5cf853036d 100644 --- a/ydb/core/test_tablet/load_actor_impl.cpp +++ b/ydb/core/test_tablet/load_actor_impl.cpp @@ -10,6 +10,7 @@ namespace NKikimr::NTestShard { , Settings(settings) , StateServerWriteLatency(1024) , WriteLatency(1024) + , ReadLatency(1024) {} void TLoadActor::Bootstrap(const TActorId& parentId) { @@ -39,7 +40,7 @@ namespace NKikimr::NTestShard { return; } if (StallCounter > 500) { - if (WritesInFlight.empty() && DeletesInFlight.empty() && TransitionInFlight.empty()) { + if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) { StallCounter = 0; } else { return; @@ -50,7 +51,7 @@ namespace NKikimr::NTestShard { barrier = Settings.GetValidateAfterBytes(); } if (BytesProcessed > barrier) { // time to perform validation - if (WritesInFlight.empty() && DeletesInFlight.empty() && TransitionInFlight.empty()) { + if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) { RunValidation(false); } } else { // resume load @@ -60,6 +61,9 @@ namespace NKikimr::NTestShard { TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0)); } } + if (ReadsInFlight.size() < 10 && IssueRead()) { + TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0)); + } if (BytesOfData > Settings.GetMaxDataBytes()) { // delete some data if needed IssueDelete(); } @@ -132,10 +136,20 @@ namespace NKikimr::NTestShard { } DeletesInFlight.erase(it); } + if (const auto it = ReadsInFlight.find(record.GetCookie()); it != ReadsInFlight.end()) { + const auto& [key, offset, size, timestamp] = it->second; + const auto jt = KeysBeingRead.find(key); + Y_VERIFY(jt != KeysBeingRead.end() && jt->second); + if (!--jt->second) { + KeysBeingRead.erase(jt); + } + ReadsInFlight.erase(it); + } } else { STLOG(PRI_INFO, TEST_SHARD, TS04, "TEvKeyValue::TEvResponse", (TabletId, TabletId), (Msg, ev->Get()->ToString())); ProcessWriteResult(record.GetCookie(), record.GetWriteResult()); ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult()); + ProcessReadResult(record.GetCookie(), record.GetReadResult()); } Action(); } diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h index 390baa47ac7..47c46901d6b 100644 --- a/ydb/core/test_tablet/load_actor_impl.h +++ b/ydb/core/test_tablet/load_actor_impl.h @@ -111,15 +111,24 @@ namespace NKikimr::NTestShard { std::unordered_map<ui64, TWriteInfo> WritesInFlight; // cookie -> TWriteInfo ui32 KeysWritten = 0; static constexpr TDuration WriteSpeedWindow = TDuration::Seconds(10); + static constexpr TDuration ReadSpeedWindow = TDuration::Seconds(10); TSpeedMeter WriteSpeed{WriteSpeedWindow}; + TSpeedMeter ReadSpeed{ReadSpeedWindow}; TTimeSeries StateServerWriteLatency; TTimeSeries WriteLatency; + TTimeSeries ReadLatency; void GenerateKeyValue(TString *key, TString *value, bool *isInline); void IssueWrite(); void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results); void TrimBytesWritten(TInstant now); + std::unordered_map<ui64, std::tuple<TString, ui32, ui32, TMonotonic>> ReadsInFlight; + std::unordered_map<TString, ui32> KeysBeingRead; + + bool IssueRead(); + void ProcessReadResult(ui64 cookie, const NProtoBuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TReadResult>& results); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // KV tablet delete management code diff --git a/ydb/core/test_tablet/load_actor_mon.cpp b/ydb/core/test_tablet/load_actor_mon.cpp index 24dfb0bc3c2..f96095435e9 100644 --- a/ydb/core/test_tablet/load_actor_mon.cpp +++ b/ydb/core/test_tablet/load_actor_mon.cpp @@ -68,6 +68,14 @@ namespace NKikimr::NTestShard { } TABLER() { + size_t numPoints = 0; + TDuration timeSpan; + const ui64 speed = self->ReadSpeed.GetSpeedInBytesPerSecond(now, &numPoints, &timeSpan); + TABLED() { str << "Read speed"; } + TABLED() { str << Sprintf("%.2lf", speed * 1e-6) << " MB/s @" << numPoints << ":" << timeSpan; } + } + + TABLER() { TABLED() { str << "Bytes of data stored"; } TABLED() { str << self->BytesOfData; } } @@ -88,6 +96,11 @@ namespace NKikimr::NTestShard { } TABLER() { + TABLED() { str << "Bytes read"; } + TABLED() { str << self->ReadSpeed.GetBytesAccum(); } + } + + TABLER() { TABLED() { str << "Keys written"; } TABLED() { str << self->KeysWritten; } } @@ -103,6 +116,11 @@ namespace NKikimr::NTestShard { } TABLER() { + TABLED() { str << "Reads in flight"; } + TABLED() { str << self->ReadsInFlight.size() << '/' << self->KeysBeingRead.size(); } + } + + TABLER() { TABLED() { str << "Delete requests in flight"; } TABLED() { str << self->DeletesInFlight.size(); } } @@ -135,6 +153,7 @@ namespace NKikimr::NTestShard { output(self->StateServerWriteLatency, "StateServerWriteLatency"); output(self->WriteLatency, "WriteLatency"); + output(self->ReadLatency, "ReadLatency"); } } } diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp index 87947f1d69b..c13e6444f88 100644 --- a/ydb/core/test_tablet/load_actor_read_validate.cpp +++ b/ydb/core/test_tablet/load_actor_read_validate.cpp @@ -17,7 +17,7 @@ namespace NKikimr::NTestShard { std::unordered_map<TString, TKeyInfo> KeysBefore; std::unordered_map<TString, TKeyInfo> Keys; std::deque<TKey*> TransitionInFlight; - std::unordered_map<ui64, std::tuple<TString, ui32, ui32>> QueriesInFlight; + std::unordered_map<ui64, TString> QueriesInFlight; ui64 LastCookie = 0; std::deque<TString> KeysPending; @@ -114,7 +114,7 @@ namespace NKikimr::NTestShard { Send(TabletActorId, ev.release()); } - std::tuple<TString, ui32, ui32> PopQueryByCookie(ui64 cookie) { + TString PopQueryByCookie(ui64 cookie) { auto node = QueriesInFlight.extract(cookie); Y_VERIFY(node); return node.mapped(); @@ -198,7 +198,7 @@ namespace NKikimr::NTestShard { }; void ProcessReadResult(ui64 cookie, const TString& message, EReadOutcome outcome, const TString& value) { - const auto [key, offset, size] = PopQueryByCookie(cookie); + const TString& key = PopQueryByCookie(cookie); if (outcome == EReadOutcome::IMMEDIATE_RETRY) { STLOG(PRI_ERROR, TEST_SHARD, TS23, "read immediate retry", (TabletId, TabletId), (Message, message)); @@ -222,9 +222,8 @@ namespace NKikimr::NTestShard { StringSplitter(key).Split(',').CollectInto(&len, &seed, &id); TString data = FastGenDataForLZ4(len, seed); - Y_VERIFY_S(offset < len && size <= len - offset && value.size() == size && data.substr(offset, size) == value, - "TabletId# " << TabletId << " Key# " << key << " value mismatch" - << " value.size# " << value.size() << " offset# " << offset << " size# " << size); + Y_VERIFY_S(value == data, "TabletId# " << TabletId << " Key# " << key << " value mismatch" + << " value.size# " << value.size()); STLOG(PRI_DEBUG, TEST_SHARD, TS25, "read key", (TabletId, TabletId), (Key, key)); } @@ -270,17 +269,8 @@ namespace NKikimr::NTestShard { } void IssueRead(const TString& key) { - ui64 len, seed, id; - StringSplitter(key).Split(',').CollectInto(&len, &seed, &id); - - std::optional<ui32> offset, size; - if (RandomNumber(2u)) { - offset = RandomNumber(len); - size = 1 + RandomNumber(len - *offset); - } - const ui64 cookie = ++LastCookie; - const bool inserted = QueriesInFlight.try_emplace(cookie, key, offset.value_or(0), size.value_or(len)).second; + const bool inserted = QueriesInFlight.try_emplace(cookie, key).second; Y_VERIFY(inserted); std::unique_ptr<IEventBase> ev; @@ -292,12 +282,6 @@ namespace NKikimr::NTestShard { request->Record.SetCookie(cookie); auto *cmdRead = request->Record.AddCmdRead(); cmdRead->SetKey(key); - if (offset) { - cmdRead->SetOffset(*offset); - } - if (size) { - cmdRead->SetSize(*size); - } ++WaitedReadsViaEvResponse; ev = std::move(request); } else { @@ -305,12 +289,6 @@ namespace NKikimr::NTestShard { request->Record.set_tablet_id(TabletId); request->Record.set_cookie(cookie); request->Record.set_key(key); - if (offset) { - request->Record.set_offset(*offset); - } - if (size) { - request->Record.set_size(*size); - } ++WaitedReadsViaEvReadResponse; ev = std::move(request); } @@ -649,4 +627,71 @@ namespace NKikimr::NTestShard { } } + bool TLoadActor::IssueRead() { + std::vector<TString> options; + for (const auto& [key, info] : Keys) { + if (info.ConfirmedState == info.PendingState && info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED) { + options.push_back(key); + } + } + if (options.empty()) { + return false; + } + const size_t index = RandomNumber(options.size()); + const TString& key = options[index]; + ui64 len, seed, id; + StringSplitter(key).Split(',').CollectInto(&len, &seed, &id); + const ui32 offset = RandomNumber(len); + const ui32 size = 1 + RandomNumber(len - offset); + + auto request = CreateRequest(); + auto *cmdRead = request->Record.AddCmdRead(); + cmdRead->SetKey(key); + cmdRead->SetOffset(offset); + cmdRead->SetSize(size); + + STLOG(PRI_INFO, TEST_SHARD, TS16, "reading key", (TabletId, TabletId), (Key, key), (Offset, offset), (Size, size)); + + ReadsInFlight.try_emplace(request->Record.GetCookie(), key, offset, size, TActivationContext::Monotonic()); + ++KeysBeingRead[key]; + Send(TabletActorId, request.release()); + + return true; + } + + void TLoadActor::ProcessReadResult(ui64 cookie, const NProtoBuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TReadResult>& results) { + for (const auto& result : results) { + auto node = ReadsInFlight.extract(cookie); + Y_VERIFY(node); + auto& [key, offset, size, timestamp] = node.mapped(); + const TMonotonic now = TActivationContext::Monotonic(); + + auto it = KeysBeingRead.find(key); + Y_VERIFY(it != KeysBeingRead.end() && it->second); + if (!--it->second) { + KeysBeingRead.erase(key); + } + + ui64 len, seed, id; + StringSplitter(key).Split(',').CollectInto(&len, &seed, &id); + TString data = FastGenDataForLZ4(len, seed); + + STLOG(PRI_INFO, TEST_SHARD, TS18, "read key", (TabletId, TabletId), (Key, key), (Offset, offset), (Size, size), + (Status, NKikimrProto::EReplyStatus_Name(result.GetStatus()))); + + Y_VERIFY(result.GetStatus() == NKikimrProto::OK || result.GetStatus() == NKikimrProto::ERROR); + + if (result.GetStatus() == NKikimrProto::OK) { + const TString value = result.GetValue(); + + Y_VERIFY_S(offset < len && size <= len - offset && value.size() == size && data.substr(offset, size) == value, + "TabletId# " << TabletId << " Key# " << key << " value mismatch" + << " value.size# " << value.size() << " offset# " << offset << " size# " << size); + + ReadLatency.Add(now - timestamp); + ReadSpeed.Add(TActivationContext::Now(), size); + } + } + } + } // NKikimr::NTestShard |