diff options
author | yuryalekseev <yuryalekseev@yandex-team.com> | 2022-09-23 16:01:10 +0300 |
---|---|---|
committer | yuryalekseev <yuryalekseev@yandex-team.com> | 2022-09-23 16:01:10 +0300 |
commit | aa94e3bdb22065ec4106bacdbe1b645481570dea (patch) | |
tree | e8d6336b1ef62e2c7d56066e17e99bf4522f890f | |
parent | 1cedeebe24143a428fbc39266284b6ab5d1529f4 (diff) | |
download | ydb-aa94e3bdb22065ec4106bacdbe1b645481570dea.tar.gz |
Add Reader* params to TEvGet requests from the key value tablet.
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 32 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request.cpp | 14 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request.h | 9 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_request.cpp | 21 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_request.h | 9 |
6 files changed, 53 insertions, 36 deletions
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index a203995750c..26431937b2f 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -2864,15 +2864,15 @@ bool TKeyValueState::PrepareAcquireLockRequest(const TActorContext &ctx, TEvKeyV } void RegisterReadRequestActor(const TActorContext &ctx, THolder<TIntermediate> &&intermediate, - const TTabletStorageInfo *info) + const TTabletStorageInfo *info, ui32 tabletGeneration) { - ctx.RegisterWithSameMailbox(CreateKeyValueStorageReadRequest(std::move(intermediate), info)); + ctx.RegisterWithSameMailbox(CreateKeyValueStorageReadRequest(std::move(intermediate), info, tabletGeneration)); } void RegisterRequestActor(const TActorContext &ctx, THolder<TIntermediate> &&intermediate, - const TTabletStorageInfo *info) + const TTabletStorageInfo *info, ui32 tabletGeneration) { - ctx.RegisterWithSameMailbox(CreateKeyValueStorageRequest(std::move(intermediate), info)); + ctx.RegisterWithSameMailbox(CreateKeyValueStorageRequest(std::move(intermediate), info, tabletGeneration)); } void TKeyValueState::ProcessPostponedIntermediate(const TActorContext& ctx, THolder<TIntermediate> &&intermediate, @@ -2880,10 +2880,10 @@ void TKeyValueState::ProcessPostponedIntermediate(const TActorContext& ctx, THol { switch(intermediate->EvType) { case TEvKeyValue::TEvRequest::EventType: - return RegisterRequestActor(ctx, std::move(intermediate), info); + return RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); case TEvKeyValue::TEvRead::EventType: case TEvKeyValue::TEvReadRange::EventType: - return RegisterReadRequestActor(ctx, std::move(intermediate), info); + return RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); default: Y_FAIL_S("Unexpected event type# " << intermediate->EvType); } @@ -2905,13 +2905,13 @@ void TKeyValueState::OnEvReadRequest(TEvKeyValue::TEvRead::TPtr &ev, const TActo if (requestType == TRequestType::ReadOnlyInline) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage inline read request, Marker# KV49"); - RegisterReadRequestActor(ctx, std::move(intermediate), info); + RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++RoInlineIntermediatesInFlight; } else { if (IntermediatesInFlight < IntermediatesInFlightLimit) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage read request, Marker# KV54"); - RegisterReadRequestActor(ctx, std::move(intermediate), info); + RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++IntermediatesInFlight; } else { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId @@ -2942,13 +2942,13 @@ void TKeyValueState::OnEvReadRangeRequest(TEvKeyValue::TEvReadRange::TPtr &ev, c if (requestType == TRequestType::ReadOnlyInline) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage inline read range request, Marker# KV58"); - RegisterReadRequestActor(ctx, std::move(intermediate), info); + RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++RoInlineIntermediatesInFlight; } else { if (IntermediatesInFlight < IntermediatesInFlightLimit) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage read range request, Marker# KV66"); - RegisterReadRequestActor(ctx, std::move(intermediate), info); + RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++IntermediatesInFlight; } else { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId @@ -2979,7 +2979,7 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction:: ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage request for WO, Marker# KV67"); - RegisterRequestActor(ctx, std::move(intermediate), info); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); CountRequestTakeOffOrEnqueue(requestType); } else { @@ -3003,7 +3003,7 @@ void TKeyValueState::OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChann ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create GetStorageChannelStatus request, Marker# KV75"); - RegisterRequestActor(ctx, std::move(intermediate), info); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++RoInlineIntermediatesInFlight; CountRequestTakeOffOrEnqueue(requestType); } else { @@ -3027,7 +3027,7 @@ void TKeyValueState::OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, cons ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create AcquireLock request, Marker# KV80"); - RegisterRequestActor(ctx, std::move(intermediate), info); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++RoInlineIntermediatesInFlight; CountRequestTakeOffOrEnqueue(requestType); } else { @@ -3074,17 +3074,17 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor if (requestType == TRequestType::WriteOnly) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage request for WO, Marker# KV42"); - RegisterRequestActor(ctx, std::move(intermediate), info); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); } else if (requestType == TRequestType::ReadOnlyInline) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage request for RO_INLINE, Marker# KV45"); - RegisterRequestActor(ctx, std::move(intermediate), info); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++RoInlineIntermediatesInFlight; } else { if (IntermediatesInFlight < IntermediatesInFlightLimit) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage request for RO/RW, Marker# KV43"); - RegisterRequestActor(ctx, std::move(intermediate), info); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); ++IntermediatesInFlight; } else { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp index d9c2625e6bb..9b7464bf83f 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp @@ -39,6 +39,7 @@ class TKeyValueStorageReadRequest : public TActorBootstrapped<TKeyValueStorageRe THolder<TIntermediate> IntermediateResult; const TTabletStorageInfo *TabletInfo; + ui32 TabletGeneration; TStackVec<TGetBatch, 1> Batches; ui32 ReceivedGetResults = 0; @@ -187,10 +188,12 @@ public: readItem.InFlight = true; } - std::unique_ptr<TEvBlobStorage::TEvGet> get = std::make_unique<TEvBlobStorage::TEvGet>( + auto ev = std::make_unique<TEvBlobStorage::TEvGet>( readQueries, batch.ReadItemIndecies.size(), IntermediateResult->Deadline, handleClass, false); + ev->ReaderTabletId = TabletInfo->TabletID; + ev->ReaderTabletGeneration = TabletGeneration; - SendToBSProxy(TActivationContext::AsActorContext(), batch.GroupId, get.release(), + SendToBSProxy(TActivationContext::AsActorContext(), batch.GroupId, ev.release(), batch.Cookie); batch.SentTime = TActivationContext::Now(); } @@ -472,17 +475,18 @@ public: } TKeyValueStorageReadRequest(THolder<TIntermediate> &&intermediate, - const TTabletStorageInfo *tabletInfo) + const TTabletStorageInfo *tabletInfo, ui32 tabletGeneration) : IntermediateResult(std::move(intermediate)) , TabletInfo(tabletInfo) + , TabletGeneration(tabletGeneration) {} }; IActor* CreateKeyValueStorageReadRequest(THolder<TIntermediate>&& intermediate, - const TTabletStorageInfo *tabletInfo) + const TTabletStorageInfo *tabletInfo, ui32 tabletGeneration) { - return new TKeyValueStorageReadRequest(std::move(intermediate), tabletInfo); + return new TKeyValueStorageReadRequest(std::move(intermediate), tabletInfo, tabletGeneration); } } // NKeyValue diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.h b/ydb/core/keyvalue/keyvalue_storage_read_request.h index 20e01dc5e95..4f2a49bcf48 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request.h +++ b/ydb/core/keyvalue/keyvalue_storage_read_request.h @@ -2,11 +2,16 @@ #include "defs.h" #include "keyvalue_events.h" +#include <util/generic/ptr.h> +#include <util/system/types.h> + namespace NKikimr { namespace NKeyValue { -IActor* CreateKeyValueStorageReadRequest(THolder<TIntermediate>&& intermediate, - const TTabletStorageInfo *tabletInfo); +IActor* CreateKeyValueStorageReadRequest( + THolder<TIntermediate>&& intermediate, + const TTabletStorageInfo *tabletInfo, + ui32 tabletGeneration); } // NKeyValue } // NKikimr diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp index 9d4c716e640..5aef2730f06 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp @@ -267,7 +267,7 @@ void RunTest(TTestEnv &env, TReadRequestBuilder &builder, TActorId edgeActor = runtime.AllocateEdgeActor(1); auto [intermediate, expectedValues] = builder.Build(edgeActor, edgeActor, 1, 1); - runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get()), 1); + runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get(), 1), 1); std::unique_ptr<IEventHandle> ev = runtime.WaitForEdgeActorEvent({edgeActor}); UNIT_ASSERT(ev->Type == TEvKeyValue::EvReadResponse); @@ -391,7 +391,7 @@ void RunTest(TTestEnv &env, TRangeReadRequestBuilder &builder, const std::vector TActorId edgeActor = runtime.AllocateEdgeActor(1); auto [intermediate, expectedValues] = builder.Build(edgeActor, edgeActor, 1, 1); - runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get()), 1); + runtime.Register(CreateKeyValueStorageReadRequest(std::move(intermediate), env.TabletInfo.get(), 1), 1); std::unique_ptr<IEventHandle> ev = runtime.WaitForEdgeActorEvent({edgeActor}); UNIT_ASSERT(ev->Type == TEvKeyValue::EvReadRangeResponse); diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp index e9032d2ed3d..bd92f061686 100644 --- a/ydb/core/keyvalue/keyvalue_storage_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp @@ -28,6 +28,7 @@ class TKeyValueStorageRequest : public TActorBootstrapped<TKeyValueStorageReques ui64 InFlightQueries; ui64 InFlightRequestsLimit; ui64 NextInFlightBatchCookie; + ui32 TabletGeneration; THolder<TIntermediate> IntermediateResults; @@ -66,7 +67,7 @@ public: return NKikimrServices::TActivity::KEYVALUE_ACTOR; } - TKeyValueStorageRequest(THolder<TIntermediate>&& intermediate, const TTabletStorageInfo *tabletInfo) + TKeyValueStorageRequest(THolder<TIntermediate>&& intermediate, const TTabletStorageInfo *tabletInfo, ui32 tabletGeneration) : ReadRequestsSent(0) , ReadRequestsReplied(0) , WriteRequestsSent(0) @@ -80,6 +81,7 @@ public: , InFlightQueries(0) , InFlightRequestsLimit(10) , NextInFlightBatchCookie(1) + , TabletGeneration(tabletGeneration) , IntermediateResults(std::move(intermediate)) , TabletInfo(const_cast<TTabletStorageInfo*>(tabletInfo)) { @@ -586,12 +588,11 @@ public: InFlightBatchByCookie[cookie] = std::move(request); Y_VERIFY(queryIdx == readQueryCount); - SendToBSProxy( - ctx, prevGroup, - new TEvBlobStorage::TEvGet(readQueries, readQueryCount, IntermediateResults->Deadline, - handleClass, false), - cookie); + auto ev = std::make_unique<TEvBlobStorage::TEvGet>(readQueries, readQueryCount, IntermediateResults->Deadline, handleClass, false); + ev->ReaderTabletId = TabletInfo->TabletID; + ev->ReaderTabletGeneration = TabletGeneration; + SendToBSProxy(ctx, prevGroup, ev.release(), cookie); return true; } @@ -684,9 +685,11 @@ public: } }; -IActor* CreateKeyValueStorageRequest(THolder<TIntermediate>&& intermediate, - const TTabletStorageInfo *tabletInfo) { - return new TKeyValueStorageRequest(std::move(intermediate), tabletInfo); +IActor* CreateKeyValueStorageRequest( + THolder<TIntermediate>&& intermediate, + const TTabletStorageInfo *tabletInfo, + ui32 tabletGeneration) { + return new TKeyValueStorageRequest(std::move(intermediate), tabletInfo, tabletGeneration); } } // NKeyValue diff --git a/ydb/core/keyvalue/keyvalue_storage_request.h b/ydb/core/keyvalue/keyvalue_storage_request.h index a821093e216..8e0f4037e08 100644 --- a/ydb/core/keyvalue/keyvalue_storage_request.h +++ b/ydb/core/keyvalue/keyvalue_storage_request.h @@ -2,11 +2,16 @@ #include "defs.h" #include "keyvalue_intermediate.h" +#include <util/generic/ptr.h> +#include <util/system/types.h> + namespace NKikimr { namespace NKeyValue { -IActor* CreateKeyValueStorageRequest(THolder<TIntermediate>&& intermediate, - const TTabletStorageInfo *tabletInfo); +IActor* CreateKeyValueStorageRequest( + THolder<TIntermediate>&& intermediate, + const TTabletStorageInfo *tabletInfo, + ui32 tabletGeneration); } // NKeyValue } // NKikimr |