diff options
author | abcdef <akotov@ydb.tech> | 2023-02-07 12:47:29 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-02-07 12:47:29 +0300 |
commit | 02d113168a3b5ec0f6ce872057cd39f1c6dc1300 (patch) | |
tree | 1a47678087de1acee8d4fc9390fafd87aa3719da | |
parent | 5bb4d425bb72b388d18ada7bac07e5f2d8f1b9d6 (diff) | |
download | ydb-02d113168a3b5ec0f6ce872057cd39f1c6dc1300.tar.gz |
20 files changed, 166 insertions, 235 deletions
diff --git a/ydb/core/grpc_services/service_table.h b/ydb/core/grpc_services/service_table.h index dd7dd90035..adf8493daa 100644 --- a/ydb/core/grpc_services/service_table.h +++ b/ydb/core/grpc_services/service_table.h @@ -31,7 +31,5 @@ void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFaci void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); -void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); - } } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 96110e8f8f..24d42aa234 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -764,6 +764,7 @@ message TFeatureFlags { reserved 83; // EnableKqpDataQuerySourceRead optional bool EnableSmallDiskOptimization = 84 [default = true]; optional bool EnableDataShardVolatileTransactions = 85 [default = false]; + optional bool EnableTopicServiceTx = 86 [default = false]; } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 06485cb653..58c9ba8b75 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -69,7 +69,7 @@ enum EQueryReplyFlags { message TTopicOperations { optional string Consumer = 1; - repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 2; + repeated Ydb.Topic.UpdateOffsetsInTransactionRequest.TopicOffsets Topics = 2; } message TQueryRequest { diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 48eeae29cb..c4c47f2957 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -45,6 +45,7 @@ public: FEATURE_FLAG_SETTER(EnableDataShardGenericReadSets) FEATURE_FLAG_SETTER(EnableAlterDatabaseCreateHiveFirst) FEATURE_FLAG_SETTER(EnableDataShardVolatileTransactions) + FEATURE_FLAG_SETTER(EnableTopicServiceTx) TDerived& SetEnableMvcc(std::optional<bool> value) { if (value) { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index fadef0571b..6d7b66eadc 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -329,7 +329,6 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicServiceTx(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true)); diff --git a/ydb/public/api/grpc/draft/CMakeLists.darwin.txt b/ydb/public/api/grpc/draft/CMakeLists.darwin.txt index 6eadd10c9b..c1b1013216 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.darwin.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.darwin.txt @@ -27,7 +27,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_query_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto ) target_proto_addincls(api-grpc-draft ./ diff --git a/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt b/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt index a0e5a143d3..a3854476ba 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt @@ -28,7 +28,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_query_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto ) target_proto_addincls(api-grpc-draft ./ diff --git a/ydb/public/api/grpc/draft/CMakeLists.linux.txt b/ydb/public/api/grpc/draft/CMakeLists.linux.txt index a0e5a143d3..a3854476ba 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.linux.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.linux.txt @@ -28,7 +28,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_query_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto ) target_proto_addincls(api-grpc-draft ./ diff --git a/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto deleted file mode 100644 index 120f32f500..0000000000 --- a/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto +++ /dev/null @@ -1,16 +0,0 @@ -syntax = "proto3"; -option cc_enable_arenas = true; - -package Ydb.Topic.V1; - -option java_package = "com.yandex.ydb.topic.v1"; - -import "ydb/public/api/protos/ydb_topic.proto"; - -service TopicServiceTx { - // Add offsets to transaction - rpc AddOffsetsToTransaction(AddOffsetsToTransactionRequest) returns (AddOffsetsToTransactionResponse); -} - - - diff --git a/ydb/public/api/grpc/ydb_topic_v1.proto b/ydb/public/api/grpc/ydb_topic_v1.proto index c991df6208..e2ecd59273 100644 --- a/ydb/public/api/grpc/ydb_topic_v1.proto +++ b/ydb/public/api/grpc/ydb_topic_v1.proto @@ -68,6 +68,9 @@ service TopicService { // Single commit offset request. rpc CommitOffset(CommitOffsetRequest) returns (CommitOffsetResponse); + // Add information about offset ranges to the transaction. + rpc UpdateOffsetsInTransaction(UpdateOffsetsInTransactionRequest) returns (UpdateOffsetsInTransactionResponse); + // Create topic command. rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 5ade5338e7..2492635ead 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -133,6 +133,8 @@ message StreamWriteMessage { // See enum Codec above for values. int32 codec = 2; + optional TransactionIdentity tx = 3; + message MessageData { // Message sequence number, provided by client for deduplication. // Starts at 1 @@ -498,20 +500,24 @@ message StreamReadMessage { } } -// Add offsets to transaction request sent from client to server. -message AddOffsetsToTransactionRequest { - Ydb.Operations.OperationParams operation_params = 1; +message TransactionIdentity { + // Transaction identifier from TableService. + string id = 1; // Session identifier from TableService. - string session_id = 2; + string session = 2; +} - // Transaction identifier from TableService. - Ydb.Table.TransactionControl tx_control = 3; +// Add offsets to transaction request sent from client to server. +message UpdateOffsetsInTransactionRequest { + Ydb.Operations.OperationParams operation_params = 1; + + TransactionIdentity tx = 2; // Ranges of offsets by topics. - repeated TopicOffsets topics = 4; + repeated TopicOffsets topics = 3; - string consumer = 5; + string consumer = 4; message TopicOffsets { // Topic path. @@ -531,13 +537,13 @@ message AddOffsetsToTransactionRequest { } // Add offsets to transaction response sent from server to client. -message AddOffsetsToTransactionResponse { +message UpdateOffsetsInTransactionResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; } -// Add offsets to transaction result message that will be inside AddOffsetsToTransactionResponse.operation. -message AddOffsetsToTransactionResult { +// Add offsets to transaction result message that will be inside UpdateOffsetsInTransactionResponse.operation. +message UpdateOffsetsInTransactionResult { } diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt index a43e8508cc..0a9aa85772 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt @@ -37,5 +37,5 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt index afde3c630d..7c0aa49098 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt @@ -38,5 +38,5 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt index afde3c630d..7c0aa49098 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt @@ -38,5 +38,5 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp index 5c962458e8..2586714c65 100644 --- a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp +++ b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp @@ -1,30 +1,44 @@ -#include "add_offsets_to_transaction_actor.h" +#include "update_offsets_in_transaction_actor.h" namespace NKikimr::NGRpcService { -TAddOffsetsToTransactionActor::TAddOffsetsToTransactionActor(IRequestOpCtx* request) +TUpdateOffsetsInTransactionActor::TUpdateOffsetsInTransactionActor(IRequestOpCtx* request) : TBase{request} { } -void TAddOffsetsToTransactionActor::Bootstrap(const NActors::TActorContext& ctx) +void TUpdateOffsetsInTransactionActor::Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); - Become(&TAddOffsetsToTransactionActor::StateWork); + Become(&TUpdateOffsetsInTransactionActor::StateWork); Proceed(ctx); } -void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx) +void TUpdateOffsetsInTransactionActor::Proceed(const NActors::TActorContext& ctx) { + if (!AppData(ctx)->FeatureFlags.GetEnableTopicServiceTx()) { + return Reply(Ydb::StatusIds::UNSUPPORTED, + "Disabled transaction support for TopicService.", + NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + ctx); + } + const auto req = GetProtoRequest(); + if (!req->has_tx()) { + return Reply(Ydb::StatusIds::BAD_REQUEST, + "Empty tx.", + NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + ctx); + } + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); SetAuthToken(ev, *Request_); SetDatabase(ev, *Request_); NYql::TIssues issues; - if (CheckSession(req->session_id(), issues)) { - ev->Record.MutableRequest()->SetSessionId(req->session_id()); + if (CheckSession(req->tx().session(), issues)) { + ev->Record.MutableRequest()->SetSessionId(req->tx().session()); } else { return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); } @@ -43,24 +57,16 @@ void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx) ev->Record.MutableRequest()->SetCancelAfterMs(GetCancelAfter().MilliSeconds()); ev->Record.MutableRequest()->SetTimeoutMs(GetOperationTimeout().MilliSeconds()); - if (!req->has_tx_control()) { - NYql::TIssues issues; - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty tx_control.")); - return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); - } - - // - // TODO: проверить комбинацию значений атрибутов tx_control - // + ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(req->tx().id()); - ev->Record.MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control()); ev->Record.MutableRequest()->MutableTopicOperations()->SetConsumer(req->consumer()); *ev->Record.MutableRequest()->MutableTopicOperations()->MutableTopics() = req->topics(); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); } -void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) +void TUpdateOffsetsInTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, + const NActors::TActorContext& ctx) { const auto& record = ev->Get()->Record.GetRef(); SetCost(record.GetConsumedRu()); @@ -69,7 +75,7 @@ void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse: if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { const auto& kqpResponse = record.GetResponse(); const auto& issueMessage = kqpResponse.GetQueryIssues(); - auto queryResult = TEvAddOffsetsToTransactionRequest::AllocateResult<Ydb::Topic::AddOffsetsToTransactionResult>(Request_); + auto queryResult = TEvUpdateOffsetsInTransactionRequest::AllocateResult<Ydb::Topic::UpdateOffsetsInTransactionResult>(Request_); // // TODO: сохранить результат @@ -89,9 +95,4 @@ void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse: } } -void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) -{ - TActivationContext::AsActorContext().Register(new TAddOffsetsToTransactionActor(p.release())); -} - } diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.h index 2e82458709..3b905e294b 100644 --- a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h +++ b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.h @@ -12,15 +12,16 @@ namespace NKikimr::NGRpcService { -using TEvAddOffsetsToTransactionRequest = - TGrpcRequestOperationCall<Ydb::Topic::AddOffsetsToTransactionRequest, Ydb::Topic::AddOffsetsToTransactionResponse>; +using TEvUpdateOffsetsInTransactionRequest = + TGrpcRequestOperationCall<Ydb::Topic::UpdateOffsetsInTransactionRequest, Ydb::Topic::UpdateOffsetsInTransactionResponse>; -class TAddOffsetsToTransactionActor : public TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest> { +class TUpdateOffsetsInTransactionActor : + public TRpcKqpRequestActor<TUpdateOffsetsInTransactionActor, TEvUpdateOffsetsInTransactionRequest> { public: - using TBase = TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest>; - using TResult = Ydb::Topic::AddOffsetsToTransactionResult; + using TBase = TRpcKqpRequestActor<TUpdateOffsetsInTransactionActor, TEvUpdateOffsetsInTransactionRequest>; + using TResult = Ydb::Topic::UpdateOffsetsInTransactionResult; - explicit TAddOffsetsToTransactionActor(IRequestOpCtx* msg); + explicit TUpdateOffsetsInTransactionActor(IRequestOpCtx* msg); void Bootstrap(const NActors::TActorContext& ctx); diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index 464c8a9341..cb286e2fa6 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -7,21 +7,26 @@ #include <ydb/core/grpc_services/service_table.h> #include <ydb/core/tx/scheme_board/cache.h> +#include "actors/update_offsets_in_transaction_actor.h" + #include "grpc_pq_read.h" #include "grpc_pq_write.h" #include "grpc_pq_schema.h" -namespace NKikimr { -namespace NGRpcService { -namespace V1 { +namespace NKikimr::NGRpcService::V1 { static const ui32 TopicWriteSessionsMaxCount = 1000000; static const ui32 TopicReadSessionsMaxCount = 100000; -TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed) +TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& schemeCache, + const NActors::TActorId& grpcRequestProxy, + bool rlAllowed) : TGrpcServiceBase<Ydb::Topic::V1::TopicService>(system, counters, grpcRequestProxy, rlAllowed) , SchemeCache(schemeCache) -{ } +{ +} void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { CQ_ = cq; @@ -53,6 +58,11 @@ void TGRpcTopicService::InitNewSchemeCacheActor() { TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); } +void TGRpcTopicService::DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) +{ + TActivationContext::AsActorContext().Register(new TUpdateOffsetsInTransactionActor(p.release())); +} + void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_); @@ -128,71 +138,29 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed())); }) #undef ADD_REQUEST -} - -void TGRpcTopicService::StopService() noexcept { - TGrpcServiceBase::StopService(); -} - -// -// TGRpcTopicServiceTx -// -TGRpcTopicServiceTx::TGRpcTopicServiceTx(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - const NActors::TActorId& grpcRequestProxy) - : ActorSystem(system) - , Counters(counters) - , GRpcRequestProxy(grpcRequestProxy) -{ -} - -void TGRpcTopicServiceTx::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ = cq; - - if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) { - SetupIncomingRequests(std::move(logger)); - } -} - -void TGRpcTopicServiceTx::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; -} - -bool TGRpcTopicServiceTx::IncRequest() { - return Limiter->Inc(); -} - -void TGRpcTopicServiceTx::DecRequest() { - Limiter->Dec(); -} - -void TGRpcTopicServiceTx::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { - auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem); #ifdef ADD_REQUEST_LIMIT #error ADD_REQUEST_LIMIT macro already defined #endif #define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \ - MakeIntrusive<TGRpcRequest<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response, TGRpcTopicServiceTx>> \ - (this, this->GetService(), CQ, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \ - ActorSystem->Send(GRpcRequestProxy, \ - new TGrpcRequestOperationCall<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response> \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \ - }, &Ydb::Topic::V1::TopicServiceTx::AsyncService::Request ## NAME, \ + MakeIntrusive<TGRpcRequest<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response, TGRpcTopicService>> \ + (this, this->GetService(), CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \ + }, &Ydb::Topic::V1::TopicService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("topic", #NAME))->Run(); - ADD_REQUEST_LIMIT(AddOffsetsToTransaction, DoAddOffsetsToTransaction, Ru) + ADD_REQUEST_LIMIT(UpdateOffsetsInTransaction, DoUpdateOffsetsInTransaction, Ru) #undef ADD_REQUEST_LIMIT } -void TGRpcTopicServiceTx::StopService() noexcept { +void TGRpcTopicService::StopService() noexcept { TGrpcServiceBase::StopService(); } -} // V1 -} // namespace NGRpcService -} // namespace NKikimr +} // namespace NKikimr::NGRpcService::V1 diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h index ee5f8be7b1..ba26ae6a6c 100644 --- a/ydb/services/persqueue_v1/topic.h +++ b/ydb/services/persqueue_v1/topic.h @@ -3,17 +3,14 @@ #include <library/cpp/actors/core/actorsystem.h> #include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> -#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/core/grpc_services/base/base_service.h> +#include <ydb/core/grpc_services/base/base.h> -namespace NKikimr { - -namespace NGRpcService { -namespace V1 { +namespace NKikimr::NGRpcService::V1 { class TGRpcTopicService : public TGrpcServiceBase<Ydb::Topic::V1::TopicService> @@ -26,44 +23,18 @@ public: using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>::GetService; - private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override; void InitNewSchemeCacheActor(); + static void DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p, + const IFacilityProvider &); + NActors::TActorId SchemeCache; NActors::TActorId NewSchemeCache; -}; - -class TGRpcTopicServiceTx - : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx> -{ -public: - TGRpcTopicServiceTx(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - const NActors::TActorId& grpcRequestProxy); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - void StopService() noexcept override; - - using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx>::GetService; - - bool IncRequest(); - void DecRequest(); - -private: - void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem; - grpc::ServerCompletionQueue* CQ = nullptr; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - NGrpc::TGlobalLimiter* Limiter = nullptr; - NActors::TActorId GRpcRequestProxy; }; -} // namespace V1 -} // namespace NGRpcService -} // namespace NKikimr +} // namespace NKikimr::NGRpcService::V1 diff --git a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp b/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp index 7a1ab22113..88b3943cd1 100644 --- a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp +++ b/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp @@ -1,4 +1,4 @@ -#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h> +#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> @@ -41,11 +41,11 @@ protected: void Wait_DataReceivedEvent(TTopicReadSession& reader, ui64 offset); - void Call_AddOffsetsToTransaction(const TString& sessionId, - const TString& txId, - const TString& consumer, - ui64 rangeBegin, - ui64 rangeEnd); + void Call_UpdateOffsetsInTransaction(const TString& sessionId, + const TString& txId, + const TString& consumer, + ui64 rangeBegin, + ui64 rangeEnd); const TString CONSUMER = "user"; const TString SHORT_TOPIC_NAME = "demo"; @@ -57,7 +57,7 @@ protected: const TString TOPIC_PATH = TOPIC_PARENT + "/" + FULL_TOPIC_NAME; TMaybe<NPersQueue::TTestServer> Server; - std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> TopicTxStub; + std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStub; }; void TImmediateTxFixture::SetUp(NUnitTest::TTestContext&) @@ -69,7 +69,7 @@ void TImmediateTxFixture::SetUp(NUnitTest::TTestContext&) void TImmediateTxFixture::CreateTestServer() { - Server.ConstructInPlace(PQSettings(0).SetDomainName("Root")); + Server.ConstructInPlace(PQSettings(0).SetDomainName("Root").SetEnableTopicServiceTx(true)); Server->EnableLogs({NKikimrServices::FLAT_TX_SCHEMESHARD , NKikimrServices::PERSQUEUE}); @@ -99,7 +99,7 @@ void TImmediateTxFixture::CreateTopic() void TImmediateTxFixture::CreateTopicTxStub() { auto channel = grpc::CreateChannel("localhost:" + ToString(Server->GrpcPort), grpc::InsecureChannelCredentials()); - TopicTxStub = Ydb::Topic::V1::TopicServiceTx::NewStub(channel); + TopicStub = Ydb::Topic::V1::TopicService::NewStub(channel); } NYdb::NTable::TSession TImmediateTxFixture::CreateSession() @@ -178,21 +178,21 @@ void TImmediateTxFixture::Wait_DataReceivedEvent(TTopicReadSession& reader, UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset); } -void TImmediateTxFixture::Call_AddOffsetsToTransaction(const TString& sessionId, - const TString& txId, - const TString& consumer, - ui64 rangeBegin, - ui64 rangeEnd) +void TImmediateTxFixture::Call_UpdateOffsetsInTransaction(const TString& sessionId, + const TString& txId, + const TString& consumer, + ui64 rangeBegin, + ui64 rangeEnd) { grpc::ClientContext rcontext; rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN); rcontext.AddMetadata("x-ydb-database", DATABASE); - Ydb::Topic::AddOffsetsToTransactionRequest request; - Ydb::Topic::AddOffsetsToTransactionResponse response; + Ydb::Topic::UpdateOffsetsInTransactionRequest request; + Ydb::Topic::UpdateOffsetsInTransactionResponse response; - request.set_session_id(sessionId); - request.mutable_tx_control()->set_tx_id(txId); + request.mutable_tx()->set_id(txId); + request.mutable_tx()->set_session(sessionId); request.set_consumer(consumer); auto *topic = request.mutable_topics()->Add(); @@ -205,9 +205,9 @@ void TImmediateTxFixture::Call_AddOffsetsToTransaction(const TString& sessionId, range->set_start(rangeBegin); range->set_end(rangeEnd); - grpc::Status status = TopicTxStub->AddOffsetsToTransaction(&rcontext, - request, - &response); + grpc::Status status = TopicStub->UpdateOffsetsInTransaction(&rcontext, + request, + &response); UNIT_ASSERT(status.ok()); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); @@ -226,7 +226,7 @@ Y_UNIT_TEST_F(Scenario_1, TImmediateTxFixture) Wait_DataReceivedEvent(*reader, 0); Wait_DataReceivedEvent(*reader, 1); - Call_AddOffsetsToTransaction(session.GetId(), tx.GetId(), CONSUMER, 0, 2); + Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 0, 2); CommitTx(tx, NYdb::EStatus::SUCCESS); } @@ -242,7 +242,7 @@ Y_UNIT_TEST_F(Scenario_1, TImmediateTxFixture) Wait_DataReceivedEvent(*reader, 2); Wait_DataReceivedEvent(*reader, 3); - Call_AddOffsetsToTransaction(session.GetId(), tx.GetId(), CONSUMER, 2, 4); + Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 2, 4); } { @@ -266,7 +266,7 @@ Y_UNIT_TEST_F(Scenario_2, TImmediateTxFixture) Wait_DataReceivedEvent(*reader, 1); Wait_DataReceivedEvent(*reader, 2); - Call_AddOffsetsToTransaction(s1.GetId(), t1.GetId(), CONSUMER, 0, 3); + Call_UpdateOffsetsInTransaction(s1.GetId(), t1.GetId(), CONSUMER, 0, 3); } NYdb::NTable::TSession s2 = CreateSession(); @@ -280,7 +280,7 @@ Y_UNIT_TEST_F(Scenario_2, TImmediateTxFixture) Wait_DataReceivedEvent(*reader, 0); Wait_DataReceivedEvent(*reader, 1); - Call_AddOffsetsToTransaction(s2.GetId(), t2.GetId(), CONSUMER, 0, 2); + Call_UpdateOffsetsInTransaction(s2.GetId(), t2.GetId(), CONSUMER, 0, 2); } CommitTx(t2, NYdb::EStatus::SUCCESS); diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp index 27a01277f3..16882a8c62 100644 --- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp +++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp @@ -1,4 +1,4 @@ -#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h> +#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> @@ -44,8 +44,8 @@ std::unique_ptr<typename T::Stub> CreateServiceStub(const NPersQueue::TTestServe return stub; } -std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> CreateTopicServiceTxStub(const NPersQueue::TTestServer &server) { - return CreateServiceStub<Ydb::Topic::V1::TopicServiceTx>(server); +std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> CreateTopicServiceStub(const NPersQueue::TTestServer &server) { + return CreateServiceStub<Ydb::Topic::V1::TopicService>(server); } struct TOffsetRange { @@ -72,7 +72,7 @@ void AppendOffsetsRange(const TOffsetRange& r, google::protobuf::RepeatedPtrFiel } void AppendPartition(const TPartition& p, - google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets_PartitionOffsets> *partitions) + google::protobuf::RepeatedPtrField<Ydb::Topic::UpdateOffsetsInTransactionRequest_TopicOffsets_PartitionOffsets> *partitions) { auto* partition = partitions->Add(); @@ -84,7 +84,7 @@ void AppendPartition(const TPartition& p, } void AppendTopic(const TTopic &t, - google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets> *topics) + google::protobuf::RepeatedPtrField<Ydb::Topic::UpdateOffsetsInTransactionRequest_TopicOffsets> *topics) { auto* topic = topics->Add(); @@ -95,15 +95,15 @@ void AppendTopic(const TTopic &t, } } -Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_id, - const TString& tx_id, - const TString& consumer, - const TVector<TTopic>& topics) +Ydb::Topic::UpdateOffsetsInTransactionRequest CreateRequest(const TString& session_id, + const TString& tx_id, + const TString& consumer, + const TVector<TTopic>& topics) { - Ydb::Topic::AddOffsetsToTransactionRequest request; + Ydb::Topic::UpdateOffsetsInTransactionRequest request; - request.set_session_id(session_id); - request.mutable_tx_control()->set_tx_id(tx_id); + request.mutable_tx()->set_id(tx_id); + request.mutable_tx()->set_session(session_id); request.set_consumer(consumer); @@ -114,12 +114,12 @@ Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_ return request; } -class TAddOffsetToTransactionFixture : public NUnitTest::TBaseFixture { +class TUpdateOffsetsInTransactionFixture : public NUnitTest::TBaseFixture { protected: TMaybe<NPersQueue::TTestServer> server; TMaybe<NYdb::NTable::TSession> session; TMaybe<NYdb::NTable::TTransaction> tx; - std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> stub; + std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> stub; const TString DATABASE = "/Root"; const TString TOPIC_PARENT = "/Root/PQ"; @@ -137,6 +137,7 @@ protected: void SetUp(NUnitTest::TTestContext&) override { server = NPersQueue::TTestServer(false); server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); + server->ServerSettings.SetEnableTopicServiceTx(true); server->StartServer(); server->EnableLogs({NKikimrServices::PQ_WRITE_PROXY , NKikimrServices::PQ_READ_PROXY @@ -169,21 +170,21 @@ protected: session = CreateSession(*ydbDriver); tx = BeginTransaction(*session); - stub = CreateTopicServiceTxStub(*server); + stub = CreateTopicServiceStub(*server); } - Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics, - const TString& consumer = "c0nsumer") { + Ydb::Topic::UpdateOffsetsInTransactionResponse Call_UpdateOffsetsInTransaction(const TVector<TTopic>& topics, + const TString& consumer = "c0nsumer") { grpc::ClientContext rcontext; rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN); rcontext.AddMetadata("x-ydb-database", DATABASE); - Ydb::Topic::AddOffsetsToTransactionResponse response; + Ydb::Topic::UpdateOffsetsInTransactionResponse response; - grpc::Status status = stub->AddOffsetsToTransaction(&rcontext, - CreateRequest(session->GetId(), tx->GetId(), - consumer, topics), - &response); + grpc::Status status = stub->UpdateOffsetsInTransaction(&rcontext, + CreateRequest(session->GetId(), tx->GetId(), + consumer, topics), + &response); UNIT_ASSERT(status.ok()); return response; @@ -194,7 +195,7 @@ protected: const auto BEGIN = 4; const auto END = 7; - auto response = CallAddOffsetsToTransaction({ + auto response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=path1, .Partitions={ TPartition{.Id=PARTITION_ID, .Offsets={ TOffsetRange{.Begin=BEGIN, .End=END} @@ -203,7 +204,7 @@ protected: }); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - response = CallAddOffsetsToTransaction({ + response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=path2, .Partitions={ TPartition{.Id=PARTITION_ID, .Offsets={ TOffsetRange{.Begin=BEGIN, .End=END} @@ -214,8 +215,8 @@ protected: } }; -Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) { - Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TUpdateOffsetsInTransactionFixture) { + Ydb::Topic::UpdateOffsetsInTransactionResponse response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=1, .End=3}, @@ -228,7 +229,7 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) }); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - response = CallAddOffsetsToTransaction({ + response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=1, .Offsets={ TOffsetRange{.Begin=8, .End=11} @@ -238,8 +239,8 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } -Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) { - Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TUpdateOffsetsInTransactionFixture) { + Ydb::Topic::UpdateOffsetsInTransactionResponse response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=1, .End=3}, @@ -252,7 +253,7 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) { }); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - response = CallAddOffsetsToTransaction({ + response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=4, .End=7} @@ -262,8 +263,8 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST); } -Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixture) { - Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TUpdateOffsetsInTransactionFixture) { + Ydb::Topic::UpdateOffsetsInTransactionResponse response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=1, .End=3}, @@ -276,7 +277,7 @@ Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixtur }, "consumer-1"); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - response = CallAddOffsetsToTransaction({ + response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=4, .End=7} @@ -286,8 +287,8 @@ Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixtur UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } -Y_UNIT_TEST_F(UnknownConsumer, TAddOffsetToTransactionFixture) { - auto response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(UnknownConsumer, TUpdateOffsetsInTransactionFixture) { + auto response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=4, .End=7} @@ -297,8 +298,8 @@ Y_UNIT_TEST_F(UnknownConsumer, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST); } -Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) { - auto response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(UnknownTopic, TUpdateOffsetsInTransactionFixture) { + auto response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=INVALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=4, .End=7} @@ -308,16 +309,16 @@ Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR); } -Y_UNIT_TEST_F(UseDoubleSlashInTopicPath, TAddOffsetToTransactionFixture) { +Y_UNIT_TEST_F(UseDoubleSlashInTopicPath, TUpdateOffsetsInTransactionFixture) { TestTopicPaths("//Root//PQ//rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1"); } -Y_UNIT_TEST_F(RelativePath, TAddOffsetToTransactionFixture) { +Y_UNIT_TEST_F(RelativePath, TUpdateOffsetsInTransactionFixture) { TestTopicPaths("PQ/rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1"); } -Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) { - auto response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(AccessRights, TUpdateOffsetsInTransactionFixture) { + auto response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=4, .End=7} @@ -330,7 +331,7 @@ Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) { acl.RemoveAccess(NACLib::EAccessType::Allow, NACLib::ReadAttributes, AUTH_TOKEN); server->AnnoyingClient->ModifyACL(TOPIC_PARENT, VALID_TOPIC_NAME, acl.SerializeAsString()); - response = CallAddOffsetsToTransaction({ + response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ TOffsetRange{.Begin=14, .End=17} @@ -340,8 +341,8 @@ Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAUTHORIZED); } -Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TAddOffsetToTransactionFixture) { - auto response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TUpdateOffsetsInTransactionFixture) { + auto response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=1, .Offsets={ TOffsetRange{.Begin=0, .End=2}, @@ -356,8 +357,8 @@ Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED); } -Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) { - auto response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TUpdateOffsetsInTransactionFixture) { + auto response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=1, .Offsets={ TOffsetRange{.Begin=0, .End=2} @@ -366,7 +367,7 @@ Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) }); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - response = CallAddOffsetsToTransaction({ + response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=1, .Offsets={ TOffsetRange{.Begin=4, .End=6} @@ -375,7 +376,7 @@ Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) }); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - response = CallAddOffsetsToTransaction({ + response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=1, .Offsets={ TOffsetRange{.Begin=2, .End=4} @@ -391,8 +392,8 @@ Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST); } -Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) { - auto response = CallAddOffsetsToTransaction({ +Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransactionFixture) { + auto response = Call_UpdateOffsetsInTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=1, .Offsets={ TOffsetRange{.Begin=0, .End=2} |