diff options
author | kruall <kruall@yandex-team.ru> | 2022-06-06 14:51:42 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:51:42 +0300 |
commit | 502f43be9df33c1a73e433ec7f46e6121c751db9 (patch) | |
tree | be8f9fd85e0f6ce7da59e86c48b2ee1a08661914 | |
parent | f3748b98db7af1f1fd7e345bb6e8d452387d3366 (diff) | |
download | ydb-502f43be9df33c1a73e433ec7f46e6121c751db9.tar.gz |
merge from trunk: r9240076, r9244096, r9255567, r9287393, r9386382; KIKIMR-13253 KV GRPC API
REVIEW: 2498910
x-ydb-stable-ref: f305413c20c64e5a0157a85c3672d04ef169ac6f
-rw-r--r-- | ydb/core/keyvalue/keyvalue_events.h | 40 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 12 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.h | 4 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 100 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 28 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request.cpp | 19 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 22 | ||||
-rw-r--r-- | ydb/core/keyvalue/protos/events.proto | 69 | ||||
-rw-r--r-- | ydb/core/test_tablet/load_actor_read_validate.cpp | 1 |
10 files changed, 156 insertions, 141 deletions
diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h index 03bc81ad8f..2c8c23256a 100644 --- a/ydb/core/keyvalue/keyvalue_events.h +++ b/ydb/core/keyvalue/keyvalue_events.h @@ -27,16 +27,16 @@ struct TEvKeyValue { EvRead = EvRequest + 16, EvReadRange, EvExecuteTransaction, - EvGetStatus, - EvObtainLock, + EvGetStorageChannelStatus, + EvAcquireLock, EvResponse = EvRequest + 512, EvReadResponse = EvResponse + 16, EvReadRangeResponse, EvExecuteTransactionResponse, - EvGetStatusResponse, - EvObtainLockResponse, + EvGetStorageChannelStatusResponse, + EvAcquireLockResponse, EvEnd }; @@ -87,32 +87,32 @@ struct TEvKeyValue { TEvExecuteTransactionResponse() { } }; - struct TEvGetStatusResponse; + struct TEvGetStorageChannelStatusResponse; - struct TEvGetStatus : public TEventPB<TEvGetStatus, - NKikimrKeyValue::GetStatusRequest, EvGetStatus> { + struct TEvGetStorageChannelStatus : public TEventPB<TEvGetStorageChannelStatus, + NKikimrKeyValue::GetStorageChannelStatusRequest, EvGetStorageChannelStatus> { - using TResponse = TEvGetStatusResponse; - TEvGetStatus() { } + using TResponse = TEvGetStorageChannelStatusResponse; + TEvGetStorageChannelStatus() { } }; - struct TEvGetStatusResponse : public TEventPB<TEvGetStatusResponse, - NKikimrKeyValue::GetStatusResult, EvGetStatusResponse> { - TEvGetStatusResponse() { } + struct TEvGetStorageChannelStatusResponse : public TEventPB<TEvGetStorageChannelStatusResponse, + NKikimrKeyValue::GetStorageChannelStatusResult, EvGetStorageChannelStatusResponse> { + TEvGetStorageChannelStatusResponse() { } }; - struct TEvObtainLockResponse; + struct TEvAcquireLockResponse; - struct TEvObtainLock : public TEventPB<TEvObtainLock, - NKikimrKeyValue::ObtainLockRequest, EvObtainLock> { + struct TEvAcquireLock : public TEventPB<TEvAcquireLock, + NKikimrKeyValue::AcquireLockRequest, EvAcquireLock> { - using TResponse = TEvObtainLockResponse; - TEvObtainLock() { } + using TResponse = TEvAcquireLockResponse; + TEvAcquireLock() { } }; - struct TEvObtainLockResponse : public TEventPB<TEvObtainLockResponse, - NKikimrKeyValue::ObtainLockResult, EvObtainLockResponse> { - TEvObtainLockResponse() { } + struct TEvAcquireLockResponse : public TEventPB<TEvAcquireLockResponse, + NKikimrKeyValue::AcquireLockResult, EvAcquireLockResponse> { + TEvAcquireLockResponse() { } }; struct TEvRequest : public TEventPB<TEvRequest, diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index d5c52a1bd7..859e07d4de 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -293,12 +293,12 @@ protected: State.OnEvExecuteTransaction(ev, TActivationContext::AsActorContext(), Info()); } - void Handle(TEvKeyValue::TEvGetStatus::TPtr &ev) { - State.OnEvGetStatus(ev, TActivationContext::AsActorContext(), Info()); + void Handle(TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev) { + State.OnEvGetStorageChannelStatus(ev, TActivationContext::AsActorContext(), Info()); } - void Handle(TEvKeyValue::TEvObtainLock::TPtr &ev) { - State.OnEvObtainLock(ev, TActivationContext::AsActorContext(), Info()); + void Handle(TEvKeyValue::TEvAcquireLock::TPtr &ev) { + State.OnEvAcquireLock(ev, TActivationContext::AsActorContext(), Info()); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -492,8 +492,8 @@ public: hFunc(TEvKeyValue::TEvRead, Handle); hFunc(TEvKeyValue::TEvReadRange, Handle); hFunc(TEvKeyValue::TEvExecuteTransaction, Handle); - hFunc(TEvKeyValue::TEvGetStatus, Handle); - hFunc(TEvKeyValue::TEvObtainLock, Handle); + hFunc(TEvKeyValue::TEvGetStorageChannelStatus, Handle); + hFunc(TEvKeyValue::TEvAcquireLock, Handle); HFunc(TEvKeyValue::TEvEraseCollect, Handle); HFunc(TEvKeyValue::TEvCollect, Handle); diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h index 38698a9c73..ef52c37a63 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.h +++ b/ydb/core/keyvalue/keyvalue_intermediate.h @@ -152,9 +152,9 @@ struct TIntermediate { NKikimrClient::TResponse Response; NKikimrKeyValue::ExecuteTransactionResult ExecuteTransactionResponse; - NKikimrKeyValue::GetStatusResult GetStatusResponse; + NKikimrKeyValue::GetStorageChannelStatusResult GetStorageChannelStatusResponse; - THashMap<ui32, NKikimrKeyValue::Channel*> Channels; + THashMap<ui32, NKikimrKeyValue::StorageChannel*> Channels; ui32 EvType = 0; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 2c4246d21e..a196511633 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -874,19 +874,28 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon THolder<TEvKeyValue::TEvExecuteTransactionResponse> response(new TEvKeyValue::TEvExecuteTransactionResponse); response->Record = intermediate->ExecuteTransactionResponse; ResourceMetrics->Network.Increment(response->Record.ByteSize()); + if (intermediate->RespondTo.NodeId() != ctx.SelfID.NodeId()) { + response->Record.set_node_id(ctx.SelfID.NodeId()); + } ctx.Send(intermediate->RespondTo, response.Release()); } - if (intermediate->EvType == TEvKeyValue::TEvGetStatus::EventType) { - THolder<TEvKeyValue::TEvGetStatusResponse> response(new TEvKeyValue::TEvGetStatusResponse); - response->Record = intermediate->GetStatusResponse; + if (intermediate->EvType == TEvKeyValue::TEvGetStorageChannelStatus::EventType) { + THolder<TEvKeyValue::TEvGetStorageChannelStatusResponse> response(new TEvKeyValue::TEvGetStorageChannelStatusResponse); + response->Record = intermediate->GetStorageChannelStatusResponse; ResourceMetrics->Network.Increment(response->Record.ByteSize()); + if (intermediate->RespondTo.NodeId() != ctx.SelfID.NodeId()) { + response->Record.set_node_id(ctx.SelfID.NodeId()); + } ctx.Send(intermediate->RespondTo, response.Release()); } - if (intermediate->EvType == TEvKeyValue::TEvObtainLock::EventType) { - THolder<TEvKeyValue::TEvObtainLockResponse> response(new TEvKeyValue::TEvObtainLockResponse); + if (intermediate->EvType == TEvKeyValue::TEvAcquireLock::EventType) { + THolder<TEvKeyValue::TEvAcquireLockResponse> response(new TEvKeyValue::TEvAcquireLockResponse); response->Record.set_lock_generation(StoredState.GetUserGeneration()); response->Record.set_cookie(intermediate->Cookie); ResourceMetrics->Network.Increment(response->Record.ByteSize()); + if (intermediate->RespondTo.NodeId() != ctx.SelfID.NodeId()) { + response->Record.set_node_id(ctx.SelfID.NodeId()); + } ctx.Send(intermediate->RespondTo, response.Release()); } intermediate->IsReplied = true; @@ -901,7 +910,7 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon void TKeyValueState::ProcessCmd(TIntermediate::TRead &request, NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse, - NKikimrKeyValue::Channel */*response*/, + NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, TIntermediate* /*intermediate*/) { @@ -941,7 +950,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRead &request, void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request, NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse, - NKikimrKeyValue::Channel */*response*/, + NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/, TIntermediate* /*intermediate*/) { @@ -998,33 +1007,19 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request, } -void SetStatusFlags(NKikimrKeyValue::Flags *flags, const TStorageStatusFlags &statusFlags) { - if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceCyan)) { - flags->set_disk_space_cyan(true); - } - if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) { - flags->set_disk_space_light_yellow_move(true); - } - if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop)) { - flags->set_disk_space_yellow_stop(true); - } - if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightOrange)) { - flags->set_disk_space_light_orange(true); - } +NKikimrKeyValue::StorageChannel::StatusFlag GetStatusFlag(const TStorageStatusFlags &statusFlags) { if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceOrange)) { - flags->set_disk_space_orange(true); + return NKikimrKeyValue::StorageChannel::STATUS_FLAG_ORANGE_OUT_SPACE; } - if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceRed)) { - flags->set_disk_space_red(true); - } - if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceBlack)) { - flags->set_disk_space_black(true); + if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceOrange)) { + return NKikimrKeyValue::StorageChannel::STATUS_FLAG_YELLOW_STOP; } + return NKikimrKeyValue::StorageChannel::STATUS_FLAG_GREEN; } void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, TIntermediate* /*intermediate*/) { @@ -1068,15 +1063,14 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, } if (response) { response->set_status(NKikimrKeyValue::Statuses::RSTATUS_OK); - auto *flags = response->mutable_status_flags(); - SetStatusFlags(flags, request.StatusFlags); + response->set_status_flag(GetStatusFlag(request.StatusFlags)); response->set_storage_channel(storage_channel); } } void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request, NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse, - NKikimrKeyValue::Channel */*response*/, + NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/, TIntermediate* /*intermediate*/) { @@ -1095,7 +1089,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request, void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request, NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse, - NKikimrKeyValue::Channel */*response*/, + NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, TIntermediate* /*intermediate*/) { @@ -1120,7 +1114,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request, void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request, NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse, - NKikimrKeyValue::Channel */*response*/, + NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/, TIntermediate *intermediate) { @@ -1156,7 +1150,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request, void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request, NKikimrClient::TKeyValueResponse::TConcatResult *legacyResponse, - NKikimrKeyValue::Channel */*response*/, + NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, TIntermediate *intermediate) { @@ -1264,8 +1258,8 @@ void TKeyValueState::CmdGetStatus(THolder<TIntermediate> &intermediate, ISimpleD response.SetStatus(request.Status); response.SetStorageChannel(request.StorageChannel); response.SetStatusFlags(request.StatusFlags.Raw); - } else if ((intermediate->EvType == TEvKeyValue::TEvGetStatus::EventType)) { - auto response = intermediate->GetStatusResponse.add_channel(); + } else if ((intermediate->EvType == TEvKeyValue::TEvGetStorageChannelStatus::EventType)) { + auto response = intermediate->GetStorageChannelStatusResponse.add_storage_channel(); if (request.Status == NKikimrProto::OK) { response->set_status(NKikimrKeyValue::Statuses::RSTATUS_OK); @@ -1281,10 +1275,10 @@ void TKeyValueState::CmdGetStatus(THolder<TIntermediate> &intermediate, ISimpleD response->set_storage_channel(request.StorageChannel - BLOB_CHANNEL + MainStorageChannelInPublicApi); } - SetStatusFlags(response->mutable_status_flags(), request.StatusFlags); + response->set_status_flag(GetStatusFlag(request.StatusFlags)); } } - intermediate->GetStatusResponse.set_status(NKikimrKeyValue::Statuses::RSTATUS_OK); + intermediate->GetStorageChannelStatusResponse.set_status(NKikimrKeyValue::Statuses::RSTATUS_OK); } void TKeyValueState::CmdCopyRange(THolder<TIntermediate>& intermediate, ISimpleDb& db, const TActorContext& ctx) { @@ -1340,7 +1334,7 @@ void TKeyValueState::CmdSetExecutorFastLogPolicy(THolder<TIntermediate> &interme void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx) { ui64 unixTime = TAppData::TimeProvider->Now().Seconds(); bool wasWrite = false; - auto getChannel = [&](auto &cmd) -> NKikimrKeyValue::Channel* { + auto getChannel = [&](auto &cmd) -> NKikimrKeyValue::StorageChannel* { using Type = std::decay_t<decltype(cmd)>; if constexpr (std::is_same_v<Type, TIntermediate::TWrite>) { if (intermediate->EvType != TEvKeyValue::TEvExecuteTransaction::EventType) { @@ -1355,7 +1349,7 @@ void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db } auto it = intermediate->Channels.find(storageChannel); if (it == intermediate->Channels.end()) { - auto channel = intermediate->ExecuteTransactionResponse.add_channel(); + auto channel = intermediate->ExecuteTransactionResponse.add_storage_channel(); intermediate->Channels.emplace(storageChannel, channel); return channel; } @@ -2605,7 +2599,7 @@ bool TKeyValueState::PrepareReadRequest(const TActorContext &ctx, TEvKeyValue::T response.Status = NKikimrProto::NODATA; response.Message = "No such key Marker# KV55"; ReplyError<TEvKeyValue::TEvReadResponse>(ctx, response.Message, - NKikimrKeyValue::Statuses::RSTATUS_NO_DATA, intermediate); + NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate); return false; } bool isOverRun = PrepareOneRead<NKikimrKeyValue::Priorities, true, ReadResultSizeEstimationNewApi>( @@ -2750,12 +2744,12 @@ TKeyValueState::TPrepareResult TKeyValueState::PrepareOneGetStatus(TIntermediate } -bool TKeyValueState::PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStatus::TPtr &ev, +bool TKeyValueState::PrepareGetStorageChannelStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev, THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info) { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareGetStatusRequest Marker# KV78"); + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareGetStorageChannelStatusRequest Marker# KV78"); - NKikimrKeyValue::GetStatusRequest &request = ev->Get()->Record; + NKikimrKeyValue::GetStorageChannelStatusRequest &request = ev->Get()->Record; StoredState.SetChannelGeneration(ExecutorGeneration); StoredState.SetChannelStep(NextLogoBlobStep - 1); @@ -2766,7 +2760,7 @@ bool TKeyValueState::PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyVal intermediate->RequestUid = NextRequestUid; ++NextRequestUid; RequestInputTime[intermediate->RequestUid] = TAppData::TimeProvider->Now(); - intermediate->EvType = TEvKeyValue::TEvGetStatus::EventType; + intermediate->EvType = TEvKeyValue::TEvGetStorageChannelStatus::EventType; if (CheckDeadline(ctx, ev->Get(), intermediate)) { return false; @@ -2786,10 +2780,10 @@ bool TKeyValueState::PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyVal return true; } -bool TKeyValueState::PrepareObtainLockRequest(const TActorContext &ctx, TEvKeyValue::TEvObtainLock::TPtr &ev, +bool TKeyValueState::PrepareAcquireLockRequest(const TActorContext &ctx, TEvKeyValue::TEvAcquireLock::TPtr &ev, THolder<TIntermediate> &intermediate) { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareObtainLockRequest Marker# KV79"); + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareAcquireLockRequest Marker# KV79"); StoredState.SetChannelGeneration(ExecutorGeneration); StoredState.SetChannelStep(NextLogoBlobStep - 1); @@ -2801,7 +2795,7 @@ bool TKeyValueState::PrepareObtainLockRequest(const TActorContext &ctx, TEvKeyVa intermediate->RequestUid = NextRequestUid; ++NextRequestUid; RequestInputTime[intermediate->RequestUid] = TAppData::TimeProvider->Now(); - intermediate->EvType = TEvKeyValue::TEvObtainLock::EventType; + intermediate->EvType = TEvKeyValue::TEvAcquireLock::EventType; intermediate->HasIncrementGeneration = true; return true; } @@ -2931,7 +2925,7 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction:: } } -void TKeyValueState::OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TActorContext &ctx, +void TKeyValueState::OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info) { THolder<TIntermediate> intermediate; @@ -2942,10 +2936,10 @@ void TKeyValueState::OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TA TRequestType::EType requestType = TRequestType::ReadOnlyInline; CountRequestIncoming(requestType); - if (PrepareGetStatusRequest(ctx, ev, intermediate, info)) { + if (PrepareGetStorageChannelStatusRequest(ctx, ev, intermediate, info)) { ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId - << " Create GetStatus request, Marker# KV75"); + << " Create GetStorageChannelStatus request, Marker# KV75"); RegisterRequestActor(ctx, std::move(intermediate), info); ++RoInlineIntermediatesInFlight; CountRequestTakeOffOrEnqueue(requestType); @@ -2955,7 +2949,7 @@ void TKeyValueState::OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TA } } -void TKeyValueState::OnEvObtainLock(TEvKeyValue::TEvObtainLock::TPtr &ev, const TActorContext &ctx, +void TKeyValueState::OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info) { THolder<TIntermediate> intermediate; @@ -2966,10 +2960,10 @@ void TKeyValueState::OnEvObtainLock(TEvKeyValue::TEvObtainLock::TPtr &ev, const TRequestType::EType requestType = TRequestType::ReadOnlyInline; CountRequestIncoming(requestType); - if (PrepareObtainLockRequest(ctx, ev, intermediate)) { + if (PrepareAcquireLockRequest(ctx, ev, intermediate)) { ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId - << " Create ObtainLock request, Marker# KV80"); + << " Create AcquireLock request, Marker# KV80"); RegisterRequestActor(ctx, std::move(intermediate), info); ++RoInlineIntermediatesInFlight; CountRequestTakeOffOrEnqueue(requestType); diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index 2c1c4ccf1f..ffdc1f7250 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -342,31 +342,31 @@ public: void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info); void ProcessCmd(TIntermediate::TRead &read, NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(TIntermediate::TRangeRead &request, NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(TIntermediate::TWrite &request, NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TDelete &request, NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TRename &request, NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TCopyRange &request, NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TConcat &request, NKikimrClient::TKeyValueResponse::TConcatResult *resplegacyResponseonse, - NKikimrKeyValue::Channel *response, + NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void CmdRead(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx); void CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx); @@ -429,9 +429,9 @@ public: const TTabletStorageInfo *info); void OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info); - void OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TActorContext &ctx, + void OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info); - void OnEvObtainLock(TEvKeyValue::TEvObtainLock::TPtr &ev, const TActorContext &ctx, + void OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info); void OnPeriodicRefresh(const TActorContext &ctx); @@ -485,14 +485,14 @@ public: auto &record = kvRequest->Record; intermediate->HasGeneration = true; intermediate->Generation = record.lock_generation(); - if (record.lock_generation() != StoredState.GetUserGeneration()) { + if (record.has_lock_generation() && record.lock_generation() != StoredState.GetUserGeneration()) { TStringStream str; str << "KeyValue# " << TabletId; str << " Generation mismatch! Requested# " << record.lock_generation(); str << " Actual# " << StoredState.GetUserGeneration(); str << " Marker# KV05"; ReplyError<typename TGrpcRequestWithLockGeneration::TResponse>(ctx, str.Str(), - NKikimrKeyValue::Statuses::RSTATUS_ERROR, intermediate); + NKikimrKeyValue::Statuses::RSTATUS_WRONG_LOCK_GENERATION, intermediate); return true; } return false; @@ -561,7 +561,7 @@ public: response->Record.set_requested_size(interRead.ValueSize); response->Record.set_requested_key(interRead.Key); } - if constexpr (!std::is_same_v<TResponse, TEvKeyValue::TEvGetStatusResponse>) { + if constexpr (!std::is_same_v<TResponse, TEvKeyValue::TEvGetStorageChannelStatusResponse>) { if (intermediate->HasCookie) { response->Record.set_cookie(intermediate->Cookie); } @@ -593,9 +593,9 @@ public: THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info); TPrepareResult PrepareOneGetStatus(TIntermediate::TGetStatus &cmd, ui64 publicStorageChannel, const TTabletStorageInfo *info); - bool PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStatus::TPtr &ev, + bool PrepareGetStorageChannelStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev, THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info); - bool PrepareObtainLockRequest(const TActorContext &ctx, TEvKeyValue::TEvObtainLock::TPtr &ev, + bool PrepareAcquireLockRequest(const TActorContext &ctx, TEvKeyValue::TEvAcquireLock::TPtr &ev, THolder<TIntermediate> &intermediate); template <typename TRequestType> diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp index 3adab0a863..182a2c513c 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp @@ -131,11 +131,12 @@ public: STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KV322, "Expected OK or UNKNOWN and given " << NKikimrProto::EReplyStatus_Name(status) << " readCount# " << readCount); - NKikimrKeyValue::Statuses::ReplyStatus replyStatus = ConvertStatus(status); - if (!readCount) { - replyStatus = NKikimrKeyValue::Statuses::RSTATUS_NO_DATA; - } else if (status == NKikimrProto::UNKNOWN) { + + NKikimrKeyValue::Statuses::ReplyStatus replyStatus; + if (status == NKikimrProto::UNKNOWN || status == NKikimrProto::NODATA) { replyStatus = NKikimrKeyValue::Statuses::RSTATUS_OK; + } else { + replyStatus = ConvertStatus(status); } SendResponseAndPassAway(replyStatus); @@ -381,6 +382,10 @@ public: response->Record.set_requested_size(interRead.RequestedSize); response->Record.set_value(interRead.Value); + if (IntermediateResult->RespondTo.NodeId() != SelfId().NodeId()) { + response->Record.set_node_id(SelfId().NodeId()); + } + return response; } @@ -389,8 +394,6 @@ public: return NKikimrKeyValue::Statuses::RSTATUS_OK; } else if (status == NKikimrProto::OVERRUN) { return NKikimrKeyValue::Statuses::RSTATUS_OVERRUN; - } else if (status == NKikimrProto::NODATA) { - return NKikimrKeyValue::Statuses::RSTATUS_NO_DATA; } else { return NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR; } @@ -432,6 +435,10 @@ public: } readRangeResult.set_status(status); + if (IntermediateResult->RespondTo.NodeId() != SelfId().NodeId()) { + readRangeResult.set_node_id(SelfId().NodeId()); + } + return response; } diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp index 3d33155ad5..9d4c716e64 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp @@ -450,7 +450,7 @@ Y_UNIT_TEST(ReadRangeNoData) { TRangeReadRequestBuilder builder; - RunTest(env, builder, groupIds, NKikimrKeyValue::Statuses::RSTATUS_NO_DATA); + RunTest(env, builder, groupIds, NKikimrKeyValue::Statuses::RSTATUS_OK); } } // Y_UNIT_TEST_SUITE(KeyValueReadStorage) diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 0e41b9c9fb..7b46ebec58 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -793,7 +793,7 @@ void ExecuteReadRange(TTestContext &tc, template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK> void ExecuteGetStatus(TTestContext &tc, const TDeque<ui32> &channels, ui64 lock_generation) { - TDesiredPair<TEvKeyValue::TEvGetStatus> dp; + TDesiredPair<TEvKeyValue::TEvGetStorageChannelStatus> dp; dp.Request.set_lock_generation(lock_generation); dp.Request.set_tablet_id(tc.TabletId); for (ui32 channel : channels) { @@ -806,14 +806,14 @@ void ExecuteGetStatus(TTestContext &tc, const TDeque<ui32> &channels, ui64 lock_ << " exp# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(ExpectedStatus) << " msg# " << dp.Response.msg()); if constexpr (ExpectedStatus == NKikimrKeyValue::Statuses::RSTATUS_OK) { - for (auto &channel : dp.Response.channel()) { + for (auto &channel : dp.Response.storage_channel()) { UNIT_ASSERT(channel.status() == NKikimrKeyValue::Statuses::RSTATUS_OK); } } } void ExecuteObtainLock(TTestContext &tc, ui64 expectedLockGeneration) { - TDesiredPair<TEvKeyValue::TEvObtainLock> dp; + TDesiredPair<TEvKeyValue::TEvAcquireLock> dp; dp.Request.set_tablet_id(tc.TabletId); ExecuteEvent(dp, tc); UNIT_ASSERT_VALUES_EQUAL(dp.Response.lock_generation(), expectedLockGeneration); @@ -917,7 +917,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) { TDispatchOptions options; options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType); tc.Runtime->DispatchEvents(options); - ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0); + ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0); ExecuteRead(tc, "key1", "value1", 0, 0, 0); } @@ -931,7 +931,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOkWithNewApi) { ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteRead(tc, "key", "value", 0, 0, 0); ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); - ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0); + ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0); }); } @@ -1086,7 +1086,7 @@ Y_UNIT_TEST(TestInlineWriteReadDeleteWithRestartsThenResponseOkNewApi) { ExecuteWrite(tc, {{"key", "value"}}, 0, 1, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteRead(tc, "key", "value", 0, 0, 0); ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); - ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0); + ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0); }); } @@ -1362,8 +1362,8 @@ Y_UNIT_TEST(TestInlineWriteReadWithRestartsWithNotCorrectUTF8NewApi) { TKeyValuePair pair{TString("key1\0"), TString("value")}; expectedPairs.push_back({TString("key1\\0"), TString()}); ExecuteWrite(tc, {pair}, 0, 1, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); - - ExecuteReadRange<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key0", EBorderKind::Include, + expectedPairs.clear(); + ExecuteReadRange<NKikimrKeyValue::Statuses::RSTATUS_OK>(tc, "key0", EBorderKind::Include, "key1", EBorderKind::Exclude, expectedPairs, 0, true, 0); }); } @@ -1413,7 +1413,7 @@ Y_UNIT_TEST(TestEmptyWriteReadDeleteWithRestartsThenResponseOkNewApi) { ExecuteReadRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, pairs, 0, true, 0); ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); - ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0); + ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0); }); } @@ -1462,7 +1462,7 @@ Y_UNIT_TEST(TestInlineEmptyWriteReadDeleteWithRestartsThenResponseOkNewApi) { ExecuteReadRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, pairs, 0, true, 0); ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); - ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0); + ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0); }); } @@ -2188,7 +2188,7 @@ Y_UNIT_TEST(TestObtainLockNewApi) { }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) { TFinalizer finalizer(tc); tc.Prepare(dispatchName, setup, activeZone); - constexpr NKikimrKeyValue::Statuses::ReplyStatus status = NKikimrKeyValue::Statuses::RSTATUS_ERROR; + constexpr NKikimrKeyValue::Statuses::ReplyStatus status = NKikimrKeyValue::Statuses::RSTATUS_WRONG_LOCK_GENERATION; ExecuteObtainLock(tc, 1); ExecuteGetStatus<status>(tc, {1, 2}, 0); diff --git a/ydb/core/keyvalue/protos/events.proto b/ydb/core/keyvalue/protos/events.proto index fa365981ae..0d3d9a4b07 100644 --- a/ydb/core/keyvalue/protos/events.proto +++ b/ydb/core/keyvalue/protos/events.proto @@ -26,11 +26,15 @@ message Statuses { RSTATUS_ERROR = 2; RSTATUS_TIMEOUT = 3; RSTATUS_INTERNAL_ERROR = 4; - RSTATUS_NO_DATA = 5; + RSTATUS_NOT_FOUND = 5; RSTATUS_OVERRUN = 6; + RSTATUS_WRONG_LOCK_GENERATION = 7; } } +message StorageStatusFlags { +} + message Flags { bool disk_space_cyan = 1; bool disk_space_light_yellow_move = 2; @@ -41,28 +45,34 @@ message Flags { bool disk_space_black = 7; } -message Channel { +message StorageChannel { + enum StatusFlag { + STATUS_FLAG_UNSPECIFIED = 0; + STATUS_FLAG_GREEN = 10; + STATUS_FLAG_YELLOW_STOP = 20; + STATUS_FLAG_ORANGE_OUT_SPACE = 30; + } Statuses.ReplyStatus status = 1; uint32 storage_channel = 2; - optional Flags status_flags = 3; + StatusFlag status_flag = 3; } message KVRange { oneof from_bound { - bytes from_key_inclusive = 1; - bytes from_key_exclusive = 2; + string from_key_inclusive = 1; + string from_key_exclusive = 2; } oneof to_bound { - bytes to_key_inclusive = 3; - bytes to_key_exclusive = 4; + string to_key_inclusive = 3; + string to_key_exclusive = 4; } } message ReadRequest { uint64 tablet_id = 1; - uint64 lock_generation = 2; - bytes key = 3; + optional uint64 lock_generation = 2; + string key = 3; uint64 offset = 4; uint64 size = 5; uint64 cookie = 6; @@ -72,18 +82,19 @@ message ReadRequest { } message ReadResult { - bytes requested_key = 1; + string requested_key = 1; uint64 requested_offset = 2; uint64 requested_size = 3; bytes value = 4; string msg = 5; Statuses.ReplyStatus status = 6; uint64 cookie = 7; + uint64 node_id = 8; } message ReadRangeRequest { uint64 tablet_id = 1; - uint64 lock_generation = 2; + optional uint64 lock_generation = 2; KVRange range = 3; @@ -96,7 +107,7 @@ message ReadRangeRequest { message ReadRangeResult { message KeyValuePair { - bytes key = 1; + string key = 1; bytes value = 2; uint32 value_size = 3; @@ -109,23 +120,24 @@ message ReadRangeResult { string msg = 2; repeated KeyValuePair pair = 3; uint64 cookie = 4; + uint64 node_id = 5; } message ExecuteTransactionRequest { message Command { message Rename { - bytes old_key = 1; - bytes new_key = 2; + string old_key = 1; + string new_key = 2; } message Concat { - repeated bytes input_keys = 1; - bytes output_key = 2; + repeated string input_keys = 1; + string output_key = 2; bool keep_inputs = 3; } message CopyRange { KVRange range = 1; - bytes prefix_to_remove = 2; - bytes prefix_to_add = 3; + string prefix_to_remove = 2; + string prefix_to_add = 3; } message Write { enum Tactic { @@ -133,7 +145,7 @@ message ExecuteTransactionRequest { TACTIC_MAX_THROUGHPUT = 1; TACTIC_MIN_LATENCY = 2; } - bytes key = 1; + string key = 1; bytes value = 2; uint32 storage_channel = 3; Priorities.Priority priority = 4; @@ -153,38 +165,41 @@ message ExecuteTransactionRequest { } uint64 tablet_id = 1; - uint64 lock_generation = 2; + optional uint64 lock_generation = 2; repeated Command commands = 3; uint64 cookie = 4; uint64 deadline_instant_ms = 5; } message ExecuteTransactionResult { - repeated Channel channel = 1; + repeated StorageChannel storage_channel = 1; Statuses.ReplyStatus status = 2; string msg = 3; uint64 cookie = 4; + uint64 node_id = 5; } -message GetStatusRequest { +message GetStorageChannelStatusRequest { uint64 tablet_id = 1; - uint64 lock_generation = 2; + optional uint64 lock_generation = 2; repeated uint32 storage_channel = 3; uint64 deadline_instant_ms = 4; } -message GetStatusResult { - repeated Channel channel = 1; +message GetStorageChannelStatusResult { + repeated StorageChannel storage_channel = 1; Statuses.ReplyStatus status = 2; string msg = 3; + uint64 node_id = 4; } -message ObtainLockRequest { +message AcquireLockRequest { uint64 tablet_id = 1; uint64 cookie = 2; } -message ObtainLockResult { +message AcquireLockResult { uint64 lock_generation = 1; uint64 cookie = 2; + uint64 node_id = 3; } diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp index f6fc171f1e..cfeaf00b0d 100644 --- a/ydb/core/test_tablet/load_actor_read_validate.cpp +++ b/ydb/core/test_tablet/load_actor_read_validate.cpp @@ -227,7 +227,6 @@ namespace NKikimr::NTestShard { } Y_VERIFY_S(status == NKikimrKeyValue::Statuses::RSTATUS_OK - || status == NKikimrKeyValue::Statuses::RSTATUS_NO_DATA || status == NKikimrKeyValue::Statuses::RSTATUS_OVERRUN, "TabletId# " << TabletId << " CmdReadRange failed" << " Status# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(status)); |