aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryuryalekseev <yuryalekseev@yandex-team.com>2022-09-26 13:59:43 +0300
committeryuryalekseev <yuryalekseev@yandex-team.com>2022-09-26 13:59:43 +0300
commitc843de2f4eff352da4ae4d8b7639cd5de9abac05 (patch)
treee8d3263d6c7b0f0cf133bd950b8d80fa398ea4bc
parent5f7cc7de224fe53263e213b0d2534955c8e64ec7 (diff)
downloadydb-c843de2f4eff352da4ae4d8b7639cd5de9abac05.tar.gz
Add test for BLOCKED TEvGet with kv tablet.
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_read_request.cpp26
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp49
2 files changed, 74 insertions, 1 deletions
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
index 9b7464bf83..cae0568ba6 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
@@ -243,6 +243,22 @@ public:
return;
}
+ if (result->Status == NKikimrProto::BLOCKED) {
+ STLOG_WITH_ERROR_DESCRIPTION(ErrorDescription, NLog::PRI_ERROR, NKikimrServices::KEYVALUE, KV323,
+ "Received BLOCKED EvGetResult.",
+ (KeyValue, TabletInfo->TabletID),
+ (Status, result->Status),
+ (Deadline, IntermediateResult->Deadline.MilliSeconds()),
+ (Now, TActivationContext::Now().MilliSeconds()),
+ (SentAt, batch.SentTime),
+ (GotAt, IntermediateResult->Stat.IntermediateCreatedAt.MilliSeconds()),
+ (ErrorReason, result->ErrorReason));
+ // kill the key value tablet due to it's obsolete generation
+ Send(IntermediateResult->KeyValueActorId, new TKikimrEvents::TEvPoisonPill);
+ ReplyErrorAndPassAway(NKikimrKeyValue::Statuses::RSTATUS_ERROR);
+ return;
+ }
+
if (result->Status != NKikimrProto::OK) {
STLOG_WITH_ERROR_DESCRIPTION(ErrorDescription, NLog::PRI_ERROR, NKikimrServices::KEYVALUE, KV316,
"Unexpected EvGetResult.",
@@ -332,6 +348,7 @@ public:
if (IntermediateResult->HasCookie) {
response->Record.set_cookie(IntermediateResult->Cookie);
}
+
return response;
}
@@ -348,7 +365,14 @@ public:
std::unique_ptr<IEventBase> MakeErrorResponse(NKikimrKeyValue::Statuses::ReplyStatus status) {
if (IsRead()) {
- return CreateReadResponse(status, ErrorDescription);
+ auto response = CreateReadResponse(status, ErrorDescription);
+ auto &cmd = GetCommand();
+ Y_VERIFY(std::holds_alternative<TIntermediate::TRead>(cmd));
+ auto& intermediateRead = std::get<TIntermediate::TRead>(cmd);
+ response->Record.set_requested_key(intermediateRead.Key);
+ response->Record.set_requested_offset(intermediateRead.Offset);
+ response->Record.set_requested_size(intermediateRead.RequestedSize);
+ return response;
} else {
return CreateReadRangeResponse(status, ErrorDescription);
}
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 96b9347b7f..64291abb68 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/testlib/tablet_helpers.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/random/fast.h>
+#include <ydb/core/base/blobstorage.h>
const bool ENABLE_DETAILED_KV_LOG = false;
const bool ENABLE_TESTLOG_OUTPUT = false;
@@ -1003,6 +1004,54 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) {
}
+Y_UNIT_TEST(TestBlockedEvGetRequest) {
+ std::optional<TActorId> tabletActor;
+ std::optional<TActorId> dsProxyActor;
+ std::optional<ui64> keyValueTabletId;
+ std::optional<ui32> keyValueTabletGeneration;
+ auto setup = [&] (TTestActorRuntime &runtime) {
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ if (event->GetTypeRewrite() == TEvBlobStorage::TEvGet::EventType) {
+ if (tabletActor && *tabletActor == event->Sender) {
+ // key value tablet reads from dsproxy
+ dsProxyActor = event->Recipient;
+ }
+ keyValueTabletId = event->Get<TEvBlobStorage::TEvGet>()->ReaderTabletId;
+ keyValueTabletGeneration = event->Get<TEvBlobStorage::TEvGet>()->ReaderTabletGeneration;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase&, const TActorId& /*parentId*/, const TActorId& actorId) {
+ if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueStorageReadRequest") {
+ tabletActor = actorId;
+ }
+ });
+ };
+ TTestContext tc;
+ TFinalizer finalizer(tc);
+ bool activeZone = false;
+ tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone);
+ ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteRead(tc, "key", "value", 0, 0, 0);
+
+ // block current generation of the key value tablet
+ UNIT_ASSERT(tabletActor);
+ UNIT_ASSERT(dsProxyActor);
+ UNIT_ASSERT(keyValueTabletId);
+ UNIT_ASSERT(keyValueTabletGeneration);
+ auto generation = *keyValueTabletGeneration;
+ auto ev = std::make_unique<TEvBlobStorage::TEvBlock>(*keyValueTabletId, generation, TInstant::Max());
+ tc.Runtime->Send(new IEventHandle(*dsProxyActor, *tabletActor, ev.release()));
+
+ // read with the blocked generation should fail and lead to a restart of the key value tablet
+ ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_ERROR>(tc, "key", "", 0, 0, 0);
+ // read data through the newly created key value tablet
+ ExecuteRead(tc, "key", "value", 0, 0, 0);
+ // check that the key value tablet has indeed restarted
+ UNIT_ASSERT(generation < *keyValueTabletGeneration);
+}
+
Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowInitialGC) {
TTestContext tc;
TMaybe<TActorId> tabletActor;