aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-06-30 21:52:45 +0300
committeralexvru <alexvru@ydb.tech>2023-06-30 21:52:45 +0300
commita6623794d2fcbb37f4ae0e6d2435130e7187cc2e (patch)
tree8dbf54985b47723698f6fa2b9bfdaacdc25cb0f5
parent1dd51935f8e289a4f832773b327671d60cf60913 (diff)
downloadydb-a6623794d2fcbb37f4ae0e6d2435130e7187cc2e.tar.gz
Improve testshard
-rw-r--r--ydb/core/test_tablet/load_actor_delete.cpp3
-rw-r--r--ydb/core/test_tablet/load_actor_impl.cpp18
-rw-r--r--ydb/core/test_tablet/load_actor_impl.h9
-rw-r--r--ydb/core/test_tablet/load_actor_mon.cpp19
-rw-r--r--ydb/core/test_tablet/load_actor_read_validate.cpp101
5 files changed, 119 insertions, 31 deletions
diff --git a/ydb/core/test_tablet/load_actor_delete.cpp b/ydb/core/test_tablet/load_actor_delete.cpp
index a06ab6655ac..f5a4f941c04 100644
--- a/ydb/core/test_tablet/load_actor_delete.cpp
+++ b/ydb/core/test_tablet/load_actor_delete.cpp
@@ -7,7 +7,8 @@ namespace NKikimr::NTestShard {
options.reserve(Keys.size());
ui64 accumLen = 0;
for (const auto& [key, info] : Keys) {
- if (info.ConfirmedState == info.PendingState && info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED) {
+ if (info.ConfirmedState == info.PendingState && info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED &&
+ !KeysBeingRead.contains(key)) {
accumLen += info.Len;
options.emplace_back(accumLen, key);
}
diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp
index bd190af76e4..d5cf853036d 100644
--- a/ydb/core/test_tablet/load_actor_impl.cpp
+++ b/ydb/core/test_tablet/load_actor_impl.cpp
@@ -10,6 +10,7 @@ namespace NKikimr::NTestShard {
, Settings(settings)
, StateServerWriteLatency(1024)
, WriteLatency(1024)
+ , ReadLatency(1024)
{}
void TLoadActor::Bootstrap(const TActorId& parentId) {
@@ -39,7 +40,7 @@ namespace NKikimr::NTestShard {
return;
}
if (StallCounter > 500) {
- if (WritesInFlight.empty() && DeletesInFlight.empty() && TransitionInFlight.empty()) {
+ if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
StallCounter = 0;
} else {
return;
@@ -50,7 +51,7 @@ namespace NKikimr::NTestShard {
barrier = Settings.GetValidateAfterBytes();
}
if (BytesProcessed > barrier) { // time to perform validation
- if (WritesInFlight.empty() && DeletesInFlight.empty() && TransitionInFlight.empty()) {
+ if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
RunValidation(false);
}
} else { // resume load
@@ -60,6 +61,9 @@ namespace NKikimr::NTestShard {
TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0));
}
}
+ if (ReadsInFlight.size() < 10 && IssueRead()) {
+ TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0));
+ }
if (BytesOfData > Settings.GetMaxDataBytes()) { // delete some data if needed
IssueDelete();
}
@@ -132,10 +136,20 @@ namespace NKikimr::NTestShard {
}
DeletesInFlight.erase(it);
}
+ if (const auto it = ReadsInFlight.find(record.GetCookie()); it != ReadsInFlight.end()) {
+ const auto& [key, offset, size, timestamp] = it->second;
+ const auto jt = KeysBeingRead.find(key);
+ Y_VERIFY(jt != KeysBeingRead.end() && jt->second);
+ if (!--jt->second) {
+ KeysBeingRead.erase(jt);
+ }
+ ReadsInFlight.erase(it);
+ }
} else {
STLOG(PRI_INFO, TEST_SHARD, TS04, "TEvKeyValue::TEvResponse", (TabletId, TabletId), (Msg, ev->Get()->ToString()));
ProcessWriteResult(record.GetCookie(), record.GetWriteResult());
ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult());
+ ProcessReadResult(record.GetCookie(), record.GetReadResult());
}
Action();
}
diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h
index 390baa47ac7..47c46901d6b 100644
--- a/ydb/core/test_tablet/load_actor_impl.h
+++ b/ydb/core/test_tablet/load_actor_impl.h
@@ -111,15 +111,24 @@ namespace NKikimr::NTestShard {
std::unordered_map<ui64, TWriteInfo> WritesInFlight; // cookie -> TWriteInfo
ui32 KeysWritten = 0;
static constexpr TDuration WriteSpeedWindow = TDuration::Seconds(10);
+ static constexpr TDuration ReadSpeedWindow = TDuration::Seconds(10);
TSpeedMeter WriteSpeed{WriteSpeedWindow};
+ TSpeedMeter ReadSpeed{ReadSpeedWindow};
TTimeSeries StateServerWriteLatency;
TTimeSeries WriteLatency;
+ TTimeSeries ReadLatency;
void GenerateKeyValue(TString *key, TString *value, bool *isInline);
void IssueWrite();
void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results);
void TrimBytesWritten(TInstant now);
+ std::unordered_map<ui64, std::tuple<TString, ui32, ui32, TMonotonic>> ReadsInFlight;
+ std::unordered_map<TString, ui32> KeysBeingRead;
+
+ bool IssueRead();
+ void ProcessReadResult(ui64 cookie, const NProtoBuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TReadResult>& results);
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// KV tablet delete management code
diff --git a/ydb/core/test_tablet/load_actor_mon.cpp b/ydb/core/test_tablet/load_actor_mon.cpp
index 24dfb0bc3c2..f96095435e9 100644
--- a/ydb/core/test_tablet/load_actor_mon.cpp
+++ b/ydb/core/test_tablet/load_actor_mon.cpp
@@ -68,6 +68,14 @@ namespace NKikimr::NTestShard {
}
TABLER() {
+ size_t numPoints = 0;
+ TDuration timeSpan;
+ const ui64 speed = self->ReadSpeed.GetSpeedInBytesPerSecond(now, &numPoints, &timeSpan);
+ TABLED() { str << "Read speed"; }
+ TABLED() { str << Sprintf("%.2lf", speed * 1e-6) << " MB/s @" << numPoints << ":" << timeSpan; }
+ }
+
+ TABLER() {
TABLED() { str << "Bytes of data stored"; }
TABLED() { str << self->BytesOfData; }
}
@@ -88,6 +96,11 @@ namespace NKikimr::NTestShard {
}
TABLER() {
+ TABLED() { str << "Bytes read"; }
+ TABLED() { str << self->ReadSpeed.GetBytesAccum(); }
+ }
+
+ TABLER() {
TABLED() { str << "Keys written"; }
TABLED() { str << self->KeysWritten; }
}
@@ -103,6 +116,11 @@ namespace NKikimr::NTestShard {
}
TABLER() {
+ TABLED() { str << "Reads in flight"; }
+ TABLED() { str << self->ReadsInFlight.size() << '/' << self->KeysBeingRead.size(); }
+ }
+
+ TABLER() {
TABLED() { str << "Delete requests in flight"; }
TABLED() { str << self->DeletesInFlight.size(); }
}
@@ -135,6 +153,7 @@ namespace NKikimr::NTestShard {
output(self->StateServerWriteLatency, "StateServerWriteLatency");
output(self->WriteLatency, "WriteLatency");
+ output(self->ReadLatency, "ReadLatency");
}
}
}
diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp
index 87947f1d69b..c13e6444f88 100644
--- a/ydb/core/test_tablet/load_actor_read_validate.cpp
+++ b/ydb/core/test_tablet/load_actor_read_validate.cpp
@@ -17,7 +17,7 @@ namespace NKikimr::NTestShard {
std::unordered_map<TString, TKeyInfo> KeysBefore;
std::unordered_map<TString, TKeyInfo> Keys;
std::deque<TKey*> TransitionInFlight;
- std::unordered_map<ui64, std::tuple<TString, ui32, ui32>> QueriesInFlight;
+ std::unordered_map<ui64, TString> QueriesInFlight;
ui64 LastCookie = 0;
std::deque<TString> KeysPending;
@@ -114,7 +114,7 @@ namespace NKikimr::NTestShard {
Send(TabletActorId, ev.release());
}
- std::tuple<TString, ui32, ui32> PopQueryByCookie(ui64 cookie) {
+ TString PopQueryByCookie(ui64 cookie) {
auto node = QueriesInFlight.extract(cookie);
Y_VERIFY(node);
return node.mapped();
@@ -198,7 +198,7 @@ namespace NKikimr::NTestShard {
};
void ProcessReadResult(ui64 cookie, const TString& message, EReadOutcome outcome, const TString& value) {
- const auto [key, offset, size] = PopQueryByCookie(cookie);
+ const TString& key = PopQueryByCookie(cookie);
if (outcome == EReadOutcome::IMMEDIATE_RETRY) {
STLOG(PRI_ERROR, TEST_SHARD, TS23, "read immediate retry", (TabletId, TabletId), (Message, message));
@@ -222,9 +222,8 @@ namespace NKikimr::NTestShard {
StringSplitter(key).Split(',').CollectInto(&len, &seed, &id);
TString data = FastGenDataForLZ4(len, seed);
- Y_VERIFY_S(offset < len && size <= len - offset && value.size() == size && data.substr(offset, size) == value,
- "TabletId# " << TabletId << " Key# " << key << " value mismatch"
- << " value.size# " << value.size() << " offset# " << offset << " size# " << size);
+ Y_VERIFY_S(value == data, "TabletId# " << TabletId << " Key# " << key << " value mismatch"
+ << " value.size# " << value.size());
STLOG(PRI_DEBUG, TEST_SHARD, TS25, "read key", (TabletId, TabletId), (Key, key));
}
@@ -270,17 +269,8 @@ namespace NKikimr::NTestShard {
}
void IssueRead(const TString& key) {
- ui64 len, seed, id;
- StringSplitter(key).Split(',').CollectInto(&len, &seed, &id);
-
- std::optional<ui32> offset, size;
- if (RandomNumber(2u)) {
- offset = RandomNumber(len);
- size = 1 + RandomNumber(len - *offset);
- }
-
const ui64 cookie = ++LastCookie;
- const bool inserted = QueriesInFlight.try_emplace(cookie, key, offset.value_or(0), size.value_or(len)).second;
+ const bool inserted = QueriesInFlight.try_emplace(cookie, key).second;
Y_VERIFY(inserted);
std::unique_ptr<IEventBase> ev;
@@ -292,12 +282,6 @@ namespace NKikimr::NTestShard {
request->Record.SetCookie(cookie);
auto *cmdRead = request->Record.AddCmdRead();
cmdRead->SetKey(key);
- if (offset) {
- cmdRead->SetOffset(*offset);
- }
- if (size) {
- cmdRead->SetSize(*size);
- }
++WaitedReadsViaEvResponse;
ev = std::move(request);
} else {
@@ -305,12 +289,6 @@ namespace NKikimr::NTestShard {
request->Record.set_tablet_id(TabletId);
request->Record.set_cookie(cookie);
request->Record.set_key(key);
- if (offset) {
- request->Record.set_offset(*offset);
- }
- if (size) {
- request->Record.set_size(*size);
- }
++WaitedReadsViaEvReadResponse;
ev = std::move(request);
}
@@ -649,4 +627,71 @@ namespace NKikimr::NTestShard {
}
}
+ bool TLoadActor::IssueRead() {
+ std::vector<TString> options;
+ for (const auto& [key, info] : Keys) {
+ if (info.ConfirmedState == info.PendingState && info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED) {
+ options.push_back(key);
+ }
+ }
+ if (options.empty()) {
+ return false;
+ }
+ const size_t index = RandomNumber(options.size());
+ const TString& key = options[index];
+ ui64 len, seed, id;
+ StringSplitter(key).Split(',').CollectInto(&len, &seed, &id);
+ const ui32 offset = RandomNumber(len);
+ const ui32 size = 1 + RandomNumber(len - offset);
+
+ auto request = CreateRequest();
+ auto *cmdRead = request->Record.AddCmdRead();
+ cmdRead->SetKey(key);
+ cmdRead->SetOffset(offset);
+ cmdRead->SetSize(size);
+
+ STLOG(PRI_INFO, TEST_SHARD, TS16, "reading key", (TabletId, TabletId), (Key, key), (Offset, offset), (Size, size));
+
+ ReadsInFlight.try_emplace(request->Record.GetCookie(), key, offset, size, TActivationContext::Monotonic());
+ ++KeysBeingRead[key];
+ Send(TabletActorId, request.release());
+
+ return true;
+ }
+
+ void TLoadActor::ProcessReadResult(ui64 cookie, const NProtoBuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TReadResult>& results) {
+ for (const auto& result : results) {
+ auto node = ReadsInFlight.extract(cookie);
+ Y_VERIFY(node);
+ auto& [key, offset, size, timestamp] = node.mapped();
+ const TMonotonic now = TActivationContext::Monotonic();
+
+ auto it = KeysBeingRead.find(key);
+ Y_VERIFY(it != KeysBeingRead.end() && it->second);
+ if (!--it->second) {
+ KeysBeingRead.erase(key);
+ }
+
+ ui64 len, seed, id;
+ StringSplitter(key).Split(',').CollectInto(&len, &seed, &id);
+ TString data = FastGenDataForLZ4(len, seed);
+
+ STLOG(PRI_INFO, TEST_SHARD, TS18, "read key", (TabletId, TabletId), (Key, key), (Offset, offset), (Size, size),
+ (Status, NKikimrProto::EReplyStatus_Name(result.GetStatus())));
+
+ Y_VERIFY(result.GetStatus() == NKikimrProto::OK || result.GetStatus() == NKikimrProto::ERROR);
+
+ if (result.GetStatus() == NKikimrProto::OK) {
+ const TString value = result.GetValue();
+
+ Y_VERIFY_S(offset < len && size <= len - offset && value.size() == size && data.substr(offset, size) == value,
+ "TabletId# " << TabletId << " Key# " << key << " value mismatch"
+ << " value.size# " << value.size() << " offset# " << offset << " size# " << size);
+
+ ReadLatency.Add(now - timestamp);
+ ReadSpeed.Add(TActivationContext::Now(), size);
+ }
+ }
+ }
+
} // NKikimr::NTestShard