diff options
author | ildar-khisam <[email protected]> | 2023-02-06 11:00:01 +0300 |
---|---|---|
committer | ildar-khisam <[email protected]> | 2023-02-06 11:00:01 +0300 |
commit | f60b1f4471381f9dbf366d94de6e4c713c1a4772 (patch) | |
tree | 8a2cdcf93e01460b48be5f0efe8cbeb40b3f468d | |
parent | 5dbceb1380cee5f5e1623fff06beaf9e6012e2c2 (diff) |
propose commit offset api
propose commit offset api
25 files changed, 706 insertions, 14 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 28b75d5fc08..777f082c8cc 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -105,6 +105,7 @@ struct TRpcServices { EvStreamTopicWrite, EvStreamTopicRead, EvPQReadInfo, + EvTopicCommitOffset, EvListOperations, EvExportToYt, EvDiscoverPQClusters, diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 778b424ab1c..beac70be059 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -536,6 +536,7 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvStreamPQMigrationReadRequest, PreHandle); HFunc(TEvStreamTopicWriteRequest, PreHandle); HFunc(TEvStreamTopicReadRequest, PreHandle); + HFunc(TEvCommitOffsetRequest, PreHandle); HFunc(TEvPQReadInfoRequest, PreHandle); HFunc(TEvPQDropTopicRequest, PreHandle); HFunc(TEvPQCreateTopicRequest, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index d1390c47b81..fe75617184f 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -55,6 +55,7 @@ protected: void Handle(TEvStreamPQMigrationReadRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvStreamTopicWriteRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvStreamTopicReadRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 332e21ff47c..5a666d68ed3 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -53,6 +53,7 @@ using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStre using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::RuManual>; using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::RuManual>; +using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>; using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>; using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>; using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>; diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index bd0b5c7894e..83cbd94bd59 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -253,7 +253,8 @@ struct TEvPQ { }; TEvSetClientInfo(const ui64 cookie, const TString& clientId, const ui64 offset, const TString& sessionId, - const ui32 generation, const ui32 step, ESetClientInfoType type = ESCI_OFFSET, ui64 readRuleGeneration = 0) + const ui32 generation, const ui32 step, ESetClientInfoType type = ESCI_OFFSET, + ui64 readRuleGeneration = 0, bool strict = false) : Cookie(cookie) , ClientId(clientId) , Offset(offset) @@ -262,6 +263,7 @@ struct TEvPQ { , Step(step) , Type(type) , ReadRuleGeneration(readRuleGeneration) + , Strict(strict) { } @@ -273,6 +275,7 @@ struct TEvPQ { ui32 Step; ESetClientInfoType Type; ui64 ReadRuleGeneration; + bool Strict; }; struct TEvGetClientOffset : public TEventLocal<TEvGetClientOffset, EvGetClientOffset> { @@ -630,12 +633,12 @@ struct TEvPQ { TEvInitCredentials() {} }; - + struct TEvCredentialsCreated : public TEventLocal<TEvCredentialsCreated, EvCredentialsCreated> { TEvCredentialsCreated(const TString& error) : Error(error) {} - + TEvCredentialsCreated(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentials) : Credentials(credentials) {} diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 50e1a30a3a4..eb250216446 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -3723,7 +3723,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const return i->second; } else { return 0; - } + } } void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) @@ -4083,7 +4083,7 @@ void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx, return; } - TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx); + TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx); if (operation.GetBegin() > operation.GetEnd()) { ScheduleReplyPropose(tx, @@ -4128,6 +4128,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, Y_VERIFY(!UsersInfoWriteInProgress); const TString& user = act.ClientId; + const bool strictCommitOffset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.Strict); if (!PendingUsersInfo.contains(user) && AffectedUsers.contains(user)) { switch (act.Type) { @@ -4187,6 +4188,16 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, return; } + if (strictCommitOffset && act.Offset < StartOffset) { + // strict commit to past, reply error + TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); + ScheduleReplyError(act.Cookie, + NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST, + TStringBuilder() << "set offset " << act.Offset << " to past for consumer " << act.ClientId << " actual start offset is " << StartOffset); + + return; + } + //request in correct session - make it ui64 offset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET ? act.Offset : userInfo.Offset); @@ -4205,7 +4216,15 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset); if (offset > EndOffset) { - LOG_ERROR_S( + if (strictCommitOffset) { + TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); + ScheduleReplyError(act.Cookie, + NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE, + TStringBuilder() << "strict commit can't set offset " << act.Offset << " to future, consumer " << act.ClientId << ", actual end offset is " << EndOffset); + + return; + } + LOG_WARN_S( ctx, NKikimrServices::PERSQUEUE, "commit to future - topic " << TopicConverter->GetClientsideName() << " partition " << Partition << " client " << act.ClientId << " EndOffset " << EndOffset << " offset " << offset @@ -4237,6 +4256,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, bool setSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION; bool dropSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_SESSION; + bool strictCommitOffset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.SessionId.empty()); if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE) { userInfo.ReadRuleGeneration = 0; @@ -4283,7 +4303,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, userInfo.Session = session; userInfo.Generation = generation; userInfo.Step = step; - } else if (dropSession) { + } else if (dropSession || strictCommitOffset) { userInfo.Session = ""; userInfo.Generation = 0; userInfo.Step = 0; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 38682b63799..b4abb5aaab8 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -179,7 +179,7 @@ private: if (rr->GetSeqNo() != res.GetResult(i).GetSeqNo() || rr->GetPartNo() + 1 != res.GetResult(i).GetPartNo()) { LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRead tablet: " << Tablet << " last read pos (seqno/parno): " << rr->GetSeqNo() << "," << rr->GetPartNo() << " readed now " - << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo() + << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo() << " full request(now): " << Request); } Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo()); @@ -1538,7 +1538,8 @@ void TPersQueue::HandleSetClientOffsetRequest(const ui64 responseCookie, const T InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_SET_OFFSET); THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(responseCookie, cmd.GetClientId(), cmd.GetOffset(), - cmd.HasSessionId() ? cmd.GetSessionId() : "", 0, 0); + cmd.HasSessionId() ? cmd.GetSessionId() : "", 0, 0, + TEvPQ::TEvSetClientInfo::ESCI_OFFSET, 0, cmd.GetStrict()); ctx.Send(partActor, event.Release()); } } diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 6cec1d738b9..ba7c162d322 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -46,6 +46,7 @@ message TPersQueuePartitionRequest { optional string SessionId = 4; // if not set, then no checks optional bool MirrorerRequest = 10 [default = false]; + optional bool Strict = 11 [default = false]; } message TCmdGetClientOffset { diff --git a/ydb/public/api/grpc/ydb_topic_v1.proto b/ydb/public/api/grpc/ydb_topic_v1.proto index a65012e4148..c991df62086 100644 --- a/ydb/public/api/grpc/ydb_topic_v1.proto +++ b/ydb/public/api/grpc/ydb_topic_v1.proto @@ -65,6 +65,8 @@ service TopicService { // <---------------- rpc StreamRead(stream StreamReadMessage.FromClient) returns (stream StreamReadMessage.FromServer); + // Single commit offset request. + rpc CommitOffset(CommitOffsetRequest) returns (CommitOffsetResponse); // Create topic command. rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto index 8b2a0986314..f637fba4682 100644 --- a/ydb/public/api/protos/draft/persqueue_error_codes.proto +++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto @@ -40,5 +40,7 @@ enum EErrorCode { CREATE_TIMEOUT = 22; // TODO: move to pqlib codes IDLE_TIMEOUT = 23; // TODO: move to pqlib codes + SET_OFFSET_ERROR_COMMIT_TO_PAST = 25; + ERROR = 100; } diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto index fab4aece53b..0db64f69fed 100644 --- a/ydb/public/api/protos/persqueue_error_codes_v1.proto +++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto @@ -41,6 +41,7 @@ enum ErrorCode { PREFERRED_CLUSTER_MISMATCHED = 500022; TABLET_PIPE_DISCONNECTED = 500023; + SET_OFFSET_ERROR_COMMIT_TO_PAST = 500025; ERROR = 500100; } diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index a5a6c5abde6..5ade5338e76 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -540,9 +540,41 @@ message AddOffsetsToTransactionResponse { message AddOffsetsToTransactionResult { } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// CommitOffset + + +// Commit offset request sent from client to server. +message CommitOffsetRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Topic path of partition. + string path = 2; + // Partition identifier. + int64 partition_id = 3; + // Path of consumer. + string consumer = 4; + + // Processed offset. + int64 offset = 5; +} + +// Commit offset response sent from server to client. +message CommitOffsetResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Commit offset result message inside CommitOffsetResponse.operation. +message CommitOffsetResult { +} + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Control messages + // message representing statistics by seleveral windows message MultipleWindowsStat { int64 per_minute = 1; @@ -768,7 +800,7 @@ message DescribeTopicResult { // Minimum of timestamps of last write among all partitions. google.protobuf.Timestamp min_last_write_time = 2; - // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute. + // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute. google.protobuf.Duration max_write_time_lag = 3; // How much bytes were written statistics. MultipleWindowsStat bytes_written = 4; @@ -855,7 +887,7 @@ message PartitionStats { int64 store_size_bytes = 2; // Timestamp of last write. google.protobuf.Timestamp last_write_time = 3; - // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute. + // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute. google.protobuf.Duration max_write_time_lag = 4; // How much bytes were written during several windows in this partition. MultipleWindowsStat bytes_written = 5; diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h index 9db7e6e6864..f0435d7835e 100644 --- a/ydb/services/lib/actors/type_definitions.h +++ b/ydb/services/lib/actors/type_definitions.h @@ -20,6 +20,7 @@ struct TTopicInitInfo { bool IsServerless = false; TString FolderId; NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; + THashMap<ui32, ui64> PartitionIdToTabletId; }; using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>; @@ -40,6 +41,8 @@ struct TTopicHolder { TVector<ui32> Groups; TMap<ui64, ui64> Partitions; + THashMap<ui32, ui64> PartitionIdToTabletId; + inline static TTopicHolder FromTopicInfo(const TTopicInitInfo& info) { return TTopicHolder{ @@ -52,6 +55,7 @@ struct TTopicHolder { .FolderId = info.FolderId, .MeteringMode = info.MeteringMode, .FullConverter = info.TopicNameConverter, + .PartitionIdToTabletId = info.PartitionIdToTabletId, }; } }; diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt index abc9a2c625f..a43e8508cc0 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt @@ -30,6 +30,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC ) target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/persqueue_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_actor.cpp diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt index 7dfbde6a5ab..afde3c630d4 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt @@ -31,6 +31,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC ) target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/persqueue_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_actor.cpp diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt index 7dfbde6a5ab..afde3c630d4 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt @@ -31,6 +31,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC ) target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/persqueue_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_actor.cpp diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp new file mode 100644 index 00000000000..755aa9dd3ba --- /dev/null +++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp @@ -0,0 +1,190 @@ +#include "commit_offset_actor.h" + +#include "persqueue_utils.h" +#include "read_init_auth_actor.h" + +#include <ydb/core/client/server/msgbus_server_persqueue.h> + +#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> +#include <ydb/public/lib/base/msgbus_status.h> + +namespace NKikimr::NGRpcProxy::V1 { + +using namespace PersQueue::V1; + + +TCommitOffsetActor::TCommitOffsetActor( + TEvCommitOffsetRequest* request, const NPersQueue::TTopicsListController& topicsHandler, + const TActorId& schemeCache, const TActorId& newSchemeCache, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters +) + : TBase(request) + , SchemeCache(schemeCache) + , NewSchemeCache(newSchemeCache) + , AuthInitActor() + , Counters(counters) + , TopicsHandler(topicsHandler) +{ + Y_ASSERT(request); +} + + + +TCommitOffsetActor::~TCommitOffsetActor() = default; + + +void TCommitOffsetActor::Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + Become(&TThis::StateFunc); + + auto request = dynamic_cast<const Ydb::Topic::CommitOffsetRequest*>(GetProtoRequest()); + Y_VERIFY(request); + ClientId = NPersQueue::ConvertNewConsumerName(request->consumer(), ctx); + PartitionId = request->Getpartition_id(); + + TIntrusivePtr<NACLib::TUserToken> token; + if (Request_->GetInternalToken().empty()) { + if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + AnswerError("Unauthenticated access is forbidden, please provide credentials", PersQueue::ErrorCode::ACCESS_DENIED, ctx); + return; + } + } else { + token = new NACLib::TUserToken(Request_->GetInternalToken()); + } + + THashSet<TString> topicsToResolve; + + if (request->path().empty()) { + AnswerError("empty topic in commit offset request", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + topicsToResolve.insert(request->path()); + + auto topicsList = TopicsHandler.GetReadTopicsList( + topicsToResolve, true, Request().GetDatabaseName().GetOrElse(TString()) + ); + if (!topicsList.IsValid) { + return AnswerError( + topicsList.Reason, + PersQueue::ErrorCode::BAD_REQUEST, ctx + ); + } + + AuthInitActor = ctx.Register(new TReadInitAndAuthActor( + ctx, ctx.SelfID, ClientId, 0, TString("read_info:") + Request().GetPeerName(), + SchemeCache, NewSchemeCache, Counters, token, topicsList, TopicsHandler.GetLocalCluster() + )); +} + + +void TCommitOffsetActor::Die(const TActorContext& ctx) { + if (PipeClient) + NTabletPipe::CloseClient(ctx, PipeClient); + + ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill()); + + TActorBootstrapped<TCommitOffsetActor>::Die(ctx); +} + +void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) { + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "CommitOffset auth ok, got " << ev->Get()->TopicAndTablets.size() << " topics"); + TopicAndTablets = std::move(ev->Get()->TopicAndTablets); + if (TopicAndTablets.empty()) { + AnswerError("empty list of topics", PersQueue::ErrorCode::UNKNOWN_TOPIC, ctx); + return; + } + Y_VERIFY(TopicAndTablets.size() == 1); + auto& [topic, topicInitInfo] = *TopicAndTablets.begin(); + + // AnswerError("test auth ok", Ydb::PersQueue::ErrorCode::ERROR, ctx); + + if (topicInitInfo.PartitionIdToTabletId.find(PartitionId) == topicInitInfo.PartitionIdToTabletId.end()) { + AnswerError("partition id not found in topic", PersQueue::ErrorCode::WRONG_PARTITION_NUMBER, ctx); + } + + ui64 tabletId = topicInitInfo.PartitionIdToTabletId.at(PartitionId); + + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = { + .RetryLimitCount = 6, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(100), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; + + PipeClient = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)); + + auto client_req = dynamic_cast<const Ydb::Topic::CommitOffsetRequest*>(GetProtoRequest()); + + NKikimrClient::TPersQueueRequest request; + request.MutablePartitionRequest()->SetTopic(topicInitInfo.TopicNameConverter->GetPrimaryPath()); + request.MutablePartitionRequest()->SetPartition(client_req->partition_id()); + + Y_VERIFY(PipeClient); + + auto commit = request.MutablePartitionRequest()->MutableCmdSetClientOffset(); + commit->SetClientId(ClientId); + commit->SetOffset(client_req->offset()); + commit->SetStrict(true); + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "strict CommitOffset, partition " << client_req->partition_id() + << " committing to position " << client_req->offset() /*<< " prev " << CommittedOffset + << " end " << EndOffset << " by cookie " << readId*/); + + TAutoPtr<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest); + req->Record.Swap(&request); + + NTabletPipe::SendData(ctx, PipeClient, req.Release()); +} + + +void TCommitOffsetActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { + if (ev->Get()->Record.GetStatus() != NMsgBusProxy::MSTATUS_OK) { + return AnswerError(ev->Get()->Record.GetErrorReason(), PersQueue::ErrorCode::ERROR, ctx); + } + + // Convert to correct response. + + const auto& partitionResult = ev->Get()->Record.GetPartitionResponse(); + Y_VERIFY(!partitionResult.HasCmdReadResult()); + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "CommitOffset, commit done."); + + Ydb::Topic::CommitOffsetResult result; + Request().SendResult(result, Ydb::StatusIds::SUCCESS); + Die(ctx); +} + + +void TCommitOffsetActor::AnswerError(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) { + + Ydb::Topic::CommitOffsetResponse response; + response.mutable_operation()->set_ready(true); + auto issue = response.mutable_operation()->add_issues(); + FillIssue(issue, errorCode, errorReason); + response.mutable_operation()->set_status(ConvertPersQueueInternalCodeToStatus(errorCode)); + Reply(ConvertPersQueueInternalCodeToStatus(errorCode), response.operation().issues(), ctx); +} + + +void TCommitOffsetActor::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) { + AnswerError(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); +} + +void TCommitOffsetActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + TEvTabletPipe::TEvClientConnected *msg = ev->Get(); + + if (msg->Status != NKikimrProto::OK) { + AnswerError(TStringBuilder() <<"pipe to tablet is dead" << msg->TabletId, PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx); + return; + } +} + +void TCommitOffsetActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + AnswerError(TStringBuilder() <<"pipe to tablet destroyed" << ev->Get()->TabletId, PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx); +} + + +} diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.h b/ydb/services/persqueue_v1/actors/commit_offset_actor.h new file mode 100644 index 00000000000..6eefc867ebb --- /dev/null +++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.h @@ -0,0 +1,89 @@ +#pragma once + +#include "events.h" + +#include <ydb/core/grpc_services/rpc_deferrable.h> +#include <ydb/core/client/server/msgbus_server_pq_metacache.h> + +#include <ydb/core/persqueue/events/global.h> + + +namespace NKikimr::NGRpcProxy::V1 { + +using namespace NKikimr::NGRpcService; + +class TCommitOffsetActor : public TRpcOperationRequestActor<TCommitOffsetActor, TEvCommitOffsetRequest> { + + using TBase = TRpcOperationRequestActor<TCommitOffsetActor, TEvCommitOffsetRequest>; + + using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse; + using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest; + +public: + static constexpr ui32 MAX_PIPE_RESTARTS = 100; //after 100 restarts without progress kill session + +public: + TCommitOffsetActor( + NKikimr::NGRpcService::TEvCommitOffsetRequest* request, const NPersQueue::TTopicsListController& topicsHandler, + const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters + ); + ~TCommitOffsetActor(); + + void Bootstrap(const NActors::TActorContext& ctx); + + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::FRONT_PQ_COMMIT; } + + bool HasCancelOperation() { + return false; + } + +private: + + void Die(const NActors::TActorContext& ctx) override; + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvPQProxy::TEvAuthResultOk, Handle); // from auth actor + HFunc(TEvPQProxy::TEvCloseSession, Handle); // from auth actor + + HFunc(TEvTabletPipe::TEvClientConnected, Handle); + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + + HFunc(TEvPersQueue::TEvResponse, Handle); + default: + break; + }; + } + + void Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const NActors::TActorContext& ctx); + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx); + + void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx); + + void AnswerError(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx); + void ProcessAnswers(const TActorContext& ctx); + +private: + TActorId SchemeCache; + TActorId NewSchemeCache; + + TActorId AuthInitActor; + + TTopicInitInfoMap TopicAndTablets; + + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + + TString ClientId; + ui64 PartitionId; + + TActorId PipeClient; + + NPersQueue::TTopicsListController TopicsHandler; +}; + +} diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index d1b18bf5a2b..402a93cf514 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -440,7 +440,9 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo if (ev->Get()->Record.HasErrorCode() && ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) { const auto errorCode = ev->Get()->Record.GetErrorCode(); - if (errorCode == NPersQueue::NErrorCode::WRONG_COOKIE || errorCode == NPersQueue::NErrorCode::BAD_REQUEST) { + if (errorCode == NPersQueue::NErrorCode::WRONG_COOKIE + || errorCode == NPersQueue::NErrorCode::BAD_REQUEST + || errorCode == NPersQueue::NErrorCode::READ_ERROR_NO_SESSION) { Counters.Errors.Inc(); ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession("status is not ok: " + ev->Get()->Record.GetErrorReason(), ConvertOldCode(ev->Get()->Record.GetErrorCode()))); } else { @@ -925,6 +927,7 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext& auto read = request.MutablePartitionRequest()->MutableCmdRead(); read->SetClientId(ClientId); read->SetClientDC(ClientDC); + read->SetSessionId(Session); if (req->MaxCount) { read->SetCount(req->MaxCount); } diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp index 986801f5f65..8ca421a3e93 100644 --- a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp +++ b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp @@ -115,6 +115,7 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQ case READ_ERROR_TOO_SMALL_OFFSET: case READ_ERROR_TOO_BIG_OFFSET: case SET_OFFSET_ERROR_COMMIT_TO_FUTURE: + case SET_OFFSET_ERROR_COMMIT_TO_PAST: case READ_NOT_DONE: return Ydb::StatusIds::GENERIC_ERROR; case TABLET_IS_DROPPED: diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 587dcf90424..72f6946c181 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -101,6 +101,11 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( topicsIter->second.DbPath = pqDescr.GetPQTabletConfig().GetYdbDatabasePath(); topicsIter->second.IsServerless = entry.DomainInfo->IsServerless(); + for (const auto& partitionDescription : pqDescr.GetPartitions()) { + topicsIter->second.PartitionIdToTabletId[partitionDescription.GetPartitionId()] = + partitionDescription.GetTabletId(); + } + if (!topicsIter->second.DiscoveryConverter->IsValid()) { TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", topicsIter->second.DiscoveryConverter->GetPrintableString().c_str()); @@ -265,7 +270,7 @@ void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) { TTopicInitInfoMap res; for (auto& [name, holder] : Topics) { res.insert(std::make_pair(name, TTopicInitInfo{ - holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.DbPath, holder.IsServerless, holder.FolderId, holder.MeteringMode + holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.DbPath, holder.IsServerless, holder.FolderId, holder.MeteringMode, holder.PartitionIdToTabletId })); } ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res))); diff --git a/ydb/services/persqueue_v1/grpc_pq_read.cpp b/ydb/services/persqueue_v1/grpc_pq_read.cpp index 33b0723c306..0e2b67f8aab 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read.cpp @@ -1,6 +1,7 @@ #include "grpc_pq_read.h" #include "actors/read_info_actor.h" +#include "actors/commit_offset_actor.h" #include <ydb/core/grpc_services/grpc_helper.h> #include <ydb/core/tx/scheme_board/cache.h> @@ -121,11 +122,25 @@ void TPQReadService::Handle(NGRpcService::TEvStreamPQMigrationReadRequest::TPtr& HandleStreamPQReadRequest<NGRpcService::TEvStreamPQMigrationReadRequest>(ev, ctx); } +void TPQReadService::Handle(NGRpcService::TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx) { + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new commit offset request"); + + if (HaveClusters && (Clusters.empty() || LocalCluster.empty())) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new commit offset request failed - cluster is not known yet"); + + ev->Get()->SendResult(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING), FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING)); //CANCELLED + return; + } else { + ctx.Register(new TCommitOffsetActor(ev->Release().Release(), *TopicsHandler, SchemeCache, NewSchemeCache, Counters)); + } +} + void TPQReadService::Handle(NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new read info request"); - if (Clusters.empty() || LocalCluster.empty()) { + if (HaveClusters && (Clusters.empty() || LocalCluster.empty())) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new read info request failed - cluster is not known yet"); ev->Get()->SendResult(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING), FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING)); //CANCELLED @@ -156,6 +171,10 @@ void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEv ctx.Send(NKikimr::NGRpcProxy::V1::GetPQReadServiceActorID(), ev->Release().Release()); } +void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx) { + ctx.Send(NKikimr::NGRpcProxy::V1::GetPQReadServiceActorID(), ev->Release().Release()); +} + void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQReadServiceActorID(), ev->Release().Release()); } diff --git a/ydb/services/persqueue_v1/grpc_pq_read.h b/ydb/services/persqueue_v1/grpc_pq_read.h index 9ab9c57d92c..36a9dfa1376 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read.h +++ b/ydb/services/persqueue_v1/grpc_pq_read.h @@ -43,6 +43,7 @@ private: switch (ev->GetTypeRewrite()) { HFunc(NGRpcService::TEvStreamTopicReadRequest, Handle); HFunc(NGRpcService::TEvStreamPQMigrationReadRequest, Handle); + HFunc(NGRpcService::TEvCommitOffsetRequest, Handle); HFunc(NGRpcService::TEvPQReadInfoRequest, Handle); HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle); HFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, Handle); @@ -56,6 +57,7 @@ private: private: void Handle(NGRpcService::TEvStreamTopicReadRequest::TPtr& ev, const TActorContext& ctx); void Handle(NGRpcService::TEvStreamPQMigrationReadRequest::TPtr& ev, const TActorContext& ctx); + void Handle(NGRpcService::TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx); void Handle(NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx); void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx); void Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 79e10f13dab..c0ea42c90cd 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -787,6 +787,312 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } + Y_UNIT_TEST(TopicServiceCommitOffset) { + TPersQueueV1TestServer server; + SET_LOCALS; + MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService); + server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY }); + server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG); + server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR); + server.EnablePQLogs({ NKikimrServices::PERSQUEUE }, NLog::EPriority::PRI_DEBUG); + + auto driver = pqClient->GetDriver(); + { + auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source"); + for (int i = 1; i < 17; ++i) { + bool res = writer->Write("valuevaluevalue" + ToString(i), i); + UNIT_ASSERT(res); + } + bool res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } + + auto TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + + grpc::ClientContext readContext; + auto readStream = TopicStubP_ -> StreamRead(&readContext); + UNIT_ASSERT(readStream); + + i64 assignId = 0; + // init read session + { + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1"); + + req.mutable_init_request()->set_consumer("user"); + + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); + + req.Clear(); + // await and confirm StartPartitionSessionRequest from server + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); + UNIT_ASSERT_VALUES_EQUAL(resp.start_partition_session_request().partition_session().path(), "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0); + UNIT_ASSERT(resp.start_partition_session_request().committed_offset() == 0); + + assignId = resp.start_partition_session_request().partition_session().partition_session_id(); + req.Clear(); + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); + + req.mutable_start_partition_session_response()->set_read_offset(0); + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + + //send some reads + req.Clear(); + req.mutable_read_request()->set_bytes_size(1); + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + + resp.Clear(); + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "Got read response " << resp << "\n"; + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); + UNIT_ASSERT(resp.read_response().partition_data_size() == 1); + UNIT_ASSERT(resp.read_response().partition_data(0).batches_size() == 1); + UNIT_ASSERT(resp.read_response().partition_data(0).batches(0).message_data_size() >= 1); + } + + // commit offset + { + Ydb::Topic::CommitOffsetRequest req; + Ydb::Topic::CommitOffsetResponse resp; + + req.set_path("acc/topic1"); + req.set_consumer("user"); + req.set_offset(5); + + auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp); + + Cerr << resp << "\n"; + } + + { + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + req.mutable_read_request()->set_bytes_size(10000); + + // auto commit = req.mutable_commit_offset_request()->add_commit_offsets(); + // commit->set_partition_session_id(assignId); + + // auto offsets = commit->add_offsets(); + // offsets->set_start(0); + // offsets->set_end(7); + + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "=== Got response (expect session expired): " << resp.ShortDebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(resp.status(), Ydb::StatusIds::SESSION_EXPIRED); + } + } + + Y_UNIT_TEST(TopicServiceCommitOffsetBadOffsets) { + TPersQueueV1TestServer server; + SET_LOCALS; + MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService); + server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY }); + server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG); + server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR); + server.EnablePQLogs({ NKikimrServices::PERSQUEUE }, NLog::EPriority::PRI_DEBUG); + + auto TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + + { + Ydb::Topic::CreateTopicRequest request; + Ydb::Topic::CreateTopicResponse response; + request.set_path(TStringBuilder() << "/Root/PQ/rt3.dc1--acc--topic2"); + + request.mutable_retention_period()->set_seconds(1); + + request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW); + request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_GZIP); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CreateTopic(&rcontext, request, &response); + + UNIT_ASSERT(status.ok()); + Ydb::Topic::CreateTopicResult res; + response.operation().result().UnpackTo(&res); + Cerr << response << "\n" << res << "\n"; + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + server.Server->AnnoyingClient->WaitTopicInit("acc/topic2"); + server.Server->AnnoyingClient->AddTopic("acc/topic2"); + } + + auto driver = pqClient->GetDriver(); + { + auto writer = CreateSimpleWriter(*driver, "acc/topic2", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"}); + TString blob{1_MB, 'x'}; + for (int i = 1; i <= 20; ++i) { + bool res = writer->Write(blob + ToString(i), i); + UNIT_ASSERT(res); + } + + bool res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } + + { + using namespace NYdb::NTopic; + auto settings = TDescribeTopicSettings().IncludeStats(true); + auto client = TTopicClient(server.Server->GetDriver()); + auto desc = client.DescribeTopic("/Root/PQ/rt3.dc1--acc--topic2", settings) + .ExtractValueSync() + .GetTopicDescription(); + Cerr << ">>>Describe result: partitions count is " << desc.GetTotalPartitionsCount() << Endl; + for (const auto& partInfo: desc.GetPartitions()) { + Cerr << ">>>Describe result: partition id = " << partInfo.GetPartitionId() << ", "; + auto stats = partInfo.GetPartitionStats(); + UNIT_ASSERT(stats.Defined()); + Cerr << "offsets: [ " << stats.Get()->GetStartOffset() << ", " << stats.Get()->GetEndOffset() << " )" << Endl; + } + + TAlterTopicSettings alterSettings; + alterSettings + .BeginAddConsumer("first-consumer") + .EndAddConsumer() + .BeginAddConsumer("second-consumer").Important(true) + .EndAddConsumer(); + auto res = client.AlterTopic("/Root/PQ/rt3.dc1--acc--topic2", alterSettings); + res.Wait(); + Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; + UNIT_ASSERT(res.GetValue().IsSuccess()); + + } + // unimportant consumer + // commit to future - expect bad request + { + Ydb::Topic::CommitOffsetRequest req; + Ydb::Topic::CommitOffsetResponse resp; + + req.set_path("acc/topic2"); + req.set_consumer("first-consumer"); + req.set_offset(25); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp); + + Cerr << resp << "\n"; + UNIT_ASSERT(status.ok()); + UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR); + // TODO: change to BAD_REQUEST + // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST); + } + + // commit to past - expect bad request + { + Ydb::Topic::CommitOffsetRequest req; + Ydb::Topic::CommitOffsetResponse resp; + + req.set_path("acc/topic2"); + req.set_consumer("first-consumer"); + req.set_offset(3); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp); + + Cerr << resp << "\n"; + UNIT_ASSERT(status.ok()); + UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR); + // TODO: change to BAD_REQUEST + // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST); + } + + // commit to valid offset - expect successful commit + { + Ydb::Topic::CommitOffsetRequest req; + Ydb::Topic::CommitOffsetResponse resp; + + req.set_path("acc/topic2"); + req.set_consumer("first-consumer"); + req.set_offset(15); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp); + + Cerr << resp << "\n"; + UNIT_ASSERT(status.ok()); + UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::SUCCESS); + } + + // important consumer + // normal commit - expect successful commit + { + Ydb::Topic::CommitOffsetRequest req; + Ydb::Topic::CommitOffsetResponse resp; + + req.set_path("acc/topic2"); + req.set_consumer("second-consumer"); + req.set_offset(15); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp); + + Cerr << resp << "\n"; + UNIT_ASSERT(status.ok()); + UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::SUCCESS); + } + + // commit to past - expect error + { + Ydb::Topic::CommitOffsetRequest req; + Ydb::Topic::CommitOffsetResponse resp; + + req.set_path("acc/topic2"); + req.set_consumer("second-consumer"); + req.set_offset(3); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp); + + Cerr << resp << "\n"; + UNIT_ASSERT(status.ok()); + UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR); + // TODO: change to BAD_REQUEST + // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST); + } + + // commit to future - expect bad request + { + Ydb::Topic::CommitOffsetRequest req; + Ydb::Topic::CommitOffsetResponse resp; + + req.set_path("acc/topic2"); + req.set_consumer("second-consumer"); + req.set_offset(25); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp); + + Cerr << resp << "\n"; + UNIT_ASSERT(status.ok()); + UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR); + // TODO: change to BAD_REQUEST + // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST); + } + } Y_UNIT_TEST(TopicServiceReadBudget) { diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index 730b03dabe2..464c8a93412 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -108,6 +108,10 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { }, &Ydb::Topic::V1::SVC::AsyncService::Request ## NAME, \ "TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run(); + ADD_REQUEST(CommitOffset, TopicService, CommitOffsetRequest, CommitOffsetResponse, { + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCommitOffsetRequest(ctx)); + }) + ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDropTopicRequest(ctx, IsRlAllowed())); }) |