diff options
author | alexvru <alexvru@ydb.tech> | 2022-12-19 11:49:10 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-12-19 11:49:10 +0300 |
commit | ee24e6c1f52ac838f4acd7773fcac520dec6267b (patch) | |
tree | 54c28eb81e1d34a72e8ec6f7acf07fa4b7121ce0 | |
parent | cd7b43afadb45daa054bd3882511703e7200ca57 (diff) | |
download | ydb-ee24e6c1f52ac838f4acd7773fcac520dec6267b.tar.gz |
Some TestShard improvements
-rw-r--r-- | ydb/core/test_tablet/load_actor_read_validate.cpp | 198 |
1 files changed, 93 insertions, 105 deletions
diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp index 70cf5f97d4c..aeb50277d64 100644 --- a/ydb/core/test_tablet/load_actor_read_validate.cpp +++ b/ydb/core/test_tablet/load_actor_read_validate.cpp @@ -83,81 +83,59 @@ namespace NKikimr::NTestShard { } } - void SendReadRangeViaEvRequest() { - auto request = std::make_unique<TEvKeyValue::TEvRequest>(); - auto& record = request->Record; - record.SetTabletId(TabletId); - record.SetCookie(0); - auto *read = record.AddCmdReadRange(); - auto *r = read->MutableRange(); - if (LastKey) { - r->SetFrom(*LastKey); - r->SetIncludeFrom(false); - } - Send(TabletActorId, request.release()); - } - - void SendReadRangeViaEvReadRange() { - auto request = std::make_unique<TEvKeyValue::TEvReadRange>(); - auto& record = request->Record; - record.set_tablet_id(TabletId); - record.set_cookie(0); - auto *range = record.mutable_range(); - range->set_from_key_exclusive(*LastKey); - Send(TabletActorId, request.release()); - } - void IssueNextReadRangeQuery() { + std::unique_ptr<IEventBase> ev; + IssueReadRangeMode = !IssueReadRangeMode; if (IssueReadRangeMode) { - WaitedReadRangesViaEvResponse++; - SendReadRangeViaEvRequest(); + auto request = std::make_unique<TEvKeyValue::TEvRequest>(); + request->Record.SetTabletId(TabletId); + request->Record.SetCookie(0); + auto *read = request->Record.AddCmdReadRange(); + auto *r = read->MutableRange(); + if (LastKey) { + r->SetFrom(*LastKey); + r->SetIncludeFrom(false); + } + ++WaitedReadRangesViaEvResponse; + ev = std::move(request); } else { - WaitedReadRangesViaEvReadRangeResponse++; - SendReadRangeViaEvReadRange(); + auto request = std::make_unique<TEvKeyValue::TEvReadRange>(); + request->Record.set_tablet_id(TabletId); + auto *range = request->Record.mutable_range(); + if (LastKey) { + range->set_from_key_exclusive(*LastKey); + } + ++WaitedReadRangesViaEvReadRangeResponse; + ev = std::move(request); } + + Send(TabletActorId, ev.release()); } TString PopQueryByCookie(ui64 cookie) { - auto it = QueriesInFlight.find(cookie); - Y_VERIFY(it != QueriesInFlight.end()); - TString key = std::move(it->second); - QueriesInFlight.erase(it); - return key; + auto node = QueriesInFlight.extract(cookie); + Y_VERIFY(node); + return node.mapped(); } void Handle(TEvKeyValue::TEvResponse::TPtr ev) { auto& r = ev->Get()->Record; if (r.GetCookie()) { WaitedReadsViaEvResponse--; - TString key = PopQueryByCookie(r.GetCookie()); if (r.GetStatus() != NMsgBusProxy::MSTATUS_OK) { - STLOG(PRI_ERROR, TEST_SHARD, TS18, "CmdRead failed", (TabletId, TabletId), (Status, r.GetStatus()), - (ErrorReason, r.GetErrorReason())); - return IssueRead(key); + return ProcessReadResult(r.GetCookie(), TStringBuilder() << "Status# " << r.GetStatus() + << " ErrorReason# " << r.GetErrorReason(), EReadOutcome::IMMEDIATE_RETRY, {}); } Y_VERIFY(r.ReadResultSize() == 1); const auto& res = r.GetReadResult(0); const auto status = static_cast<NKikimrProto::EReplyStatus>(res.GetStatus()); - if (status == NKikimrProto::ERROR && RetryCount < 10) { - const bool inserted = KeyReadsWaitingForRetry.insert(key).second; - Y_VERIFY(inserted); - STLOG(PRI_ERROR, TEST_SHARD, TS22, "read key failed -- going to retry", (TabletId, TabletId), (Key, key)); - return; - } - - Y_VERIFY_S(status == NKikimrProto::OK, "Status# " << NKikimrProto::EReplyStatus_Name(status) << " Message# " - << res.GetMessage()); - - const TString& value = res.GetValue(); - const bool inserted = Keys.try_emplace(key, value.size()).second; - Y_VERIFY(inserted); - Y_VERIFY_S(HashForValue(value) == key, "TabletId# " << TabletId << " Key# " << key << " digest mismatch" - " actual# " << HashForValue(value) << " len# " << value.size()); - STLOG(PRI_DEBUG, TEST_SHARD, TS16, "read key", (TabletId, TabletId), (Key, key)); + ProcessReadResult(r.GetCookie(), TStringBuilder() << "Status# " << NKikimrProto::EReplyStatus_Name(status) + << " Message# " << res.GetMessage(), status == NKikimrProto::OK ? EReadOutcome::OK : + status == NKikimrProto::ERROR ? EReadOutcome::RETRY : EReadOutcome::ERROR, res.GetValue()); } else { WaitedReadRangesViaEvResponse--; if (r.GetStatus() != NMsgBusProxy::MSTATUS_OK) { @@ -187,38 +165,53 @@ namespace NKikimr::NTestShard { } void Handle(TEvKeyValue::TEvReadResponse::TPtr ev) { - auto& record = ev->Get()->Record; WaitedReadsViaEvReadResponse--; - const TString key = PopQueryByCookie(record.cookie()); + auto& record = ev->Get()->Record; const NKikimrKeyValue::Statuses::ReplyStatus status = record.status(); + const TString message = TStringBuilder() + << "Status# " << NKikimrKeyValue::Statuses::ReplyStatus_Name(status) + << (record.msg() ? TStringBuilder() << " Message# " << record.msg() : TString()); + const EReadOutcome outcome = + status == NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT ? EReadOutcome::IMMEDIATE_RETRY : + status == NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR ? EReadOutcome::RETRY : + status == NKikimrKeyValue::Statuses::RSTATUS_OK ? EReadOutcome::OK : + EReadOutcome::ERROR; - if (status == NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT) { - STLOG(PRI_ERROR, TEST_SHARD, TS23, "CmdRead failed", (TabletId, TabletId), (Status, status), - (ErrorReason, record.msg())); + ProcessReadResult(record.cookie(), message, outcome, record.value()); + + FinishIfPossible(); + } + + enum class EReadOutcome { + IMMEDIATE_RETRY, + RETRY, + OK, + ERROR, + }; + + void ProcessReadResult(ui64 cookie, const TString& message, EReadOutcome outcome, const TString& value) { + const TString key = PopQueryByCookie(cookie); + + if (outcome == EReadOutcome::IMMEDIATE_RETRY) { + STLOG(PRI_ERROR, TEST_SHARD, TS23, "read immediate retry", (TabletId, TabletId), (Message, message)); return IssueRead(key); } - if (status == NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR && RetryCount < 10) { + if (outcome == EReadOutcome::RETRY && RetryCount < 10) { const bool inserted = KeyReadsWaitingForRetry.insert(key).second; Y_VERIFY(inserted); STLOG(PRI_ERROR, TEST_SHARD, TS24, "read key failed -- going to retry", (TabletId, TabletId), - (Key, key), (ErrorReason, record.msg())); - return; - } - - Y_VERIFY_S(status == NKikimrKeyValue::Statuses::RSTATUS_OK, - "Status# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(status) - << " Cookie# " << record.cookie() - << " Message# " << record.msg()); + (Key, key), (Message, message)); + } else { + Y_VERIFY_S(outcome == EReadOutcome::OK, "Message# " << message << " Key# " << key); - const TString& value = record.value(); - const bool inserted = Keys.try_emplace(key, value.size()).second; - Y_VERIFY(inserted); - Y_VERIFY_S(HashForValue(value) == key, "TabletId# " << TabletId << " Key# " << key << " digest mismatch" - " actual# " << HashForValue(value) << " len# " << value.size()); - STLOG(PRI_DEBUG, TEST_SHARD, TS25, "read key", (TabletId, TabletId), (Key, key)); - FinishIfPossible(); + const bool inserted = Keys.try_emplace(key, value.size()).second; + Y_VERIFY(inserted); + Y_VERIFY_S(HashForValue(value) == key, "TabletId# " << TabletId << " Key# " << key << " digest mismatch" + " actual# " << HashForValue(value) << " len# " << value.size()); + STLOG(PRI_DEBUG, TEST_SHARD, TS25, "read key", (TabletId, TabletId), (Key, key)); + } } void Handle(TEvKeyValue::TEvReadRangeResponse::TPtr ev) { @@ -251,44 +244,31 @@ namespace NKikimr::NTestShard { FinishIfPossible(); } - ui64 SendReadViaEvRequest(const TString& key) { - auto request = std::make_unique<TEvKeyValue::TEvRequest>(); - auto& record = request->Record; - record.SetTabletId(TabletId); + void IssueRead(const TString& key) { const ui64 cookie = ++LastCookie; - record.SetCookie(cookie); - auto *read = record.AddCmdRead(); - read->SetKey(key); - Send(TabletActorId, request.release()); - return cookie; - } + const bool inserted = QueriesInFlight.emplace(cookie, key).second; + Y_VERIFY(inserted); - ui64 SendReadViaEvReadRequest(const TString& key) { - auto request = std::make_unique<TEvKeyValue::TEvRead>(); - auto& record = request->Record; - record.set_tablet_id(TabletId); - const ui64 cookie = ++LastCookie; - record.set_cookie(cookie); - record.set_key(key); - Send(TabletActorId, request.release()); - return cookie; - } + std::unique_ptr<IEventBase> ev; - ui64 SendRead(const TString& key) { IssueReadMode = !IssueReadMode; if (IssueReadMode) { - WaitedReadsViaEvResponse++; - return SendReadViaEvRequest(key); + auto request = std::make_unique<TEvKeyValue::TEvRequest>(); + request->Record.SetTabletId(TabletId); + request->Record.SetCookie(cookie); + request->Record.AddCmdRead()->SetKey(key); + ++WaitedReadsViaEvResponse; + ev = std::move(request); } else { - WaitedReadsViaEvReadResponse++; - return SendReadViaEvReadRequest(key); + auto request = std::make_unique<TEvKeyValue::TEvRead>(); + request->Record.set_tablet_id(TabletId); + request->Record.set_cookie(cookie); + request->Record.set_key(key); + ++WaitedReadsViaEvReadResponse; + ev = std::move(request); } - } - void IssueRead(const TString& key) { - const ui64 cookie = SendRead(key); - const bool inserted = QueriesInFlight.emplace(cookie, key).second; - Y_VERIFY(inserted); + Send(TabletActorId, ev.release()); } void IssueNextStateServerQuery() { @@ -338,9 +318,17 @@ namespace NKikimr::NTestShard { void FinishIfPossible() { if (StateReadComplete && KeyValueReadComplete && QueriesInFlight.empty() && !SendRetriesPending) { + Y_VERIFY_S(WaitedReadRangesViaEvResponse + WaitedReadRangesViaEvReadRangeResponse + + WaitedReadsViaEvResponse + WaitedReadsViaEvReadResponse == 0, + "WaitedReadRangesViaEvResponse# " << WaitedReadRangesViaEvResponse + << " WaitedReadsViaEvResponse# " << WaitedReadsViaEvResponse + << " WaitedReadsViaEvReadResponse# " << WaitedReadsViaEvReadResponse + << " WaitedReadRangesViaEvReadRangeResponse# " << WaitedReadRangesViaEvReadRangeResponse); + if (!KeyReadsWaitingForRetry.empty()) { SendRetriesPending = true; - TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(EvRetryKeyReads, 0, SelfId(), {}, nullptr, 0)); + TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(EvRetryKeyReads, 0, SelfId(), + {}, nullptr, 0)); return; } if (!StateValidated) { |