aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-02-07 12:47:29 +0300
committerabcdef <akotov@ydb.tech>2023-02-07 12:47:29 +0300
commit02d113168a3b5ec0f6ce872057cd39f1c6dc1300 (patch)
tree1a47678087de1acee8d4fc9390fafd87aa3719da
parent5bb4d425bb72b388d18ada7bac07e5f2d8f1b9d6 (diff)
downloadydb-02d113168a3b5ec0f6ce872057cd39f1c6dc1300.tar.gz
-rw-r--r--ydb/core/grpc_services/service_table.h2
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/kqp.proto2
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/core/testlib/test_client.cpp1
-rw-r--r--ydb/public/api/grpc/draft/CMakeLists.darwin.txt1
-rw-r--r--ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/api/grpc/draft/CMakeLists.linux.txt1
-rw-r--r--ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto16
-rw-r--r--ydb/public/api/grpc/ydb_topic_v1.proto3
-rw-r--r--ydb/public/api/protos/ydb_topic.proto28
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp (renamed from ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp)49
-rw-r--r--ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.h (renamed from ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h)13
-rw-r--r--ydb/services/persqueue_v1/topic.cpp84
-rw-r--r--ydb/services/persqueue_v1/topic.h41
-rw-r--r--ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp50
-rw-r--r--ydb/services/persqueue_v1/ut/topic_service_ut.cpp101
20 files changed, 166 insertions, 235 deletions
diff --git a/ydb/core/grpc_services/service_table.h b/ydb/core/grpc_services/service_table.h
index dd7dd90035..adf8493daa 100644
--- a/ydb/core/grpc_services/service_table.h
+++ b/ydb/core/grpc_services/service_table.h
@@ -31,7 +31,5 @@ void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFaci
void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
-void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
-
}
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 96110e8f8f..24d42aa234 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -764,6 +764,7 @@ message TFeatureFlags {
reserved 83; // EnableKqpDataQuerySourceRead
optional bool EnableSmallDiskOptimization = 84 [default = true];
optional bool EnableDataShardVolatileTransactions = 85 [default = false];
+ optional bool EnableTopicServiceTx = 86 [default = false];
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 06485cb653..58c9ba8b75 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -69,7 +69,7 @@ enum EQueryReplyFlags {
message TTopicOperations {
optional string Consumer = 1;
- repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 2;
+ repeated Ydb.Topic.UpdateOffsetsInTransactionRequest.TopicOffsets Topics = 2;
}
message TQueryRequest {
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 48eeae29cb..c4c47f2957 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -45,6 +45,7 @@ public:
FEATURE_FLAG_SETTER(EnableDataShardGenericReadSets)
FEATURE_FLAG_SETTER(EnableAlterDatabaseCreateHiveFirst)
FEATURE_FLAG_SETTER(EnableDataShardVolatileTransactions)
+ FEATURE_FLAG_SETTER(EnableTopicServiceTx)
TDerived& SetEnableMvcc(std::optional<bool> value) {
if (value) {
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index fadef0571b..6d7b66eadc 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -329,7 +329,6 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicServiceTx(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true));
diff --git a/ydb/public/api/grpc/draft/CMakeLists.darwin.txt b/ydb/public/api/grpc/draft/CMakeLists.darwin.txt
index 6eadd10c9b..c1b1013216 100644
--- a/ydb/public/api/grpc/draft/CMakeLists.darwin.txt
+++ b/ydb/public/api/grpc/draft/CMakeLists.darwin.txt
@@ -27,7 +27,6 @@ target_proto_messages(api-grpc-draft PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_query_v1.proto
- ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
)
target_proto_addincls(api-grpc-draft
./
diff --git a/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt b/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt
index a0e5a143d3..a3854476ba 100644
--- a/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt
@@ -28,7 +28,6 @@ target_proto_messages(api-grpc-draft PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_query_v1.proto
- ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
)
target_proto_addincls(api-grpc-draft
./
diff --git a/ydb/public/api/grpc/draft/CMakeLists.linux.txt b/ydb/public/api/grpc/draft/CMakeLists.linux.txt
index a0e5a143d3..a3854476ba 100644
--- a/ydb/public/api/grpc/draft/CMakeLists.linux.txt
+++ b/ydb/public/api/grpc/draft/CMakeLists.linux.txt
@@ -28,7 +28,6 @@ target_proto_messages(api-grpc-draft PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_query_v1.proto
- ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
)
target_proto_addincls(api-grpc-draft
./
diff --git a/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
deleted file mode 100644
index 120f32f500..0000000000
--- a/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
+++ /dev/null
@@ -1,16 +0,0 @@
-syntax = "proto3";
-option cc_enable_arenas = true;
-
-package Ydb.Topic.V1;
-
-option java_package = "com.yandex.ydb.topic.v1";
-
-import "ydb/public/api/protos/ydb_topic.proto";
-
-service TopicServiceTx {
- // Add offsets to transaction
- rpc AddOffsetsToTransaction(AddOffsetsToTransactionRequest) returns (AddOffsetsToTransactionResponse);
-}
-
-
-
diff --git a/ydb/public/api/grpc/ydb_topic_v1.proto b/ydb/public/api/grpc/ydb_topic_v1.proto
index c991df6208..e2ecd59273 100644
--- a/ydb/public/api/grpc/ydb_topic_v1.proto
+++ b/ydb/public/api/grpc/ydb_topic_v1.proto
@@ -68,6 +68,9 @@ service TopicService {
// Single commit offset request.
rpc CommitOffset(CommitOffsetRequest) returns (CommitOffsetResponse);
+ // Add information about offset ranges to the transaction.
+ rpc UpdateOffsetsInTransaction(UpdateOffsetsInTransactionRequest) returns (UpdateOffsetsInTransactionResponse);
+
// Create topic command.
rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse);
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 5ade5338e7..2492635ead 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -133,6 +133,8 @@ message StreamWriteMessage {
// See enum Codec above for values.
int32 codec = 2;
+ optional TransactionIdentity tx = 3;
+
message MessageData {
// Message sequence number, provided by client for deduplication.
// Starts at 1
@@ -498,20 +500,24 @@ message StreamReadMessage {
}
}
-// Add offsets to transaction request sent from client to server.
-message AddOffsetsToTransactionRequest {
- Ydb.Operations.OperationParams operation_params = 1;
+message TransactionIdentity {
+ // Transaction identifier from TableService.
+ string id = 1;
// Session identifier from TableService.
- string session_id = 2;
+ string session = 2;
+}
- // Transaction identifier from TableService.
- Ydb.Table.TransactionControl tx_control = 3;
+// Add offsets to transaction request sent from client to server.
+message UpdateOffsetsInTransactionRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+
+ TransactionIdentity tx = 2;
// Ranges of offsets by topics.
- repeated TopicOffsets topics = 4;
+ repeated TopicOffsets topics = 3;
- string consumer = 5;
+ string consumer = 4;
message TopicOffsets {
// Topic path.
@@ -531,13 +537,13 @@ message AddOffsetsToTransactionRequest {
}
// Add offsets to transaction response sent from server to client.
-message AddOffsetsToTransactionResponse {
+message UpdateOffsetsInTransactionResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
}
-// Add offsets to transaction result message that will be inside AddOffsetsToTransactionResponse.operation.
-message AddOffsetsToTransactionResult {
+// Add offsets to transaction result message that will be inside UpdateOffsetsInTransactionResponse.operation.
+message UpdateOffsetsInTransactionResult {
}
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
index a43e8508cc..0a9aa85772 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
@@ -37,5 +37,5 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
index afde3c630d..7c0aa49098 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
@@ -38,5 +38,5 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
index afde3c630d..7c0aa49098 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
@@ -38,5 +38,5 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
index 5c962458e8..2586714c65 100644
--- a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
@@ -1,30 +1,44 @@
-#include "add_offsets_to_transaction_actor.h"
+#include "update_offsets_in_transaction_actor.h"
namespace NKikimr::NGRpcService {
-TAddOffsetsToTransactionActor::TAddOffsetsToTransactionActor(IRequestOpCtx* request)
+TUpdateOffsetsInTransactionActor::TUpdateOffsetsInTransactionActor(IRequestOpCtx* request)
: TBase{request}
{
}
-void TAddOffsetsToTransactionActor::Bootstrap(const NActors::TActorContext& ctx)
+void TUpdateOffsetsInTransactionActor::Bootstrap(const NActors::TActorContext& ctx)
{
TBase::Bootstrap(ctx);
- Become(&TAddOffsetsToTransactionActor::StateWork);
+ Become(&TUpdateOffsetsInTransactionActor::StateWork);
Proceed(ctx);
}
-void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx)
+void TUpdateOffsetsInTransactionActor::Proceed(const NActors::TActorContext& ctx)
{
+ if (!AppData(ctx)->FeatureFlags.GetEnableTopicServiceTx()) {
+ return Reply(Ydb::StatusIds::UNSUPPORTED,
+ "Disabled transaction support for TopicService.",
+ NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ ctx);
+ }
+
const auto req = GetProtoRequest();
+ if (!req->has_tx()) {
+ return Reply(Ydb::StatusIds::BAD_REQUEST,
+ "Empty tx.",
+ NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ ctx);
+ }
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
SetAuthToken(ev, *Request_);
SetDatabase(ev, *Request_);
NYql::TIssues issues;
- if (CheckSession(req->session_id(), issues)) {
- ev->Record.MutableRequest()->SetSessionId(req->session_id());
+ if (CheckSession(req->tx().session(), issues)) {
+ ev->Record.MutableRequest()->SetSessionId(req->tx().session());
} else {
return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
}
@@ -43,24 +57,16 @@ void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx)
ev->Record.MutableRequest()->SetCancelAfterMs(GetCancelAfter().MilliSeconds());
ev->Record.MutableRequest()->SetTimeoutMs(GetOperationTimeout().MilliSeconds());
- if (!req->has_tx_control()) {
- NYql::TIssues issues;
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty tx_control."));
- return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
- }
-
- //
- // TODO: проверить комбинацию значений атрибутов tx_control
- //
+ ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(req->tx().id());
- ev->Record.MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control());
ev->Record.MutableRequest()->MutableTopicOperations()->SetConsumer(req->consumer());
*ev->Record.MutableRequest()->MutableTopicOperations()->MutableTopics() = req->topics();
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
}
-void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx)
+void TUpdateOffsetsInTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev,
+ const NActors::TActorContext& ctx)
{
const auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
@@ -69,7 +75,7 @@ void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse:
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
const auto& kqpResponse = record.GetResponse();
const auto& issueMessage = kqpResponse.GetQueryIssues();
- auto queryResult = TEvAddOffsetsToTransactionRequest::AllocateResult<Ydb::Topic::AddOffsetsToTransactionResult>(Request_);
+ auto queryResult = TEvUpdateOffsetsInTransactionRequest::AllocateResult<Ydb::Topic::UpdateOffsetsInTransactionResult>(Request_);
//
// TODO: сохранить результат
@@ -89,9 +95,4 @@ void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse:
}
}
-void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &)
-{
- TActivationContext::AsActorContext().Register(new TAddOffsetsToTransactionActor(p.release()));
-}
-
}
diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.h
index 2e82458709..3b905e294b 100644
--- a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h
+++ b/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.h
@@ -12,15 +12,16 @@
namespace NKikimr::NGRpcService {
-using TEvAddOffsetsToTransactionRequest =
- TGrpcRequestOperationCall<Ydb::Topic::AddOffsetsToTransactionRequest, Ydb::Topic::AddOffsetsToTransactionResponse>;
+using TEvUpdateOffsetsInTransactionRequest =
+ TGrpcRequestOperationCall<Ydb::Topic::UpdateOffsetsInTransactionRequest, Ydb::Topic::UpdateOffsetsInTransactionResponse>;
-class TAddOffsetsToTransactionActor : public TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest> {
+class TUpdateOffsetsInTransactionActor :
+ public TRpcKqpRequestActor<TUpdateOffsetsInTransactionActor, TEvUpdateOffsetsInTransactionRequest> {
public:
- using TBase = TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest>;
- using TResult = Ydb::Topic::AddOffsetsToTransactionResult;
+ using TBase = TRpcKqpRequestActor<TUpdateOffsetsInTransactionActor, TEvUpdateOffsetsInTransactionRequest>;
+ using TResult = Ydb::Topic::UpdateOffsetsInTransactionResult;
- explicit TAddOffsetsToTransactionActor(IRequestOpCtx* msg);
+ explicit TUpdateOffsetsInTransactionActor(IRequestOpCtx* msg);
void Bootstrap(const NActors::TActorContext& ctx);
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index 464c8a9341..cb286e2fa6 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -7,21 +7,26 @@
#include <ydb/core/grpc_services/service_table.h>
#include <ydb/core/tx/scheme_board/cache.h>
+#include "actors/update_offsets_in_transaction_actor.h"
+
#include "grpc_pq_read.h"
#include "grpc_pq_write.h"
#include "grpc_pq_schema.h"
-namespace NKikimr {
-namespace NGRpcService {
-namespace V1 {
+namespace NKikimr::NGRpcService::V1 {
static const ui32 TopicWriteSessionsMaxCount = 1000000;
static const ui32 TopicReadSessionsMaxCount = 100000;
-TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed)
+TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const NActors::TActorId& schemeCache,
+ const NActors::TActorId& grpcRequestProxy,
+ bool rlAllowed)
: TGrpcServiceBase<Ydb::Topic::V1::TopicService>(system, counters, grpcRequestProxy, rlAllowed)
, SchemeCache(schemeCache)
-{ }
+{
+}
void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
CQ_ = cq;
@@ -53,6 +58,11 @@ void TGRpcTopicService::InitNewSchemeCacheActor() {
TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
}
+void TGRpcTopicService::DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &)
+{
+ TActivationContext::AsActorContext().Register(new TUpdateOffsetsInTransactionActor(p.release()));
+}
+
void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
@@ -128,71 +138,29 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed()));
})
#undef ADD_REQUEST
-}
-
-void TGRpcTopicService::StopService() noexcept {
- TGrpcServiceBase::StopService();
-}
-
-//
-// TGRpcTopicServiceTx
-//
-TGRpcTopicServiceTx::TGRpcTopicServiceTx(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- const NActors::TActorId& grpcRequestProxy)
- : ActorSystem(system)
- , Counters(counters)
- , GRpcRequestProxy(grpcRequestProxy)
-{
-}
-
-void TGRpcTopicServiceTx::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ = cq;
-
- if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) {
- SetupIncomingRequests(std::move(logger));
- }
-}
-
-void TGRpcTopicServiceTx::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
-}
-
-bool TGRpcTopicServiceTx::IncRequest() {
- return Limiter->Inc();
-}
-
-void TGRpcTopicServiceTx::DecRequest() {
- Limiter->Dec();
-}
-
-void TGRpcTopicServiceTx::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
- auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem);
#ifdef ADD_REQUEST_LIMIT
#error ADD_REQUEST_LIMIT macro already defined
#endif
#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \
- MakeIntrusive<TGRpcRequest<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response, TGRpcTopicServiceTx>> \
- (this, this->GetService(), CQ, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \
- ActorSystem->Send(GRpcRequestProxy, \
- new TGrpcRequestOperationCall<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response> \
- (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \
- }, &Ydb::Topic::V1::TopicServiceTx::AsyncService::Request ## NAME, \
+ MakeIntrusive<TGRpcRequest<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response, TGRpcTopicService>> \
+ (this, this->GetService(), CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response> \
+ (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \
+ }, &Ydb::Topic::V1::TopicService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("topic", #NAME))->Run();
- ADD_REQUEST_LIMIT(AddOffsetsToTransaction, DoAddOffsetsToTransaction, Ru)
+ ADD_REQUEST_LIMIT(UpdateOffsetsInTransaction, DoUpdateOffsetsInTransaction, Ru)
#undef ADD_REQUEST_LIMIT
}
-void TGRpcTopicServiceTx::StopService() noexcept {
+void TGRpcTopicService::StopService() noexcept {
TGrpcServiceBase::StopService();
}
-} // V1
-} // namespace NGRpcService
-} // namespace NKikimr
+} // namespace NKikimr::NGRpcService::V1
diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h
index ee5f8be7b1..ba26ae6a6c 100644
--- a/ydb/services/persqueue_v1/topic.h
+++ b/ydb/services/persqueue_v1/topic.h
@@ -3,17 +3,14 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
-#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/core/grpc_services/base/base_service.h>
+#include <ydb/core/grpc_services/base/base.h>
-namespace NKikimr {
-
-namespace NGRpcService {
-namespace V1 {
+namespace NKikimr::NGRpcService::V1 {
class TGRpcTopicService
: public TGrpcServiceBase<Ydb::Topic::V1::TopicService>
@@ -26,44 +23,18 @@ public:
using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>::GetService;
-
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override;
void InitNewSchemeCacheActor();
+ static void DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p,
+ const IFacilityProvider &);
+
NActors::TActorId SchemeCache;
NActors::TActorId NewSchemeCache;
-};
-
-class TGRpcTopicServiceTx
- : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx>
-{
-public:
- TGRpcTopicServiceTx(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- const NActors::TActorId& grpcRequestProxy);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
- void StopService() noexcept override;
-
- using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx>::GetService;
-
- bool IncRequest();
- void DecRequest();
-
-private:
- void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem;
- grpc::ServerCompletionQueue* CQ = nullptr;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
- NGrpc::TGlobalLimiter* Limiter = nullptr;
- NActors::TActorId GRpcRequestProxy;
};
-} // namespace V1
-} // namespace NGRpcService
-} // namespace NKikimr
+} // namespace NKikimr::NGRpcService::V1
diff --git a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp b/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
index 7a1ab22113..88b3943cd1 100644
--- a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
+++ b/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
@@ -1,4 +1,4 @@
-#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h>
+#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
@@ -41,11 +41,11 @@ protected:
void Wait_DataReceivedEvent(TTopicReadSession& reader,
ui64 offset);
- void Call_AddOffsetsToTransaction(const TString& sessionId,
- const TString& txId,
- const TString& consumer,
- ui64 rangeBegin,
- ui64 rangeEnd);
+ void Call_UpdateOffsetsInTransaction(const TString& sessionId,
+ const TString& txId,
+ const TString& consumer,
+ ui64 rangeBegin,
+ ui64 rangeEnd);
const TString CONSUMER = "user";
const TString SHORT_TOPIC_NAME = "demo";
@@ -57,7 +57,7 @@ protected:
const TString TOPIC_PATH = TOPIC_PARENT + "/" + FULL_TOPIC_NAME;
TMaybe<NPersQueue::TTestServer> Server;
- std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> TopicTxStub;
+ std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStub;
};
void TImmediateTxFixture::SetUp(NUnitTest::TTestContext&)
@@ -69,7 +69,7 @@ void TImmediateTxFixture::SetUp(NUnitTest::TTestContext&)
void TImmediateTxFixture::CreateTestServer()
{
- Server.ConstructInPlace(PQSettings(0).SetDomainName("Root"));
+ Server.ConstructInPlace(PQSettings(0).SetDomainName("Root").SetEnableTopicServiceTx(true));
Server->EnableLogs({NKikimrServices::FLAT_TX_SCHEMESHARD
, NKikimrServices::PERSQUEUE});
@@ -99,7 +99,7 @@ void TImmediateTxFixture::CreateTopic()
void TImmediateTxFixture::CreateTopicTxStub()
{
auto channel = grpc::CreateChannel("localhost:" + ToString(Server->GrpcPort), grpc::InsecureChannelCredentials());
- TopicTxStub = Ydb::Topic::V1::TopicServiceTx::NewStub(channel);
+ TopicStub = Ydb::Topic::V1::TopicService::NewStub(channel);
}
NYdb::NTable::TSession TImmediateTxFixture::CreateSession()
@@ -178,21 +178,21 @@ void TImmediateTxFixture::Wait_DataReceivedEvent(TTopicReadSession& reader,
UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset);
}
-void TImmediateTxFixture::Call_AddOffsetsToTransaction(const TString& sessionId,
- const TString& txId,
- const TString& consumer,
- ui64 rangeBegin,
- ui64 rangeEnd)
+void TImmediateTxFixture::Call_UpdateOffsetsInTransaction(const TString& sessionId,
+ const TString& txId,
+ const TString& consumer,
+ ui64 rangeBegin,
+ ui64 rangeEnd)
{
grpc::ClientContext rcontext;
rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN);
rcontext.AddMetadata("x-ydb-database", DATABASE);
- Ydb::Topic::AddOffsetsToTransactionRequest request;
- Ydb::Topic::AddOffsetsToTransactionResponse response;
+ Ydb::Topic::UpdateOffsetsInTransactionRequest request;
+ Ydb::Topic::UpdateOffsetsInTransactionResponse response;
- request.set_session_id(sessionId);
- request.mutable_tx_control()->set_tx_id(txId);
+ request.mutable_tx()->set_id(txId);
+ request.mutable_tx()->set_session(sessionId);
request.set_consumer(consumer);
auto *topic = request.mutable_topics()->Add();
@@ -205,9 +205,9 @@ void TImmediateTxFixture::Call_AddOffsetsToTransaction(const TString& sessionId,
range->set_start(rangeBegin);
range->set_end(rangeEnd);
- grpc::Status status = TopicTxStub->AddOffsetsToTransaction(&rcontext,
- request,
- &response);
+ grpc::Status status = TopicStub->UpdateOffsetsInTransaction(&rcontext,
+ request,
+ &response);
UNIT_ASSERT(status.ok());
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
@@ -226,7 +226,7 @@ Y_UNIT_TEST_F(Scenario_1, TImmediateTxFixture)
Wait_DataReceivedEvent(*reader, 0);
Wait_DataReceivedEvent(*reader, 1);
- Call_AddOffsetsToTransaction(session.GetId(), tx.GetId(), CONSUMER, 0, 2);
+ Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 0, 2);
CommitTx(tx, NYdb::EStatus::SUCCESS);
}
@@ -242,7 +242,7 @@ Y_UNIT_TEST_F(Scenario_1, TImmediateTxFixture)
Wait_DataReceivedEvent(*reader, 2);
Wait_DataReceivedEvent(*reader, 3);
- Call_AddOffsetsToTransaction(session.GetId(), tx.GetId(), CONSUMER, 2, 4);
+ Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 2, 4);
}
{
@@ -266,7 +266,7 @@ Y_UNIT_TEST_F(Scenario_2, TImmediateTxFixture)
Wait_DataReceivedEvent(*reader, 1);
Wait_DataReceivedEvent(*reader, 2);
- Call_AddOffsetsToTransaction(s1.GetId(), t1.GetId(), CONSUMER, 0, 3);
+ Call_UpdateOffsetsInTransaction(s1.GetId(), t1.GetId(), CONSUMER, 0, 3);
}
NYdb::NTable::TSession s2 = CreateSession();
@@ -280,7 +280,7 @@ Y_UNIT_TEST_F(Scenario_2, TImmediateTxFixture)
Wait_DataReceivedEvent(*reader, 0);
Wait_DataReceivedEvent(*reader, 1);
- Call_AddOffsetsToTransaction(s2.GetId(), t2.GetId(), CONSUMER, 0, 2);
+ Call_UpdateOffsetsInTransaction(s2.GetId(), t2.GetId(), CONSUMER, 0, 2);
}
CommitTx(t2, NYdb::EStatus::SUCCESS);
diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
index 27a01277f3..16882a8c62 100644
--- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
+++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
@@ -1,4 +1,4 @@
-#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h>
+#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
@@ -44,8 +44,8 @@ std::unique_ptr<typename T::Stub> CreateServiceStub(const NPersQueue::TTestServe
return stub;
}
-std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> CreateTopicServiceTxStub(const NPersQueue::TTestServer &server) {
- return CreateServiceStub<Ydb::Topic::V1::TopicServiceTx>(server);
+std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> CreateTopicServiceStub(const NPersQueue::TTestServer &server) {
+ return CreateServiceStub<Ydb::Topic::V1::TopicService>(server);
}
struct TOffsetRange {
@@ -72,7 +72,7 @@ void AppendOffsetsRange(const TOffsetRange& r, google::protobuf::RepeatedPtrFiel
}
void AppendPartition(const TPartition& p,
- google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets_PartitionOffsets> *partitions)
+ google::protobuf::RepeatedPtrField<Ydb::Topic::UpdateOffsetsInTransactionRequest_TopicOffsets_PartitionOffsets> *partitions)
{
auto* partition = partitions->Add();
@@ -84,7 +84,7 @@ void AppendPartition(const TPartition& p,
}
void AppendTopic(const TTopic &t,
- google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets> *topics)
+ google::protobuf::RepeatedPtrField<Ydb::Topic::UpdateOffsetsInTransactionRequest_TopicOffsets> *topics)
{
auto* topic = topics->Add();
@@ -95,15 +95,15 @@ void AppendTopic(const TTopic &t,
}
}
-Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_id,
- const TString& tx_id,
- const TString& consumer,
- const TVector<TTopic>& topics)
+Ydb::Topic::UpdateOffsetsInTransactionRequest CreateRequest(const TString& session_id,
+ const TString& tx_id,
+ const TString& consumer,
+ const TVector<TTopic>& topics)
{
- Ydb::Topic::AddOffsetsToTransactionRequest request;
+ Ydb::Topic::UpdateOffsetsInTransactionRequest request;
- request.set_session_id(session_id);
- request.mutable_tx_control()->set_tx_id(tx_id);
+ request.mutable_tx()->set_id(tx_id);
+ request.mutable_tx()->set_session(session_id);
request.set_consumer(consumer);
@@ -114,12 +114,12 @@ Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_
return request;
}
-class TAddOffsetToTransactionFixture : public NUnitTest::TBaseFixture {
+class TUpdateOffsetsInTransactionFixture : public NUnitTest::TBaseFixture {
protected:
TMaybe<NPersQueue::TTestServer> server;
TMaybe<NYdb::NTable::TSession> session;
TMaybe<NYdb::NTable::TTransaction> tx;
- std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> stub;
+ std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> stub;
const TString DATABASE = "/Root";
const TString TOPIC_PARENT = "/Root/PQ";
@@ -137,6 +137,7 @@ protected:
void SetUp(NUnitTest::TTestContext&) override {
server = NPersQueue::TTestServer(false);
server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ server->ServerSettings.SetEnableTopicServiceTx(true);
server->StartServer();
server->EnableLogs({NKikimrServices::PQ_WRITE_PROXY
, NKikimrServices::PQ_READ_PROXY
@@ -169,21 +170,21 @@ protected:
session = CreateSession(*ydbDriver);
tx = BeginTransaction(*session);
- stub = CreateTopicServiceTxStub(*server);
+ stub = CreateTopicServiceStub(*server);
}
- Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics,
- const TString& consumer = "c0nsumer") {
+ Ydb::Topic::UpdateOffsetsInTransactionResponse Call_UpdateOffsetsInTransaction(const TVector<TTopic>& topics,
+ const TString& consumer = "c0nsumer") {
grpc::ClientContext rcontext;
rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN);
rcontext.AddMetadata("x-ydb-database", DATABASE);
- Ydb::Topic::AddOffsetsToTransactionResponse response;
+ Ydb::Topic::UpdateOffsetsInTransactionResponse response;
- grpc::Status status = stub->AddOffsetsToTransaction(&rcontext,
- CreateRequest(session->GetId(), tx->GetId(),
- consumer, topics),
- &response);
+ grpc::Status status = stub->UpdateOffsetsInTransaction(&rcontext,
+ CreateRequest(session->GetId(), tx->GetId(),
+ consumer, topics),
+ &response);
UNIT_ASSERT(status.ok());
return response;
@@ -194,7 +195,7 @@ protected:
const auto BEGIN = 4;
const auto END = 7;
- auto response = CallAddOffsetsToTransaction({
+ auto response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=path1, .Partitions={
TPartition{.Id=PARTITION_ID, .Offsets={
TOffsetRange{.Begin=BEGIN, .End=END}
@@ -203,7 +204,7 @@ protected:
});
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- response = CallAddOffsetsToTransaction({
+ response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=path2, .Partitions={
TPartition{.Id=PARTITION_ID, .Offsets={
TOffsetRange{.Begin=BEGIN, .End=END}
@@ -214,8 +215,8 @@ protected:
}
};
-Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) {
- Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TUpdateOffsetsInTransactionFixture) {
+ Ydb::Topic::UpdateOffsetsInTransactionResponse response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=1, .End=3},
@@ -228,7 +229,7 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture)
});
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- response = CallAddOffsetsToTransaction({
+ response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=1, .Offsets={
TOffsetRange{.Begin=8, .End=11}
@@ -238,8 +239,8 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture)
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
-Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) {
- Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TUpdateOffsetsInTransactionFixture) {
+ Ydb::Topic::UpdateOffsetsInTransactionResponse response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=1, .End=3},
@@ -252,7 +253,7 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) {
});
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- response = CallAddOffsetsToTransaction({
+ response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=4, .End=7}
@@ -262,8 +263,8 @@ Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST);
}
-Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixture) {
- Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TUpdateOffsetsInTransactionFixture) {
+ Ydb::Topic::UpdateOffsetsInTransactionResponse response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=1, .End=3},
@@ -276,7 +277,7 @@ Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixtur
}, "consumer-1");
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- response = CallAddOffsetsToTransaction({
+ response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=4, .End=7}
@@ -286,8 +287,8 @@ Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixtur
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
-Y_UNIT_TEST_F(UnknownConsumer, TAddOffsetToTransactionFixture) {
- auto response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(UnknownConsumer, TUpdateOffsetsInTransactionFixture) {
+ auto response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=4, .End=7}
@@ -297,8 +298,8 @@ Y_UNIT_TEST_F(UnknownConsumer, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST);
}
-Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) {
- auto response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(UnknownTopic, TUpdateOffsetsInTransactionFixture) {
+ auto response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=INVALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=4, .End=7}
@@ -308,16 +309,16 @@ Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR);
}
-Y_UNIT_TEST_F(UseDoubleSlashInTopicPath, TAddOffsetToTransactionFixture) {
+Y_UNIT_TEST_F(UseDoubleSlashInTopicPath, TUpdateOffsetsInTransactionFixture) {
TestTopicPaths("//Root//PQ//rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1");
}
-Y_UNIT_TEST_F(RelativePath, TAddOffsetToTransactionFixture) {
+Y_UNIT_TEST_F(RelativePath, TUpdateOffsetsInTransactionFixture) {
TestTopicPaths("PQ/rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1");
}
-Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) {
- auto response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(AccessRights, TUpdateOffsetsInTransactionFixture) {
+ auto response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=4, .End=7}
@@ -330,7 +331,7 @@ Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) {
acl.RemoveAccess(NACLib::EAccessType::Allow, NACLib::ReadAttributes, AUTH_TOKEN);
server->AnnoyingClient->ModifyACL(TOPIC_PARENT, VALID_TOPIC_NAME, acl.SerializeAsString());
- response = CallAddOffsetsToTransaction({
+ response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
TOffsetRange{.Begin=14, .End=17}
@@ -340,8 +341,8 @@ Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAUTHORIZED);
}
-Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TAddOffsetToTransactionFixture) {
- auto response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TUpdateOffsetsInTransactionFixture) {
+ auto response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=1, .Offsets={
TOffsetRange{.Begin=0, .End=2},
@@ -356,8 +357,8 @@ Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
}
-Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) {
- auto response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TUpdateOffsetsInTransactionFixture) {
+ auto response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=1, .Offsets={
TOffsetRange{.Begin=0, .End=2}
@@ -366,7 +367,7 @@ Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture)
});
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- response = CallAddOffsetsToTransaction({
+ response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=1, .Offsets={
TOffsetRange{.Begin=4, .End=6}
@@ -375,7 +376,7 @@ Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture)
});
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- response = CallAddOffsetsToTransaction({
+ response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=1, .Offsets={
TOffsetRange{.Begin=2, .End=4}
@@ -391,8 +392,8 @@ Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture)
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
}
-Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) {
- auto response = CallAddOffsetsToTransaction({
+Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransactionFixture) {
+ auto response = Call_UpdateOffsetsInTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=1, .Offsets={
TOffsetRange{.Begin=0, .End=2}