aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-12-19 11:49:10 +0300
committeralexvru <alexvru@ydb.tech>2022-12-19 11:49:10 +0300
commitee24e6c1f52ac838f4acd7773fcac520dec6267b (patch)
tree54c28eb81e1d34a72e8ec6f7acf07fa4b7121ce0
parentcd7b43afadb45daa054bd3882511703e7200ca57 (diff)
downloadydb-ee24e6c1f52ac838f4acd7773fcac520dec6267b.tar.gz
Some TestShard improvements
-rw-r--r--ydb/core/test_tablet/load_actor_read_validate.cpp198
1 files changed, 93 insertions, 105 deletions
diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp
index 70cf5f97d4c..aeb50277d64 100644
--- a/ydb/core/test_tablet/load_actor_read_validate.cpp
+++ b/ydb/core/test_tablet/load_actor_read_validate.cpp
@@ -83,81 +83,59 @@ namespace NKikimr::NTestShard {
}
}
- void SendReadRangeViaEvRequest() {
- auto request = std::make_unique<TEvKeyValue::TEvRequest>();
- auto& record = request->Record;
- record.SetTabletId(TabletId);
- record.SetCookie(0);
- auto *read = record.AddCmdReadRange();
- auto *r = read->MutableRange();
- if (LastKey) {
- r->SetFrom(*LastKey);
- r->SetIncludeFrom(false);
- }
- Send(TabletActorId, request.release());
- }
-
- void SendReadRangeViaEvReadRange() {
- auto request = std::make_unique<TEvKeyValue::TEvReadRange>();
- auto& record = request->Record;
- record.set_tablet_id(TabletId);
- record.set_cookie(0);
- auto *range = record.mutable_range();
- range->set_from_key_exclusive(*LastKey);
- Send(TabletActorId, request.release());
- }
-
void IssueNextReadRangeQuery() {
+ std::unique_ptr<IEventBase> ev;
+
IssueReadRangeMode = !IssueReadRangeMode;
if (IssueReadRangeMode) {
- WaitedReadRangesViaEvResponse++;
- SendReadRangeViaEvRequest();
+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
+ request->Record.SetTabletId(TabletId);
+ request->Record.SetCookie(0);
+ auto *read = request->Record.AddCmdReadRange();
+ auto *r = read->MutableRange();
+ if (LastKey) {
+ r->SetFrom(*LastKey);
+ r->SetIncludeFrom(false);
+ }
+ ++WaitedReadRangesViaEvResponse;
+ ev = std::move(request);
} else {
- WaitedReadRangesViaEvReadRangeResponse++;
- SendReadRangeViaEvReadRange();
+ auto request = std::make_unique<TEvKeyValue::TEvReadRange>();
+ request->Record.set_tablet_id(TabletId);
+ auto *range = request->Record.mutable_range();
+ if (LastKey) {
+ range->set_from_key_exclusive(*LastKey);
+ }
+ ++WaitedReadRangesViaEvReadRangeResponse;
+ ev = std::move(request);
}
+
+ Send(TabletActorId, ev.release());
}
TString PopQueryByCookie(ui64 cookie) {
- auto it = QueriesInFlight.find(cookie);
- Y_VERIFY(it != QueriesInFlight.end());
- TString key = std::move(it->second);
- QueriesInFlight.erase(it);
- return key;
+ auto node = QueriesInFlight.extract(cookie);
+ Y_VERIFY(node);
+ return node.mapped();
}
void Handle(TEvKeyValue::TEvResponse::TPtr ev) {
auto& r = ev->Get()->Record;
if (r.GetCookie()) {
WaitedReadsViaEvResponse--;
- TString key = PopQueryByCookie(r.GetCookie());
if (r.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
- STLOG(PRI_ERROR, TEST_SHARD, TS18, "CmdRead failed", (TabletId, TabletId), (Status, r.GetStatus()),
- (ErrorReason, r.GetErrorReason()));
- return IssueRead(key);
+ return ProcessReadResult(r.GetCookie(), TStringBuilder() << "Status# " << r.GetStatus()
+ << " ErrorReason# " << r.GetErrorReason(), EReadOutcome::IMMEDIATE_RETRY, {});
}
Y_VERIFY(r.ReadResultSize() == 1);
const auto& res = r.GetReadResult(0);
const auto status = static_cast<NKikimrProto::EReplyStatus>(res.GetStatus());
- if (status == NKikimrProto::ERROR && RetryCount < 10) {
- const bool inserted = KeyReadsWaitingForRetry.insert(key).second;
- Y_VERIFY(inserted);
- STLOG(PRI_ERROR, TEST_SHARD, TS22, "read key failed -- going to retry", (TabletId, TabletId), (Key, key));
- return;
- }
-
- Y_VERIFY_S(status == NKikimrProto::OK, "Status# " << NKikimrProto::EReplyStatus_Name(status) << " Message# "
- << res.GetMessage());
-
- const TString& value = res.GetValue();
- const bool inserted = Keys.try_emplace(key, value.size()).second;
- Y_VERIFY(inserted);
- Y_VERIFY_S(HashForValue(value) == key, "TabletId# " << TabletId << " Key# " << key << " digest mismatch"
- " actual# " << HashForValue(value) << " len# " << value.size());
- STLOG(PRI_DEBUG, TEST_SHARD, TS16, "read key", (TabletId, TabletId), (Key, key));
+ ProcessReadResult(r.GetCookie(), TStringBuilder() << "Status# " << NKikimrProto::EReplyStatus_Name(status)
+ << " Message# " << res.GetMessage(), status == NKikimrProto::OK ? EReadOutcome::OK :
+ status == NKikimrProto::ERROR ? EReadOutcome::RETRY : EReadOutcome::ERROR, res.GetValue());
} else {
WaitedReadRangesViaEvResponse--;
if (r.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
@@ -187,38 +165,53 @@ namespace NKikimr::NTestShard {
}
void Handle(TEvKeyValue::TEvReadResponse::TPtr ev) {
- auto& record = ev->Get()->Record;
WaitedReadsViaEvReadResponse--;
- const TString key = PopQueryByCookie(record.cookie());
+ auto& record = ev->Get()->Record;
const NKikimrKeyValue::Statuses::ReplyStatus status = record.status();
+ const TString message = TStringBuilder()
+ << "Status# " << NKikimrKeyValue::Statuses::ReplyStatus_Name(status)
+ << (record.msg() ? TStringBuilder() << " Message# " << record.msg() : TString());
+ const EReadOutcome outcome =
+ status == NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT ? EReadOutcome::IMMEDIATE_RETRY :
+ status == NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR ? EReadOutcome::RETRY :
+ status == NKikimrKeyValue::Statuses::RSTATUS_OK ? EReadOutcome::OK :
+ EReadOutcome::ERROR;
- if (status == NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT) {
- STLOG(PRI_ERROR, TEST_SHARD, TS23, "CmdRead failed", (TabletId, TabletId), (Status, status),
- (ErrorReason, record.msg()));
+ ProcessReadResult(record.cookie(), message, outcome, record.value());
+
+ FinishIfPossible();
+ }
+
+ enum class EReadOutcome {
+ IMMEDIATE_RETRY,
+ RETRY,
+ OK,
+ ERROR,
+ };
+
+ void ProcessReadResult(ui64 cookie, const TString& message, EReadOutcome outcome, const TString& value) {
+ const TString key = PopQueryByCookie(cookie);
+
+ if (outcome == EReadOutcome::IMMEDIATE_RETRY) {
+ STLOG(PRI_ERROR, TEST_SHARD, TS23, "read immediate retry", (TabletId, TabletId), (Message, message));
return IssueRead(key);
}
- if (status == NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR && RetryCount < 10) {
+ if (outcome == EReadOutcome::RETRY && RetryCount < 10) {
const bool inserted = KeyReadsWaitingForRetry.insert(key).second;
Y_VERIFY(inserted);
STLOG(PRI_ERROR, TEST_SHARD, TS24, "read key failed -- going to retry", (TabletId, TabletId),
- (Key, key), (ErrorReason, record.msg()));
- return;
- }
-
- Y_VERIFY_S(status == NKikimrKeyValue::Statuses::RSTATUS_OK,
- "Status# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(status)
- << " Cookie# " << record.cookie()
- << " Message# " << record.msg());
+ (Key, key), (Message, message));
+ } else {
+ Y_VERIFY_S(outcome == EReadOutcome::OK, "Message# " << message << " Key# " << key);
- const TString& value = record.value();
- const bool inserted = Keys.try_emplace(key, value.size()).second;
- Y_VERIFY(inserted);
- Y_VERIFY_S(HashForValue(value) == key, "TabletId# " << TabletId << " Key# " << key << " digest mismatch"
- " actual# " << HashForValue(value) << " len# " << value.size());
- STLOG(PRI_DEBUG, TEST_SHARD, TS25, "read key", (TabletId, TabletId), (Key, key));
- FinishIfPossible();
+ const bool inserted = Keys.try_emplace(key, value.size()).second;
+ Y_VERIFY(inserted);
+ Y_VERIFY_S(HashForValue(value) == key, "TabletId# " << TabletId << " Key# " << key << " digest mismatch"
+ " actual# " << HashForValue(value) << " len# " << value.size());
+ STLOG(PRI_DEBUG, TEST_SHARD, TS25, "read key", (TabletId, TabletId), (Key, key));
+ }
}
void Handle(TEvKeyValue::TEvReadRangeResponse::TPtr ev) {
@@ -251,44 +244,31 @@ namespace NKikimr::NTestShard {
FinishIfPossible();
}
- ui64 SendReadViaEvRequest(const TString& key) {
- auto request = std::make_unique<TEvKeyValue::TEvRequest>();
- auto& record = request->Record;
- record.SetTabletId(TabletId);
+ void IssueRead(const TString& key) {
const ui64 cookie = ++LastCookie;
- record.SetCookie(cookie);
- auto *read = record.AddCmdRead();
- read->SetKey(key);
- Send(TabletActorId, request.release());
- return cookie;
- }
+ const bool inserted = QueriesInFlight.emplace(cookie, key).second;
+ Y_VERIFY(inserted);
- ui64 SendReadViaEvReadRequest(const TString& key) {
- auto request = std::make_unique<TEvKeyValue::TEvRead>();
- auto& record = request->Record;
- record.set_tablet_id(TabletId);
- const ui64 cookie = ++LastCookie;
- record.set_cookie(cookie);
- record.set_key(key);
- Send(TabletActorId, request.release());
- return cookie;
- }
+ std::unique_ptr<IEventBase> ev;
- ui64 SendRead(const TString& key) {
IssueReadMode = !IssueReadMode;
if (IssueReadMode) {
- WaitedReadsViaEvResponse++;
- return SendReadViaEvRequest(key);
+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
+ request->Record.SetTabletId(TabletId);
+ request->Record.SetCookie(cookie);
+ request->Record.AddCmdRead()->SetKey(key);
+ ++WaitedReadsViaEvResponse;
+ ev = std::move(request);
} else {
- WaitedReadsViaEvReadResponse++;
- return SendReadViaEvReadRequest(key);
+ auto request = std::make_unique<TEvKeyValue::TEvRead>();
+ request->Record.set_tablet_id(TabletId);
+ request->Record.set_cookie(cookie);
+ request->Record.set_key(key);
+ ++WaitedReadsViaEvReadResponse;
+ ev = std::move(request);
}
- }
- void IssueRead(const TString& key) {
- const ui64 cookie = SendRead(key);
- const bool inserted = QueriesInFlight.emplace(cookie, key).second;
- Y_VERIFY(inserted);
+ Send(TabletActorId, ev.release());
}
void IssueNextStateServerQuery() {
@@ -338,9 +318,17 @@ namespace NKikimr::NTestShard {
void FinishIfPossible() {
if (StateReadComplete && KeyValueReadComplete && QueriesInFlight.empty() && !SendRetriesPending) {
+ Y_VERIFY_S(WaitedReadRangesViaEvResponse + WaitedReadRangesViaEvReadRangeResponse +
+ WaitedReadsViaEvResponse + WaitedReadsViaEvReadResponse == 0,
+ "WaitedReadRangesViaEvResponse# " << WaitedReadRangesViaEvResponse
+ << " WaitedReadsViaEvResponse# " << WaitedReadsViaEvResponse
+ << " WaitedReadsViaEvReadResponse# " << WaitedReadsViaEvReadResponse
+ << " WaitedReadRangesViaEvReadRangeResponse# " << WaitedReadRangesViaEvReadRangeResponse);
+
if (!KeyReadsWaitingForRetry.empty()) {
SendRetriesPending = true;
- TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(EvRetryKeyReads, 0, SelfId(), {}, nullptr, 0));
+ TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(EvRetryKeyReads, 0, SelfId(),
+ {}, nullptr, 0));
return;
}
if (!StateValidated) {