aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@yandex-team.ru>2022-06-06 14:51:42 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:51:42 +0300
commit502f43be9df33c1a73e433ec7f46e6121c751db9 (patch)
treebe8f9fd85e0f6ce7da59e86c48b2ee1a08661914
parentf3748b98db7af1f1fd7e345bb6e8d452387d3366 (diff)
downloadydb-502f43be9df33c1a73e433ec7f46e6121c751db9.tar.gz
merge from trunk: r9240076, r9244096, r9255567, r9287393, r9386382; KIKIMR-13253 KV GRPC API
REVIEW: 2498910 x-ydb-stable-ref: f305413c20c64e5a0157a85c3672d04ef169ac6f
-rw-r--r--ydb/core/keyvalue/keyvalue_events.h40
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h12
-rw-r--r--ydb/core/keyvalue/keyvalue_intermediate.h4
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp100
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h28
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_read_request.cpp19
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp2
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp22
-rw-r--r--ydb/core/keyvalue/protos/events.proto69
-rw-r--r--ydb/core/test_tablet/load_actor_read_validate.cpp1
10 files changed, 156 insertions, 141 deletions
diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h
index 03bc81ad8f..2c8c23256a 100644
--- a/ydb/core/keyvalue/keyvalue_events.h
+++ b/ydb/core/keyvalue/keyvalue_events.h
@@ -27,16 +27,16 @@ struct TEvKeyValue {
EvRead = EvRequest + 16,
EvReadRange,
EvExecuteTransaction,
- EvGetStatus,
- EvObtainLock,
+ EvGetStorageChannelStatus,
+ EvAcquireLock,
EvResponse = EvRequest + 512,
EvReadResponse = EvResponse + 16,
EvReadRangeResponse,
EvExecuteTransactionResponse,
- EvGetStatusResponse,
- EvObtainLockResponse,
+ EvGetStorageChannelStatusResponse,
+ EvAcquireLockResponse,
EvEnd
};
@@ -87,32 +87,32 @@ struct TEvKeyValue {
TEvExecuteTransactionResponse() { }
};
- struct TEvGetStatusResponse;
+ struct TEvGetStorageChannelStatusResponse;
- struct TEvGetStatus : public TEventPB<TEvGetStatus,
- NKikimrKeyValue::GetStatusRequest, EvGetStatus> {
+ struct TEvGetStorageChannelStatus : public TEventPB<TEvGetStorageChannelStatus,
+ NKikimrKeyValue::GetStorageChannelStatusRequest, EvGetStorageChannelStatus> {
- using TResponse = TEvGetStatusResponse;
- TEvGetStatus() { }
+ using TResponse = TEvGetStorageChannelStatusResponse;
+ TEvGetStorageChannelStatus() { }
};
- struct TEvGetStatusResponse : public TEventPB<TEvGetStatusResponse,
- NKikimrKeyValue::GetStatusResult, EvGetStatusResponse> {
- TEvGetStatusResponse() { }
+ struct TEvGetStorageChannelStatusResponse : public TEventPB<TEvGetStorageChannelStatusResponse,
+ NKikimrKeyValue::GetStorageChannelStatusResult, EvGetStorageChannelStatusResponse> {
+ TEvGetStorageChannelStatusResponse() { }
};
- struct TEvObtainLockResponse;
+ struct TEvAcquireLockResponse;
- struct TEvObtainLock : public TEventPB<TEvObtainLock,
- NKikimrKeyValue::ObtainLockRequest, EvObtainLock> {
+ struct TEvAcquireLock : public TEventPB<TEvAcquireLock,
+ NKikimrKeyValue::AcquireLockRequest, EvAcquireLock> {
- using TResponse = TEvObtainLockResponse;
- TEvObtainLock() { }
+ using TResponse = TEvAcquireLockResponse;
+ TEvAcquireLock() { }
};
- struct TEvObtainLockResponse : public TEventPB<TEvObtainLockResponse,
- NKikimrKeyValue::ObtainLockResult, EvObtainLockResponse> {
- TEvObtainLockResponse() { }
+ struct TEvAcquireLockResponse : public TEventPB<TEvAcquireLockResponse,
+ NKikimrKeyValue::AcquireLockResult, EvAcquireLockResponse> {
+ TEvAcquireLockResponse() { }
};
struct TEvRequest : public TEventPB<TEvRequest,
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index d5c52a1bd7..859e07d4de 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -293,12 +293,12 @@ protected:
State.OnEvExecuteTransaction(ev, TActivationContext::AsActorContext(), Info());
}
- void Handle(TEvKeyValue::TEvGetStatus::TPtr &ev) {
- State.OnEvGetStatus(ev, TActivationContext::AsActorContext(), Info());
+ void Handle(TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev) {
+ State.OnEvGetStorageChannelStatus(ev, TActivationContext::AsActorContext(), Info());
}
- void Handle(TEvKeyValue::TEvObtainLock::TPtr &ev) {
- State.OnEvObtainLock(ev, TActivationContext::AsActorContext(), Info());
+ void Handle(TEvKeyValue::TEvAcquireLock::TPtr &ev) {
+ State.OnEvAcquireLock(ev, TActivationContext::AsActorContext(), Info());
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -492,8 +492,8 @@ public:
hFunc(TEvKeyValue::TEvRead, Handle);
hFunc(TEvKeyValue::TEvReadRange, Handle);
hFunc(TEvKeyValue::TEvExecuteTransaction, Handle);
- hFunc(TEvKeyValue::TEvGetStatus, Handle);
- hFunc(TEvKeyValue::TEvObtainLock, Handle);
+ hFunc(TEvKeyValue::TEvGetStorageChannelStatus, Handle);
+ hFunc(TEvKeyValue::TEvAcquireLock, Handle);
HFunc(TEvKeyValue::TEvEraseCollect, Handle);
HFunc(TEvKeyValue::TEvCollect, Handle);
diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h
index 38698a9c73..ef52c37a63 100644
--- a/ydb/core/keyvalue/keyvalue_intermediate.h
+++ b/ydb/core/keyvalue/keyvalue_intermediate.h
@@ -152,9 +152,9 @@ struct TIntermediate {
NKikimrClient::TResponse Response;
NKikimrKeyValue::ExecuteTransactionResult ExecuteTransactionResponse;
- NKikimrKeyValue::GetStatusResult GetStatusResponse;
+ NKikimrKeyValue::GetStorageChannelStatusResult GetStorageChannelStatusResponse;
- THashMap<ui32, NKikimrKeyValue::Channel*> Channels;
+ THashMap<ui32, NKikimrKeyValue::StorageChannel*> Channels;
ui32 EvType = 0;
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index 2c4246d21e..a196511633 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -874,19 +874,28 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon
THolder<TEvKeyValue::TEvExecuteTransactionResponse> response(new TEvKeyValue::TEvExecuteTransactionResponse);
response->Record = intermediate->ExecuteTransactionResponse;
ResourceMetrics->Network.Increment(response->Record.ByteSize());
+ if (intermediate->RespondTo.NodeId() != ctx.SelfID.NodeId()) {
+ response->Record.set_node_id(ctx.SelfID.NodeId());
+ }
ctx.Send(intermediate->RespondTo, response.Release());
}
- if (intermediate->EvType == TEvKeyValue::TEvGetStatus::EventType) {
- THolder<TEvKeyValue::TEvGetStatusResponse> response(new TEvKeyValue::TEvGetStatusResponse);
- response->Record = intermediate->GetStatusResponse;
+ if (intermediate->EvType == TEvKeyValue::TEvGetStorageChannelStatus::EventType) {
+ THolder<TEvKeyValue::TEvGetStorageChannelStatusResponse> response(new TEvKeyValue::TEvGetStorageChannelStatusResponse);
+ response->Record = intermediate->GetStorageChannelStatusResponse;
ResourceMetrics->Network.Increment(response->Record.ByteSize());
+ if (intermediate->RespondTo.NodeId() != ctx.SelfID.NodeId()) {
+ response->Record.set_node_id(ctx.SelfID.NodeId());
+ }
ctx.Send(intermediate->RespondTo, response.Release());
}
- if (intermediate->EvType == TEvKeyValue::TEvObtainLock::EventType) {
- THolder<TEvKeyValue::TEvObtainLockResponse> response(new TEvKeyValue::TEvObtainLockResponse);
+ if (intermediate->EvType == TEvKeyValue::TEvAcquireLock::EventType) {
+ THolder<TEvKeyValue::TEvAcquireLockResponse> response(new TEvKeyValue::TEvAcquireLockResponse);
response->Record.set_lock_generation(StoredState.GetUserGeneration());
response->Record.set_cookie(intermediate->Cookie);
ResourceMetrics->Network.Increment(response->Record.ByteSize());
+ if (intermediate->RespondTo.NodeId() != ctx.SelfID.NodeId()) {
+ response->Record.set_node_id(ctx.SelfID.NodeId());
+ }
ctx.Send(intermediate->RespondTo, response.Release());
}
intermediate->IsReplied = true;
@@ -901,7 +910,7 @@ void TKeyValueState::Reply(THolder<TIntermediate> &intermediate, const TActorCon
void TKeyValueState::ProcessCmd(TIntermediate::TRead &request,
NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse,
- NKikimrKeyValue::Channel */*response*/,
+ NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/,
TIntermediate* /*intermediate*/)
{
@@ -941,7 +950,7 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRead &request,
void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request,
NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse,
- NKikimrKeyValue::Channel */*response*/,
+ NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &/*db*/, const TActorContext &/*ctx*/, TRequestStat &/*stat*/, ui64 /*unixTime*/,
TIntermediate* /*intermediate*/)
{
@@ -998,33 +1007,19 @@ void TKeyValueState::ProcessCmd(TIntermediate::TRangeRead &request,
}
-void SetStatusFlags(NKikimrKeyValue::Flags *flags, const TStorageStatusFlags &statusFlags) {
- if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceCyan)) {
- flags->set_disk_space_cyan(true);
- }
- if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) {
- flags->set_disk_space_light_yellow_move(true);
- }
- if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop)) {
- flags->set_disk_space_yellow_stop(true);
- }
- if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightOrange)) {
- flags->set_disk_space_light_orange(true);
- }
+NKikimrKeyValue::StorageChannel::StatusFlag GetStatusFlag(const TStorageStatusFlags &statusFlags) {
if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceOrange)) {
- flags->set_disk_space_orange(true);
+ return NKikimrKeyValue::StorageChannel::STATUS_FLAG_ORANGE_OUT_SPACE;
}
- if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceRed)) {
- flags->set_disk_space_red(true);
- }
- if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceBlack)) {
- flags->set_disk_space_black(true);
+ if (statusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceOrange)) {
+ return NKikimrKeyValue::StorageChannel::STATUS_FLAG_YELLOW_STOP;
}
+ return NKikimrKeyValue::StorageChannel::STATUS_FLAG_GREEN;
}
void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
TIntermediate* /*intermediate*/)
{
@@ -1068,15 +1063,14 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request,
}
if (response) {
response->set_status(NKikimrKeyValue::Statuses::RSTATUS_OK);
- auto *flags = response->mutable_status_flags();
- SetStatusFlags(flags, request.StatusFlags);
+ response->set_status_flag(GetStatusFlag(request.StatusFlags));
response->set_storage_channel(storage_channel);
}
}
void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request,
NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse,
- NKikimrKeyValue::Channel */*response*/,
+ NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/,
TIntermediate* /*intermediate*/)
{
@@ -1095,7 +1089,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request,
NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse,
- NKikimrKeyValue::Channel */*response*/,
+ NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
TIntermediate* /*intermediate*/)
{
@@ -1120,7 +1114,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request,
NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse,
- NKikimrKeyValue::Channel */*response*/,
+ NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 /*unixTime*/,
TIntermediate *intermediate)
{
@@ -1156,7 +1150,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request,
void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request,
NKikimrClient::TKeyValueResponse::TConcatResult *legacyResponse,
- NKikimrKeyValue::Channel */*response*/,
+ NKikimrKeyValue::StorageChannel */*response*/,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime,
TIntermediate *intermediate)
{
@@ -1264,8 +1258,8 @@ void TKeyValueState::CmdGetStatus(THolder<TIntermediate> &intermediate, ISimpleD
response.SetStatus(request.Status);
response.SetStorageChannel(request.StorageChannel);
response.SetStatusFlags(request.StatusFlags.Raw);
- } else if ((intermediate->EvType == TEvKeyValue::TEvGetStatus::EventType)) {
- auto response = intermediate->GetStatusResponse.add_channel();
+ } else if ((intermediate->EvType == TEvKeyValue::TEvGetStorageChannelStatus::EventType)) {
+ auto response = intermediate->GetStorageChannelStatusResponse.add_storage_channel();
if (request.Status == NKikimrProto::OK) {
response->set_status(NKikimrKeyValue::Statuses::RSTATUS_OK);
@@ -1281,10 +1275,10 @@ void TKeyValueState::CmdGetStatus(THolder<TIntermediate> &intermediate, ISimpleD
response->set_storage_channel(request.StorageChannel - BLOB_CHANNEL + MainStorageChannelInPublicApi);
}
- SetStatusFlags(response->mutable_status_flags(), request.StatusFlags);
+ response->set_status_flag(GetStatusFlag(request.StatusFlags));
}
}
- intermediate->GetStatusResponse.set_status(NKikimrKeyValue::Statuses::RSTATUS_OK);
+ intermediate->GetStorageChannelStatusResponse.set_status(NKikimrKeyValue::Statuses::RSTATUS_OK);
}
void TKeyValueState::CmdCopyRange(THolder<TIntermediate>& intermediate, ISimpleDb& db, const TActorContext& ctx) {
@@ -1340,7 +1334,7 @@ void TKeyValueState::CmdSetExecutorFastLogPolicy(THolder<TIntermediate> &interme
void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx) {
ui64 unixTime = TAppData::TimeProvider->Now().Seconds();
bool wasWrite = false;
- auto getChannel = [&](auto &cmd) -> NKikimrKeyValue::Channel* {
+ auto getChannel = [&](auto &cmd) -> NKikimrKeyValue::StorageChannel* {
using Type = std::decay_t<decltype(cmd)>;
if constexpr (std::is_same_v<Type, TIntermediate::TWrite>) {
if (intermediate->EvType != TEvKeyValue::TEvExecuteTransaction::EventType) {
@@ -1355,7 +1349,7 @@ void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db
}
auto it = intermediate->Channels.find(storageChannel);
if (it == intermediate->Channels.end()) {
- auto channel = intermediate->ExecuteTransactionResponse.add_channel();
+ auto channel = intermediate->ExecuteTransactionResponse.add_storage_channel();
intermediate->Channels.emplace(storageChannel, channel);
return channel;
}
@@ -2605,7 +2599,7 @@ bool TKeyValueState::PrepareReadRequest(const TActorContext &ctx, TEvKeyValue::T
response.Status = NKikimrProto::NODATA;
response.Message = "No such key Marker# KV55";
ReplyError<TEvKeyValue::TEvReadResponse>(ctx, response.Message,
- NKikimrKeyValue::Statuses::RSTATUS_NO_DATA, intermediate);
+ NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate);
return false;
}
bool isOverRun = PrepareOneRead<NKikimrKeyValue::Priorities, true, ReadResultSizeEstimationNewApi>(
@@ -2750,12 +2744,12 @@ TKeyValueState::TPrepareResult TKeyValueState::PrepareOneGetStatus(TIntermediate
}
-bool TKeyValueState::PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStatus::TPtr &ev,
+bool TKeyValueState::PrepareGetStorageChannelStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev,
THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info)
{
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareGetStatusRequest Marker# KV78");
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareGetStorageChannelStatusRequest Marker# KV78");
- NKikimrKeyValue::GetStatusRequest &request = ev->Get()->Record;
+ NKikimrKeyValue::GetStorageChannelStatusRequest &request = ev->Get()->Record;
StoredState.SetChannelGeneration(ExecutorGeneration);
StoredState.SetChannelStep(NextLogoBlobStep - 1);
@@ -2766,7 +2760,7 @@ bool TKeyValueState::PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyVal
intermediate->RequestUid = NextRequestUid;
++NextRequestUid;
RequestInputTime[intermediate->RequestUid] = TAppData::TimeProvider->Now();
- intermediate->EvType = TEvKeyValue::TEvGetStatus::EventType;
+ intermediate->EvType = TEvKeyValue::TEvGetStorageChannelStatus::EventType;
if (CheckDeadline(ctx, ev->Get(), intermediate)) {
return false;
@@ -2786,10 +2780,10 @@ bool TKeyValueState::PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyVal
return true;
}
-bool TKeyValueState::PrepareObtainLockRequest(const TActorContext &ctx, TEvKeyValue::TEvObtainLock::TPtr &ev,
+bool TKeyValueState::PrepareAcquireLockRequest(const TActorContext &ctx, TEvKeyValue::TEvAcquireLock::TPtr &ev,
THolder<TIntermediate> &intermediate)
{
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareObtainLockRequest Marker# KV79");
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " PrepareAcquireLockRequest Marker# KV79");
StoredState.SetChannelGeneration(ExecutorGeneration);
StoredState.SetChannelStep(NextLogoBlobStep - 1);
@@ -2801,7 +2795,7 @@ bool TKeyValueState::PrepareObtainLockRequest(const TActorContext &ctx, TEvKeyVa
intermediate->RequestUid = NextRequestUid;
++NextRequestUid;
RequestInputTime[intermediate->RequestUid] = TAppData::TimeProvider->Now();
- intermediate->EvType = TEvKeyValue::TEvObtainLock::EventType;
+ intermediate->EvType = TEvKeyValue::TEvAcquireLock::EventType;
intermediate->HasIncrementGeneration = true;
return true;
}
@@ -2931,7 +2925,7 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction::
}
}
-void TKeyValueState::OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TActorContext &ctx,
+void TKeyValueState::OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev, const TActorContext &ctx,
const TTabletStorageInfo *info)
{
THolder<TIntermediate> intermediate;
@@ -2942,10 +2936,10 @@ void TKeyValueState::OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TA
TRequestType::EType requestType = TRequestType::ReadOnlyInline;
CountRequestIncoming(requestType);
- if (PrepareGetStatusRequest(ctx, ev, intermediate, info)) {
+ if (PrepareGetStorageChannelStatusRequest(ctx, ev, intermediate, info)) {
++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
- << " Create GetStatus request, Marker# KV75");
+ << " Create GetStorageChannelStatus request, Marker# KV75");
RegisterRequestActor(ctx, std::move(intermediate), info);
++RoInlineIntermediatesInFlight;
CountRequestTakeOffOrEnqueue(requestType);
@@ -2955,7 +2949,7 @@ void TKeyValueState::OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TA
}
}
-void TKeyValueState::OnEvObtainLock(TEvKeyValue::TEvObtainLock::TPtr &ev, const TActorContext &ctx,
+void TKeyValueState::OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, const TActorContext &ctx,
const TTabletStorageInfo *info)
{
THolder<TIntermediate> intermediate;
@@ -2966,10 +2960,10 @@ void TKeyValueState::OnEvObtainLock(TEvKeyValue::TEvObtainLock::TPtr &ev, const
TRequestType::EType requestType = TRequestType::ReadOnlyInline;
CountRequestIncoming(requestType);
- if (PrepareObtainLockRequest(ctx, ev, intermediate)) {
+ if (PrepareAcquireLockRequest(ctx, ev, intermediate)) {
++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
- << " Create ObtainLock request, Marker# KV80");
+ << " Create AcquireLock request, Marker# KV80");
RegisterRequestActor(ctx, std::move(intermediate), info);
++RoInlineIntermediatesInFlight;
CountRequestTakeOffOrEnqueue(requestType);
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index 2c1c4ccf1f..ffdc1f7250 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -342,31 +342,31 @@ public:
void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info);
void ProcessCmd(TIntermediate::TRead &read,
NKikimrClient::TKeyValueResponse::TReadResult *legacyResponse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(TIntermediate::TRangeRead &request,
NKikimrClient::TKeyValueResponse::TReadRangeResult *legacyResponse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(TIntermediate::TWrite &request,
NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TDelete &request,
NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TRename &request,
NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TCopyRange &request,
NKikimrClient::TKeyValueResponse::TCopyRangeResult *legacyResponse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void ProcessCmd(const TIntermediate::TConcat &request,
NKikimrClient::TKeyValueResponse::TConcatResult *resplegacyResponseonse,
- NKikimrKeyValue::Channel *response,
+ NKikimrKeyValue::StorageChannel *response,
ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate);
void CmdRead(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx);
void CmdReadRange(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx);
@@ -429,9 +429,9 @@ public:
const TTabletStorageInfo *info);
void OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction::TPtr &ev, const TActorContext &ctx,
const TTabletStorageInfo *info);
- void OnEvGetStatus(TEvKeyValue::TEvGetStatus::TPtr &ev, const TActorContext &ctx,
+ void OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev, const TActorContext &ctx,
const TTabletStorageInfo *info);
- void OnEvObtainLock(TEvKeyValue::TEvObtainLock::TPtr &ev, const TActorContext &ctx,
+ void OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, const TActorContext &ctx,
const TTabletStorageInfo *info);
void OnPeriodicRefresh(const TActorContext &ctx);
@@ -485,14 +485,14 @@ public:
auto &record = kvRequest->Record;
intermediate->HasGeneration = true;
intermediate->Generation = record.lock_generation();
- if (record.lock_generation() != StoredState.GetUserGeneration()) {
+ if (record.has_lock_generation() && record.lock_generation() != StoredState.GetUserGeneration()) {
TStringStream str;
str << "KeyValue# " << TabletId;
str << " Generation mismatch! Requested# " << record.lock_generation();
str << " Actual# " << StoredState.GetUserGeneration();
str << " Marker# KV05";
ReplyError<typename TGrpcRequestWithLockGeneration::TResponse>(ctx, str.Str(),
- NKikimrKeyValue::Statuses::RSTATUS_ERROR, intermediate);
+ NKikimrKeyValue::Statuses::RSTATUS_WRONG_LOCK_GENERATION, intermediate);
return true;
}
return false;
@@ -561,7 +561,7 @@ public:
response->Record.set_requested_size(interRead.ValueSize);
response->Record.set_requested_key(interRead.Key);
}
- if constexpr (!std::is_same_v<TResponse, TEvKeyValue::TEvGetStatusResponse>) {
+ if constexpr (!std::is_same_v<TResponse, TEvKeyValue::TEvGetStorageChannelStatusResponse>) {
if (intermediate->HasCookie) {
response->Record.set_cookie(intermediate->Cookie);
}
@@ -593,9 +593,9 @@ public:
THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info);
TPrepareResult PrepareOneGetStatus(TIntermediate::TGetStatus &cmd, ui64 publicStorageChannel,
const TTabletStorageInfo *info);
- bool PrepareGetStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStatus::TPtr &ev,
+ bool PrepareGetStorageChannelStatusRequest(const TActorContext &ctx, TEvKeyValue::TEvGetStorageChannelStatus::TPtr &ev,
THolder<TIntermediate> &intermediate, const TTabletStorageInfo *info);
- bool PrepareObtainLockRequest(const TActorContext &ctx, TEvKeyValue::TEvObtainLock::TPtr &ev,
+ bool PrepareAcquireLockRequest(const TActorContext &ctx, TEvKeyValue::TEvAcquireLock::TPtr &ev,
THolder<TIntermediate> &intermediate);
template <typename TRequestType>
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
index 3adab0a863..182a2c513c 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
@@ -131,11 +131,12 @@ public:
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KV322,
"Expected OK or UNKNOWN and given " << NKikimrProto::EReplyStatus_Name(status)
<< " readCount# " << readCount);
- NKikimrKeyValue::Statuses::ReplyStatus replyStatus = ConvertStatus(status);
- if (!readCount) {
- replyStatus = NKikimrKeyValue::Statuses::RSTATUS_NO_DATA;
- } else if (status == NKikimrProto::UNKNOWN) {
+
+ NKikimrKeyValue::Statuses::ReplyStatus replyStatus;
+ if (status == NKikimrProto::UNKNOWN || status == NKikimrProto::NODATA) {
replyStatus = NKikimrKeyValue::Statuses::RSTATUS_OK;
+ } else {
+ replyStatus = ConvertStatus(status);
}
SendResponseAndPassAway(replyStatus);
@@ -381,6 +382,10 @@ public:
response->Record.set_requested_size(interRead.RequestedSize);
response->Record.set_value(interRead.Value);
+ if (IntermediateResult->RespondTo.NodeId() != SelfId().NodeId()) {
+ response->Record.set_node_id(SelfId().NodeId());
+ }
+
return response;
}
@@ -389,8 +394,6 @@ public:
return NKikimrKeyValue::Statuses::RSTATUS_OK;
} else if (status == NKikimrProto::OVERRUN) {
return NKikimrKeyValue::Statuses::RSTATUS_OVERRUN;
- } else if (status == NKikimrProto::NODATA) {
- return NKikimrKeyValue::Statuses::RSTATUS_NO_DATA;
} else {
return NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR;
}
@@ -432,6 +435,10 @@ public:
}
readRangeResult.set_status(status);
+ if (IntermediateResult->RespondTo.NodeId() != SelfId().NodeId()) {
+ readRangeResult.set_node_id(SelfId().NodeId());
+ }
+
return response;
}
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp
index 3d33155ad5..9d4c716e64 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request_ut.cpp
@@ -450,7 +450,7 @@ Y_UNIT_TEST(ReadRangeNoData) {
TRangeReadRequestBuilder builder;
- RunTest(env, builder, groupIds, NKikimrKeyValue::Statuses::RSTATUS_NO_DATA);
+ RunTest(env, builder, groupIds, NKikimrKeyValue::Statuses::RSTATUS_OK);
}
} // Y_UNIT_TEST_SUITE(KeyValueReadStorage)
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 0e41b9c9fb..7b46ebec58 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -793,7 +793,7 @@ void ExecuteReadRange(TTestContext &tc,
template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK>
void ExecuteGetStatus(TTestContext &tc, const TDeque<ui32> &channels, ui64 lock_generation) {
- TDesiredPair<TEvKeyValue::TEvGetStatus> dp;
+ TDesiredPair<TEvKeyValue::TEvGetStorageChannelStatus> dp;
dp.Request.set_lock_generation(lock_generation);
dp.Request.set_tablet_id(tc.TabletId);
for (ui32 channel : channels) {
@@ -806,14 +806,14 @@ void ExecuteGetStatus(TTestContext &tc, const TDeque<ui32> &channels, ui64 lock_
<< " exp# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(ExpectedStatus)
<< " msg# " << dp.Response.msg());
if constexpr (ExpectedStatus == NKikimrKeyValue::Statuses::RSTATUS_OK) {
- for (auto &channel : dp.Response.channel()) {
+ for (auto &channel : dp.Response.storage_channel()) {
UNIT_ASSERT(channel.status() == NKikimrKeyValue::Statuses::RSTATUS_OK);
}
}
}
void ExecuteObtainLock(TTestContext &tc, ui64 expectedLockGeneration) {
- TDesiredPair<TEvKeyValue::TEvObtainLock> dp;
+ TDesiredPair<TEvKeyValue::TEvAcquireLock> dp;
dp.Request.set_tablet_id(tc.TabletId);
ExecuteEvent(dp, tc);
UNIT_ASSERT_VALUES_EQUAL(dp.Response.lock_generation(), expectedLockGeneration);
@@ -917,7 +917,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) {
TDispatchOptions options;
options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType);
tc.Runtime->DispatchEvents(options);
- ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0);
+ ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0);
ExecuteRead(tc, "key1", "value1", 0, 0, 0);
}
@@ -931,7 +931,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOkWithNewApi) {
ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteRead(tc, "key", "value", 0, 0, 0);
ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
- ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0);
+ ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0);
});
}
@@ -1086,7 +1086,7 @@ Y_UNIT_TEST(TestInlineWriteReadDeleteWithRestartsThenResponseOkNewApi) {
ExecuteWrite(tc, {{"key", "value"}}, 0, 1, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteRead(tc, "key", "value", 0, 0, 0);
ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
- ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0);
+ ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0);
});
}
@@ -1362,8 +1362,8 @@ Y_UNIT_TEST(TestInlineWriteReadWithRestartsWithNotCorrectUTF8NewApi) {
TKeyValuePair pair{TString("key1\0"), TString("value")};
expectedPairs.push_back({TString("key1\\0"), TString()});
ExecuteWrite(tc, {pair}, 0, 1, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
-
- ExecuteReadRange<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key0", EBorderKind::Include,
+ expectedPairs.clear();
+ ExecuteReadRange<NKikimrKeyValue::Statuses::RSTATUS_OK>(tc, "key0", EBorderKind::Include,
"key1", EBorderKind::Exclude, expectedPairs, 0, true, 0);
});
}
@@ -1413,7 +1413,7 @@ Y_UNIT_TEST(TestEmptyWriteReadDeleteWithRestartsThenResponseOkNewApi) {
ExecuteReadRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, pairs, 0, true, 0);
ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
- ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0);
+ ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0);
});
}
@@ -1462,7 +1462,7 @@ Y_UNIT_TEST(TestInlineEmptyWriteReadDeleteWithRestartsThenResponseOkNewApi) {
ExecuteReadRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, pairs, 0, true, 0);
ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
- ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NO_DATA>(tc, "key", "", 0, 0, 0);
+ ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0);
});
}
@@ -2188,7 +2188,7 @@ Y_UNIT_TEST(TestObtainLockNewApi) {
}, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) {
TFinalizer finalizer(tc);
tc.Prepare(dispatchName, setup, activeZone);
- constexpr NKikimrKeyValue::Statuses::ReplyStatus status = NKikimrKeyValue::Statuses::RSTATUS_ERROR;
+ constexpr NKikimrKeyValue::Statuses::ReplyStatus status = NKikimrKeyValue::Statuses::RSTATUS_WRONG_LOCK_GENERATION;
ExecuteObtainLock(tc, 1);
ExecuteGetStatus<status>(tc, {1, 2}, 0);
diff --git a/ydb/core/keyvalue/protos/events.proto b/ydb/core/keyvalue/protos/events.proto
index fa365981ae..0d3d9a4b07 100644
--- a/ydb/core/keyvalue/protos/events.proto
+++ b/ydb/core/keyvalue/protos/events.proto
@@ -26,11 +26,15 @@ message Statuses {
RSTATUS_ERROR = 2;
RSTATUS_TIMEOUT = 3;
RSTATUS_INTERNAL_ERROR = 4;
- RSTATUS_NO_DATA = 5;
+ RSTATUS_NOT_FOUND = 5;
RSTATUS_OVERRUN = 6;
+ RSTATUS_WRONG_LOCK_GENERATION = 7;
}
}
+message StorageStatusFlags {
+}
+
message Flags {
bool disk_space_cyan = 1;
bool disk_space_light_yellow_move = 2;
@@ -41,28 +45,34 @@ message Flags {
bool disk_space_black = 7;
}
-message Channel {
+message StorageChannel {
+ enum StatusFlag {
+ STATUS_FLAG_UNSPECIFIED = 0;
+ STATUS_FLAG_GREEN = 10;
+ STATUS_FLAG_YELLOW_STOP = 20;
+ STATUS_FLAG_ORANGE_OUT_SPACE = 30;
+ }
Statuses.ReplyStatus status = 1;
uint32 storage_channel = 2;
- optional Flags status_flags = 3;
+ StatusFlag status_flag = 3;
}
message KVRange {
oneof from_bound {
- bytes from_key_inclusive = 1;
- bytes from_key_exclusive = 2;
+ string from_key_inclusive = 1;
+ string from_key_exclusive = 2;
}
oneof to_bound {
- bytes to_key_inclusive = 3;
- bytes to_key_exclusive = 4;
+ string to_key_inclusive = 3;
+ string to_key_exclusive = 4;
}
}
message ReadRequest {
uint64 tablet_id = 1;
- uint64 lock_generation = 2;
- bytes key = 3;
+ optional uint64 lock_generation = 2;
+ string key = 3;
uint64 offset = 4;
uint64 size = 5;
uint64 cookie = 6;
@@ -72,18 +82,19 @@ message ReadRequest {
}
message ReadResult {
- bytes requested_key = 1;
+ string requested_key = 1;
uint64 requested_offset = 2;
uint64 requested_size = 3;
bytes value = 4;
string msg = 5;
Statuses.ReplyStatus status = 6;
uint64 cookie = 7;
+ uint64 node_id = 8;
}
message ReadRangeRequest {
uint64 tablet_id = 1;
- uint64 lock_generation = 2;
+ optional uint64 lock_generation = 2;
KVRange range = 3;
@@ -96,7 +107,7 @@ message ReadRangeRequest {
message ReadRangeResult {
message KeyValuePair {
- bytes key = 1;
+ string key = 1;
bytes value = 2;
uint32 value_size = 3;
@@ -109,23 +120,24 @@ message ReadRangeResult {
string msg = 2;
repeated KeyValuePair pair = 3;
uint64 cookie = 4;
+ uint64 node_id = 5;
}
message ExecuteTransactionRequest {
message Command {
message Rename {
- bytes old_key = 1;
- bytes new_key = 2;
+ string old_key = 1;
+ string new_key = 2;
}
message Concat {
- repeated bytes input_keys = 1;
- bytes output_key = 2;
+ repeated string input_keys = 1;
+ string output_key = 2;
bool keep_inputs = 3;
}
message CopyRange {
KVRange range = 1;
- bytes prefix_to_remove = 2;
- bytes prefix_to_add = 3;
+ string prefix_to_remove = 2;
+ string prefix_to_add = 3;
}
message Write {
enum Tactic {
@@ -133,7 +145,7 @@ message ExecuteTransactionRequest {
TACTIC_MAX_THROUGHPUT = 1;
TACTIC_MIN_LATENCY = 2;
}
- bytes key = 1;
+ string key = 1;
bytes value = 2;
uint32 storage_channel = 3;
Priorities.Priority priority = 4;
@@ -153,38 +165,41 @@ message ExecuteTransactionRequest {
}
uint64 tablet_id = 1;
- uint64 lock_generation = 2;
+ optional uint64 lock_generation = 2;
repeated Command commands = 3;
uint64 cookie = 4;
uint64 deadline_instant_ms = 5;
}
message ExecuteTransactionResult {
- repeated Channel channel = 1;
+ repeated StorageChannel storage_channel = 1;
Statuses.ReplyStatus status = 2;
string msg = 3;
uint64 cookie = 4;
+ uint64 node_id = 5;
}
-message GetStatusRequest {
+message GetStorageChannelStatusRequest {
uint64 tablet_id = 1;
- uint64 lock_generation = 2;
+ optional uint64 lock_generation = 2;
repeated uint32 storage_channel = 3;
uint64 deadline_instant_ms = 4;
}
-message GetStatusResult {
- repeated Channel channel = 1;
+message GetStorageChannelStatusResult {
+ repeated StorageChannel storage_channel = 1;
Statuses.ReplyStatus status = 2;
string msg = 3;
+ uint64 node_id = 4;
}
-message ObtainLockRequest {
+message AcquireLockRequest {
uint64 tablet_id = 1;
uint64 cookie = 2;
}
-message ObtainLockResult {
+message AcquireLockResult {
uint64 lock_generation = 1;
uint64 cookie = 2;
+ uint64 node_id = 3;
}
diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp
index f6fc171f1e..cfeaf00b0d 100644
--- a/ydb/core/test_tablet/load_actor_read_validate.cpp
+++ b/ydb/core/test_tablet/load_actor_read_validate.cpp
@@ -227,7 +227,6 @@ namespace NKikimr::NTestShard {
}
Y_VERIFY_S(status == NKikimrKeyValue::Statuses::RSTATUS_OK
- || status == NKikimrKeyValue::Statuses::RSTATUS_NO_DATA
|| status == NKikimrKeyValue::Statuses::RSTATUS_OVERRUN,
"TabletId# " << TabletId << " CmdReadRange failed"
<< " Status# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(status));