diff options
author | mokhotskii <mokhotskii@yandex-team.ru> | 2022-03-24 22:41:55 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@yandex-team.ru> | 2022-03-24 22:41:55 +0300 |
commit | 3d072ebbe1c2292e8eb35427050bf960f3f1a8d9 (patch) | |
tree | fb3d832e047c2552959a8e772cb29b291e5f4d87 | |
parent | 336c4b71b8c800813fb3fc246335ce71bab6d170 (diff) | |
download | ydb-3d072ebbe1c2292e8eb35427050bf960f3f1a8d9.tar.gz |
LOGBROKER-7298 Fix verify on GetRecords call
Fix VERIFY issue
ref:8a203cd72d8e89bce0e0859479ae144bf8576aef
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 24 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 8 | ||||
-rw-r--r-- | ydb/services/datastreams/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 56 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 16 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_actor.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 57 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_utils.cpp | 88 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_utils.h | 3 |
9 files changed, 182 insertions, 73 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index af28a8f8e2..127f7ddf5e 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2617,19 +2617,29 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) { Counters.Cumulative()[COUNTER_PQ_READ_ERROR_SMALL_OFFSET].Increment(1); read->Offset = StartOffset; if (read->PartNo > 0) { - LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "I was right, there could be rewinds and deletions at once! Topic " << TopicName << " partition " << Partition - << " readOffset " << read->Offset << " readPartNo " << read->PartNo << " startOffset " << StartOffset); - ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET, "client requested not from first part, and this part is lost"); + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, + "I was right, there could be rewinds and deletions at once! Topic " << TopicName << + " partition " << Partition << + " readOffset " << read->Offset << + " readPartNo " << read->PartNo << + " startOffset " << StartOffset); + ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET, + "client requested not from first part, and this part is lost"); return; } } if (read->Offset > EndOffset || read->Offset == EndOffset && read->PartNo > 0) { Counters.Cumulative()[COUNTER_PQ_READ_ERROR_BIG_OFFSET].Increment(1); Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0); - LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "reading from too big offset - topic " << TopicName << " partition " << Partition << " client " - << read->ClientId << " EndOffset " << EndOffset << " offset " << read->Offset); - ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET, - TStringBuilder() << "trying to read from future. ReadOffset " << read->Offset << ", " << read->PartNo << " EndOffset " << EndOffset); + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, + "reading from too big offset - topic " << TopicName << + " partition " << Partition << + " client " << read->ClientId << + " EndOffset " << EndOffset << + " offset " << read->Offset); + ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET, + TStringBuilder() << "trying to read from future. ReadOffset " << + read->Offset << ", " << read->PartNo << " EndOffset " << EndOffset); return; } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index e3d68aea43..bdec76536e 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -313,8 +313,12 @@ public: bool HandleError(TEvPQ::TEvError *ev, const TActorContext& ctx) { - LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, "Answer error topic: '" << TopicName << "' partition: " << Partition - << " messageNo: " << MessageNo << " requestId: " << ReqId << " error: " << ev->Error); + LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, + "Answer error topic: '" << TopicName << "'" << + " partition: " << Partition << + " messageNo: " << MessageNo << + " requestId: " << ReqId << + " error: " << ev->Error); Response->Record.SetStatus(NMsgBusProxy::MSTATUS_ERROR); Response->Record.SetErrorCode(ev->ErrorCode); Response->Record.SetErrorReason(ev->Error); diff --git a/ydb/services/datastreams/CMakeLists.txt b/ydb/services/datastreams/CMakeLists.txt index 05d672dbdd..5cbec71234 100644 --- a/ydb/services/datastreams/CMakeLists.txt +++ b/ydb/services/datastreams/CMakeLists.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-services-datastreams PUBLIC cpp-client-ydb_datastreams services-lib-actors services-lib-sharding + ydb-services-persqueue_v1 ydb-services-ydb ) target_sources(ydb-services-datastreams PRIVATE diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index ba5ee058f0..9987c8d4c0 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -12,6 +12,7 @@ #include <ydb/services/lib/actors/pq_schema_actor.h> #include <ydb/services/lib/sharding/sharding.h> +#include <ydb/services/persqueue_v1/persqueue_utils.h> #include <util/folder/path.h> @@ -438,13 +439,13 @@ namespace NKikimr::NDataStreams::V1 { void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } @@ -1141,6 +1142,9 @@ namespace NKikimr::NDataStreams::V1 { void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); + void Die(const TActorContext& ctx) override; private: void SendReadRequest(const TActorContext& ctx); @@ -1153,6 +1157,7 @@ namespace NKikimr::NDataStreams::V1 { ui64 TabletId; i32 Limit; TActorId NewSchemeCache; + TActorId PipeClient; }; TGetRecordsActor::TGetRecordsActor(TEvDataStreamsGetRecordsRequest* request, @@ -1195,7 +1200,9 @@ namespace NKikimr::NDataStreams::V1 { .BackoffMultiplier = 2, .DoFirstRetryInstantly = true }; - auto PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig)); + PipeClient = ctx.RegisterWithSameMailbox( + NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig) + ); NKikimrClient::TPersQueueRequest request; request.MutablePartitionRequest()->SetTopic(this->GetTopicPath(ctx)); @@ -1218,6 +1225,8 @@ namespace NKikimr::NDataStreams::V1 { void TGetRecordsActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { HFunc(TEvPersQueue::TEvResponse, Handle); + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + HFunc(TEvTabletPipe::TEvClientConnected, Handle); default: TBase::StateWork(ev, ctx); } } @@ -1241,7 +1250,6 @@ namespace NKikimr::NDataStreams::V1 { } } - if (response.Self->Info.GetPathType() == NKikimrSchemeOp::EPathTypePersQueueGroup) { const auto& partitions = response.PQGroupInfo->Description.GetPartitions(); for (auto& partition : partitions) { @@ -1259,7 +1267,20 @@ namespace NKikimr::NDataStreams::V1 { void TGetRecordsActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; - Y_VERIFY(ev->Get()->Record.HasPartitionResponse()); + switch (record.GetStatus()) { + case NMsgBusProxy::MSTATUS_ERROR: + switch (record.GetErrorCode()) { + case NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET: + case NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET: + return SendResponse(ctx, {}, 0); + default: + return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()), + Ydb::PersQueue::ErrorCode::ERROR, + record.GetErrorReason(), ctx); + } + break; + default: {} + } ui64 millisBehindLatestMs = 0; std::vector<Ydb::DataStreams::V1::Record> records; @@ -1277,12 +1298,26 @@ namespace NKikimr::NDataStreams::V1 { record.set_sequence_number(std::to_string(r.GetOffset()).c_str()); records.push_back(record); } - millisBehindLatestMs = records.size() > 0 ? TInstant::Now().MilliSeconds() - results.rbegin()->GetWriteTimestampMS() : 0; + millisBehindLatestMs = records.size() > 0 + ? TInstant::Now().MilliSeconds() - results.rbegin()->GetWriteTimestampMS() + : 0; } SendResponse(ctx, records, millisBehindLatestMs); } + void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + } + } + + void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); + } + void TGetRecordsActor::SendResponse(const TActorContext& ctx, const std::vector<Ydb::DataStreams::V1::Record>& records, ui64 millisBehindLatestMs) { @@ -1307,6 +1342,11 @@ namespace NKikimr::NDataStreams::V1 { Die(ctx); } + void TGetRecordsActor::Die(const TActorContext& ctx) { + NTabletPipe::CloseClient(ctx, PipeClient); + TBase::Die(ctx); + } + //----------------------------------------------------------------------------------------- class TListShardsActor : public TPQGrpcSchemaBase<TListShardsActor, NKikimr::NGRpcService::TEvDataStreamsListShardsRequest> { @@ -1533,13 +1573,13 @@ namespace NKikimr::NDataStreams::V1 { void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } } void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 396363e1e5..27847e1ace 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -1265,6 +1265,22 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), recordsCount - 3); } + { + auto result = testServer.DataStreamsClient->GetShardIterator(streamName, "shard-000000", + YDS_V1::ShardIteratorType::AFTER_SEQUENCE_NUMBER, + NYDS_V1::TGetShardIteratorSettings().StartingSequenceNumber("99999") + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + shardIterator = result.GetResult().shard_iterator(); + } + + { + auto result = testServer.DataStreamsClient->GetRecords(shardIterator).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), 0); + } { auto result = testServer.DataStreamsClient->GetShardIterator(streamName, "shard-000000", diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index 0e86e0fd7f..afc4d8f69e 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -46,8 +46,6 @@ PersQueue::ErrorCode::ErrorCode ConvertOldCode(const NPersQueue::NErrorCode::EEr void FillIssue(Ydb::Issue::IssueMessage* issue, const PersQueue::ErrorCode::ErrorCode errorCode, const TString& errorReason); -Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue::ErrorCode::ErrorCode code); - const TString& TopicPrefix(const TActorContext& ctx); static const TDuration CHECK_ACL_DELAY = TDuration::Minutes(5); diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 3e46415a89..9f50b6511b 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -29,8 +29,7 @@ namespace NKikimr { using namespace NMsgBusProxy; -namespace NGRpcProxy { -namespace V1 { +namespace NGRpcProxy::V1 { using namespace PersQueue::V1; @@ -3015,55 +3014,5 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorC AnswerError(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); } - - -Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue::ErrorCode::ErrorCode code) { - - using namespace PersQueue::ErrorCode; - - switch(code) { - case OK : - return Ydb::StatusIds::SUCCESS; - case INITIALIZING: - case CLUSTER_DISABLED: - return Ydb::StatusIds::UNAVAILABLE; - case PREFERRED_CLUSTER_MISMATCHED: - return Ydb::StatusIds::ABORTED; - case OVERLOAD: - return Ydb::StatusIds::OVERLOADED; - case BAD_REQUEST: - return Ydb::StatusIds::BAD_REQUEST; - case WRONG_COOKIE: - case CREATE_SESSION_ALREADY_LOCKED: - case DELETE_SESSION_NO_SESSION: - case READ_ERROR_NO_SESSION: - return Ydb::StatusIds::SESSION_EXPIRED; - case WRITE_ERROR_PARTITION_IS_FULL: - case WRITE_ERROR_DISK_IS_FULL: - case WRITE_ERROR_BAD_OFFSET: - case SOURCEID_DELETED: - case READ_ERROR_IN_PROGRESS: - case READ_ERROR_TOO_SMALL_OFFSET: - case READ_ERROR_TOO_BIG_OFFSET: - case SET_OFFSET_ERROR_COMMIT_TO_FUTURE: - case READ_NOT_DONE: - return Ydb::StatusIds::GENERIC_ERROR; - case TABLET_IS_DROPPED: - case UNKNOWN_TOPIC: - case WRONG_PARTITION_NUMBER: - return Ydb::StatusIds::SCHEME_ERROR; - case ACCESS_DENIED: - return Ydb::StatusIds::UNAUTHORIZED; - case ERROR: - return Ydb::StatusIds::GENERIC_ERROR; - - default: - return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; - } -} - - - -} -} -} +} // namespace NGRpcProxy::V1 +} // namespace NKikimr diff --git a/ydb/services/persqueue_v1/persqueue_utils.cpp b/ydb/services/persqueue_v1/persqueue_utils.cpp index 51d7f38045..3b7654d01c 100644 --- a/ydb/services/persqueue_v1/persqueue_utils.cpp +++ b/ydb/services/persqueue_v1/persqueue_utils.cpp @@ -83,4 +83,92 @@ TProcessingResult ProcessMetaCacheTopicResponse(const TSchemeCacheNavigate::TEnt return {}; } +Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQueue::ErrorCode::ErrorCode code) { + + using namespace Ydb::PersQueue::ErrorCode; + + switch(code) { + case OK : + return Ydb::StatusIds::SUCCESS; + case INITIALIZING: + case CLUSTER_DISABLED: + return Ydb::StatusIds::UNAVAILABLE; + case PREFERRED_CLUSTER_MISMATCHED: + return Ydb::StatusIds::ABORTED; + case OVERLOAD: + return Ydb::StatusIds::OVERLOADED; + case BAD_REQUEST: + return Ydb::StatusIds::BAD_REQUEST; + case WRONG_COOKIE: + case CREATE_SESSION_ALREADY_LOCKED: + case DELETE_SESSION_NO_SESSION: + case READ_ERROR_NO_SESSION: + return Ydb::StatusIds::SESSION_EXPIRED; + case WRITE_ERROR_PARTITION_IS_FULL: + case WRITE_ERROR_DISK_IS_FULL: + case WRITE_ERROR_BAD_OFFSET: + case SOURCEID_DELETED: + case READ_ERROR_IN_PROGRESS: + case READ_ERROR_TOO_SMALL_OFFSET: + case READ_ERROR_TOO_BIG_OFFSET: + case SET_OFFSET_ERROR_COMMIT_TO_FUTURE: + case READ_NOT_DONE: + return Ydb::StatusIds::GENERIC_ERROR; + case TABLET_IS_DROPPED: + case UNKNOWN_TOPIC: + case WRONG_PARTITION_NUMBER: + return Ydb::StatusIds::SCHEME_ERROR; + case ACCESS_DENIED: + return Ydb::StatusIds::UNAUTHORIZED; + case ERROR: + return Ydb::StatusIds::GENERIC_ERROR; + + default: + return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + } +} + +Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const NPersQueue::NErrorCode::EErrorCode code) +{ + using namespace NPersQueue::NErrorCode; + + switch(code) { + case OK : + return Ydb::StatusIds::SUCCESS; + case INITIALIZING: + case CLUSTER_DISABLED: + return Ydb::StatusIds::UNAVAILABLE; + case OVERLOAD: + return Ydb::StatusIds::OVERLOADED; + case BAD_REQUEST: + return Ydb::StatusIds::BAD_REQUEST; + case WRONG_COOKIE: + case CREATE_SESSION_ALREADY_LOCKED: + case DELETE_SESSION_NO_SESSION: + case READ_ERROR_NO_SESSION: + return Ydb::StatusIds::SESSION_EXPIRED; + case WRITE_ERROR_PARTITION_IS_FULL: + case WRITE_ERROR_DISK_IS_FULL: + case WRITE_ERROR_BAD_OFFSET: + case SOURCEID_DELETED: + case READ_ERROR_IN_PROGRESS: + case READ_ERROR_TOO_SMALL_OFFSET: + case READ_ERROR_TOO_BIG_OFFSET: + case SET_OFFSET_ERROR_COMMIT_TO_FUTURE: + case READ_NOT_DONE: + return Ydb::StatusIds::GENERIC_ERROR; + case TABLET_IS_DROPPED: + case UNKNOWN_TOPIC: + case WRONG_PARTITION_NUMBER: + return Ydb::StatusIds::SCHEME_ERROR; + case ACCESS_DENIED: + return Ydb::StatusIds::UNAUTHORIZED; + case ERROR: + return Ydb::StatusIds::GENERIC_ERROR; + + default: + return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + } +} + } // namespace NKikimr::NGRpcProxy::V1 diff --git a/ydb/services/persqueue_v1/persqueue_utils.h b/ydb/services/persqueue_v1/persqueue_utils.h index 7860067982..b4ffa5b857 100644 --- a/ydb/services/persqueue_v1/persqueue_utils.h +++ b/ydb/services/persqueue_v1/persqueue_utils.h @@ -29,4 +29,7 @@ struct TProcessingResult { TProcessingResult ProcessMetaCacheTopicResponse(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry); +Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQueue::ErrorCode::ErrorCode code); +Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const NPersQueue::NErrorCode::EErrorCode code); + } //namespace NKikimr::NGRpcProxy::V1 |