aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryuryalekseev <yuryalekseev@yandex-team.com>2022-09-23 16:01:10 +0300
committeryuryalekseev <yuryalekseev@yandex-team.com>2022-09-23 16:01:10 +0300
commitaa94e3bdb22065ec4106bacdbe1b645481570dea (patch)
treee8d6336b1ef62e2c7d56066e17e99bf4522f890f
parent1cedeebe24143a428fbc39266284b6ab5d1529f4 (diff)
downloadydb-aa94e3bdb22065ec4106bacdbe1b645481570dea.tar.gz
Add Reader* params to TEvGet requests from the key value tablet.
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp32
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_read_request.cpp14
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_read_request.h9
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp4
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_request.cpp21
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_request.h9
6 files changed, 53 insertions, 36 deletions
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index a203995750c..26431937b2f 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -2864,15 +2864,15 @@ bool TKeyValueState::PrepareAcquireLockRequest(const TActorContext &ctx, TEvKeyV
}
void RegisterReadRequestActor(const TActorContext &ctx, THolder<TIntermediate> &&intermediate,
- const TTabletStorageInfo *info)
+ const TTabletStorageInfo *info, ui32 tabletGeneration)
{
- ctx.RegisterWithSameMailbox(CreateKeyValueStorageReadRequest(std::move(intermediate), info));
+ ctx.RegisterWithSameMailbox(CreateKeyValueStorageReadRequest(std::move(intermediate), info, tabletGeneration));
}
void RegisterRequestActor(const TActorContext &ctx, THolder<TIntermediate> &&intermediate,
- const TTabletStorageInfo *info)
+ const TTabletStorageInfo *info, ui32 tabletGeneration)
{
- ctx.RegisterWithSameMailbox(CreateKeyValueStorageRequest(std::move(intermediate), info));
+ ctx.RegisterWithSameMailbox(CreateKeyValueStorageRequest(std::move(intermediate), info, tabletGeneration));
}
void TKeyValueState::ProcessPostponedIntermediate(const TActorContext& ctx, THolder<TIntermediate> &&intermediate,
@@ -2880,10 +2880,10 @@ void TKeyValueState::ProcessPostponedIntermediate(const TActorContext& ctx, THol
{
switch(intermediate->EvType) {
case TEvKeyValue::TEvRequest::EventType:
- return RegisterRequestActor(ctx, std::move(intermediate), info);
+ return RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
case TEvKeyValue::TEvRead::EventType:
case TEvKeyValue::TEvReadRange::EventType:
- return RegisterReadRequestActor(ctx, std::move(intermediate), info);
+ return RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
default:
Y_FAIL_S("Unexpected event type# " << intermediate->EvType);
}
@@ -2905,13 +2905,13 @@ void TKeyValueState::OnEvReadRequest(TEvKeyValue::TEvRead::TPtr &ev, const TActo
if (requestType == TRequestType::ReadOnlyInline) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage inline read request, Marker# KV49");
- RegisterReadRequestActor(ctx, std::move(intermediate), info);
+ RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++RoInlineIntermediatesInFlight;
} else {
if (IntermediatesInFlight < IntermediatesInFlightLimit) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage read request, Marker# KV54");
- RegisterReadRequestActor(ctx, std::move(intermediate), info);
+ RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++IntermediatesInFlight;
} else {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
@@ -2942,13 +2942,13 @@ void TKeyValueState::OnEvReadRangeRequest(TEvKeyValue::TEvReadRange::TPtr &ev, c
if (requestType == TRequestType::ReadOnlyInline) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage inline read range request, Marker# KV58");
- RegisterReadRequestActor(ctx, std::move(intermediate), info);
+ RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++RoInlineIntermediatesInFlight;
} else {
if (IntermediatesInFlight < IntermediatesInFlightLimit) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage read range request, Marker# KV66");
- RegisterReadRequestActor(ctx, std::move(intermediate), info);
+ RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++IntermediatesInFlight;
} else {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
@@ -2979,7 +2979,7 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction::
++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage request for WO, Marker# KV67");
- RegisterRequestActor(ctx, std::move(intermediate), info);
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
CountRequestTakeOffOrEnqueue(requestType);
} else {
@@ -3003,7 +3003,7 @@ void TKeyValueState::OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChann
++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create GetStorageChannelStatus request, Marker# KV75");
- RegisterRequestActor(ctx, std::move(intermediate), info);
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++RoInlineIntermediatesInFlight;
CountRequestTakeOffOrEnqueue(requestType);
} else {
@@ -3027,7 +3027,7 @@ void TKeyValueState::OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, cons
++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create AcquireLock request, Marker# KV80");
- RegisterRequestActor(ctx, std::move(intermediate), info);
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++RoInlineIntermediatesInFlight;
CountRequestTakeOffOrEnqueue(requestType);
} else {
@@ -3074,17 +3074,17 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor
if (requestType == TRequestType::WriteOnly) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage request for WO, Marker# KV42");
- RegisterRequestActor(ctx, std::move(intermediate), info);
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
} else if (requestType == TRequestType::ReadOnlyInline) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage request for RO_INLINE, Marker# KV45");
- RegisterRequestActor(ctx, std::move(intermediate), info);
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++RoInlineIntermediatesInFlight;
} else {
if (IntermediatesInFlight < IntermediatesInFlightLimit) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage request for RO/RW, Marker# KV43");
- RegisterRequestActor(ctx, std::move(intermediate), info);
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
++IntermediatesInFlight;
} else {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
index d9c2625e6bb..9b7464bf83f 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
@@ -39,6 +39,7 @@ class TKeyValueStorageReadRequest : public TActorBootstrapped<TKeyValueStorageRe
THolder<TIntermediate> IntermediateResult;
const TTabletStorageInfo *TabletInfo;
+ ui32 TabletGeneration;
TStackVec<TGetBatch, 1> Batches;
ui32 ReceivedGetResults = 0;
@@ -187,10 +188,12 @@ public:
readItem.InFlight = true;
}
- std::unique_ptr<TEvBlobStorage::TEvGet> get = std::make_unique<TEvBlobStorage::TEvGet>(
+ auto ev = std::make_unique<TEvBlobStorage::TEvGet>(
readQueries, batch.ReadItemIndecies.size(), IntermediateResult->Deadline, handleClass, false);
+ ev->ReaderTabletId = TabletInfo->TabletID;
+ ev->ReaderTabletGeneration = TabletGeneration;
- SendToBSProxy(TActivationContext::AsActorContext(), batch.GroupId, get.release(),
+ SendToBSProxy(TActivationContext::AsActorContext(), batch.GroupId, ev.release(),
batch.Cookie);
batch.SentTime = TActivationContext::Now();
}
@@ -472,17 +475,18 @@ public:
}
TKeyValueStorageReadRequest(THolder<TIntermediate> &&intermediate,
- const TTabletStorageInfo *tabletInfo)
+ const TTabletStorageInfo *tabletInfo, ui32 tabletGeneration)
: IntermediateResult(std::move(intermediate))
, TabletInfo(tabletInfo)
+ , TabletGeneration(tabletGeneration)
{}
};
IActor* CreateKeyValueStorageReadRequest(THolder<TIntermediate>&& intermediate,
- const TTabletStorageInfo *tabletInfo)
+ const TTabletStorageInfo *tabletInfo, ui32 tabletGeneration)
{
- return new TKeyValueStorageReadRequest(std::move(intermediate), tabletInfo);
+ return new TKeyValueStorageReadRequest(std::move(intermediate), tabletInfo, tabletGeneration);
}
} // NKeyValue
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.h b/ydb/core/keyvalue/keyvalue_storage_read_request.h
index 20e01dc5e95..4f2a49bcf48 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request.h
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request.h
@@ -2,11 +2,16 @@
#include "defs.h"
#include "keyvalue_events.h"
+#include <util/generic/ptr.h>
+#include <util/system/types.h>
+
namespace NKikimr {
namespace NKeyValue {
-IActor* CreateKeyValueStorageReadRequest(THolder<TIntermediate>&& intermediate,
- const TTabletStorageInfo *tabletInfo);
+IActor* CreateKeyValueStorageReadRequest(
+ THolder<TIntermediate>&& intermediate,
+ const TTabletStorageInfo *tabletInfo,
+ ui32 tabletGeneration);
} // NKeyValue
} // NKikimr
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp
index 9d4c716e640..5aef2730f06 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp
@@ -267,7 +267,7 @@ void RunTest(TTestEnv &env, TReadRequestBuilder &builder,
TActorId edgeActor = runtime.AllocateEdgeActor(1);
auto [intermediate, expectedValues] = builder.Build(edgeActor, edgeActor, 1, 1);
- runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get()), 1);
+ runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get(), 1), 1);
std::unique_ptr<IEventHandle> ev = runtime.WaitForEdgeActorEvent({edgeActor});
UNIT_ASSERT(ev->Type == TEvKeyValue::EvReadResponse);
@@ -391,7 +391,7 @@ void RunTest(TTestEnv &env, TRangeReadRequestBuilder &builder, const std::vector
TActorId edgeActor = runtime.AllocateEdgeActor(1);
auto [intermediate, expectedValues] = builder.Build(edgeActor, edgeActor, 1, 1);
- runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get()), 1);
+ runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get(), 1), 1);
std::unique_ptr<IEventHandle> ev = runtime.WaitForEdgeActorEvent({edgeActor});
UNIT_ASSERT(ev->Type == TEvKeyValue::EvReadRangeResponse);
diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp
index e9032d2ed3d..bd92f061686 100644
--- a/ydb/core/keyvalue/keyvalue_storage_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp
@@ -28,6 +28,7 @@ class TKeyValueStorageRequest : public TActorBootstrapped<TKeyValueStorageReques
ui64 InFlightQueries;
ui64 InFlightRequestsLimit;
ui64 NextInFlightBatchCookie;
+ ui32 TabletGeneration;
THolder<TIntermediate> IntermediateResults;
@@ -66,7 +67,7 @@ public:
return NKikimrServices::TActivity::KEYVALUE_ACTOR;
}
- TKeyValueStorageRequest(THolder<TIntermediate>&& intermediate, const TTabletStorageInfo *tabletInfo)
+ TKeyValueStorageRequest(THolder<TIntermediate>&& intermediate, const TTabletStorageInfo *tabletInfo, ui32 tabletGeneration)
: ReadRequestsSent(0)
, ReadRequestsReplied(0)
, WriteRequestsSent(0)
@@ -80,6 +81,7 @@ public:
, InFlightQueries(0)
, InFlightRequestsLimit(10)
, NextInFlightBatchCookie(1)
+ , TabletGeneration(tabletGeneration)
, IntermediateResults(std::move(intermediate))
, TabletInfo(const_cast<TTabletStorageInfo*>(tabletInfo))
{
@@ -586,12 +588,11 @@ public:
InFlightBatchByCookie[cookie] = std::move(request);
Y_VERIFY(queryIdx == readQueryCount);
- SendToBSProxy(
- ctx, prevGroup,
- new TEvBlobStorage::TEvGet(readQueries, readQueryCount, IntermediateResults->Deadline,
- handleClass, false),
- cookie);
+ auto ev = std::make_unique<TEvBlobStorage::TEvGet>(readQueries, readQueryCount, IntermediateResults->Deadline, handleClass, false);
+ ev->ReaderTabletId = TabletInfo->TabletID;
+ ev->ReaderTabletGeneration = TabletGeneration;
+ SendToBSProxy(ctx, prevGroup, ev.release(), cookie);
return true;
}
@@ -684,9 +685,11 @@ public:
}
};
-IActor* CreateKeyValueStorageRequest(THolder<TIntermediate>&& intermediate,
- const TTabletStorageInfo *tabletInfo) {
- return new TKeyValueStorageRequest(std::move(intermediate), tabletInfo);
+IActor* CreateKeyValueStorageRequest(
+ THolder<TIntermediate>&& intermediate,
+ const TTabletStorageInfo *tabletInfo,
+ ui32 tabletGeneration) {
+ return new TKeyValueStorageRequest(std::move(intermediate), tabletInfo, tabletGeneration);
}
} // NKeyValue
diff --git a/ydb/core/keyvalue/keyvalue_storage_request.h b/ydb/core/keyvalue/keyvalue_storage_request.h
index a821093e216..8e0f4037e08 100644
--- a/ydb/core/keyvalue/keyvalue_storage_request.h
+++ b/ydb/core/keyvalue/keyvalue_storage_request.h
@@ -2,11 +2,16 @@
#include "defs.h"
#include "keyvalue_intermediate.h"
+#include <util/generic/ptr.h>
+#include <util/system/types.h>
+
namespace NKikimr {
namespace NKeyValue {
-IActor* CreateKeyValueStorageRequest(THolder<TIntermediate>&& intermediate,
- const TTabletStorageInfo *tabletInfo);
+IActor* CreateKeyValueStorageRequest(
+ THolder<TIntermediate>&& intermediate,
+ const TTabletStorageInfo *tabletInfo,
+ ui32 tabletGeneration);
} // NKeyValue
} // NKikimr