summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <[email protected]>2023-02-06 11:00:01 +0300
committerildar-khisam <[email protected]>2023-02-06 11:00:01 +0300
commitf60b1f4471381f9dbf366d94de6e4c713c1a4772 (patch)
tree8a2cdcf93e01460b48be5f0efe8cbeb40b3f468d
parent5dbceb1380cee5f5e1623fff06beaf9e6012e2c2 (diff)
propose commit offset api
propose commit offset api
-rw-r--r--ydb/core/grpc_services/base/base.h1
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp1
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h1
-rw-r--r--ydb/core/grpc_services/rpc_calls.h1
-rw-r--r--ydb/core/persqueue/events/internal.h9
-rw-r--r--ydb/core/persqueue/partition.cpp28
-rw-r--r--ydb/core/persqueue/pq_impl.cpp5
-rw-r--r--ydb/core/protos/msgbus_pq.proto1
-rw-r--r--ydb/public/api/grpc/ydb_topic_v1.proto2
-rw-r--r--ydb/public/api/protos/draft/persqueue_error_codes.proto2
-rw-r--r--ydb/public/api/protos/persqueue_error_codes_v1.proto1
-rw-r--r--ydb/public/api/protos/ydb_topic.proto36
-rw-r--r--ydb/services/lib/actors/type_definitions.h4
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt1
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux.txt1
-rw-r--r--ydb/services/persqueue_v1/actors/commit_offset_actor.cpp190
-rw-r--r--ydb/services/persqueue_v1/actors/commit_offset_actor.h89
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp5
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.cpp1
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp7
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read.cpp21
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read.h2
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp306
-rw-r--r--ydb/services/persqueue_v1/topic.cpp4
25 files changed, 706 insertions, 14 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 28b75d5fc08..777f082c8cc 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -105,6 +105,7 @@ struct TRpcServices {
EvStreamTopicWrite,
EvStreamTopicRead,
EvPQReadInfo,
+ EvTopicCommitOffset,
EvListOperations,
EvExportToYt,
EvDiscoverPQClusters,
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 778b424ab1c..beac70be059 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -536,6 +536,7 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvStreamPQMigrationReadRequest, PreHandle);
HFunc(TEvStreamTopicWriteRequest, PreHandle);
HFunc(TEvStreamTopicReadRequest, PreHandle);
+ HFunc(TEvCommitOffsetRequest, PreHandle);
HFunc(TEvPQReadInfoRequest, PreHandle);
HFunc(TEvPQDropTopicRequest, PreHandle);
HFunc(TEvPQCreateTopicRequest, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index d1390c47b81..fe75617184f 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -55,6 +55,7 @@ protected:
void Handle(TEvStreamPQMigrationReadRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvStreamTopicWriteRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvStreamTopicReadRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 332e21ff47c..5a666d68ed3 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -53,6 +53,7 @@ using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStre
using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::RuManual>;
using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::RuManual>;
+using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>;
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;
using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>;
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index bd0b5c7894e..83cbd94bd59 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -253,7 +253,8 @@ struct TEvPQ {
};
TEvSetClientInfo(const ui64 cookie, const TString& clientId, const ui64 offset, const TString& sessionId,
- const ui32 generation, const ui32 step, ESetClientInfoType type = ESCI_OFFSET, ui64 readRuleGeneration = 0)
+ const ui32 generation, const ui32 step, ESetClientInfoType type = ESCI_OFFSET,
+ ui64 readRuleGeneration = 0, bool strict = false)
: Cookie(cookie)
, ClientId(clientId)
, Offset(offset)
@@ -262,6 +263,7 @@ struct TEvPQ {
, Step(step)
, Type(type)
, ReadRuleGeneration(readRuleGeneration)
+ , Strict(strict)
{
}
@@ -273,6 +275,7 @@ struct TEvPQ {
ui32 Step;
ESetClientInfoType Type;
ui64 ReadRuleGeneration;
+ bool Strict;
};
struct TEvGetClientOffset : public TEventLocal<TEvGetClientOffset, EvGetClientOffset> {
@@ -630,12 +633,12 @@ struct TEvPQ {
TEvInitCredentials()
{}
};
-
+
struct TEvCredentialsCreated : public TEventLocal<TEvCredentialsCreated, EvCredentialsCreated> {
TEvCredentialsCreated(const TString& error)
: Error(error)
{}
-
+
TEvCredentialsCreated(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentials)
: Credentials(credentials)
{}
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 50e1a30a3a4..eb250216446 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -3723,7 +3723,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const
return i->second;
} else {
return 0;
- }
+ }
}
void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
@@ -4083,7 +4083,7 @@ void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
return;
}
- TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx);
+ TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx);
if (operation.GetBegin() > operation.GetEnd()) {
ScheduleReplyPropose(tx,
@@ -4128,6 +4128,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
Y_VERIFY(!UsersInfoWriteInProgress);
const TString& user = act.ClientId;
+ const bool strictCommitOffset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.Strict);
if (!PendingUsersInfo.contains(user) && AffectedUsers.contains(user)) {
switch (act.Type) {
@@ -4187,6 +4188,16 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
return;
}
+ if (strictCommitOffset && act.Offset < StartOffset) {
+ // strict commit to past, reply error
+ TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
+ ScheduleReplyError(act.Cookie,
+ NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST,
+ TStringBuilder() << "set offset " << act.Offset << " to past for consumer " << act.ClientId << " actual start offset is " << StartOffset);
+
+ return;
+ }
+
//request in correct session - make it
ui64 offset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET ? act.Offset : userInfo.Offset);
@@ -4205,7 +4216,15 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset);
if (offset > EndOffset) {
- LOG_ERROR_S(
+ if (strictCommitOffset) {
+ TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
+ ScheduleReplyError(act.Cookie,
+ NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE,
+ TStringBuilder() << "strict commit can't set offset " << act.Offset << " to future, consumer " << act.ClientId << ", actual end offset is " << EndOffset);
+
+ return;
+ }
+ LOG_WARN_S(
ctx, NKikimrServices::PERSQUEUE,
"commit to future - topic " << TopicConverter->GetClientsideName() << " partition " << Partition
<< " client " << act.ClientId << " EndOffset " << EndOffset << " offset " << offset
@@ -4237,6 +4256,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
bool setSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION;
bool dropSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_SESSION;
+ bool strictCommitOffset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.SessionId.empty());
if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE) {
userInfo.ReadRuleGeneration = 0;
@@ -4283,7 +4303,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
userInfo.Session = session;
userInfo.Generation = generation;
userInfo.Step = step;
- } else if (dropSession) {
+ } else if (dropSession || strictCommitOffset) {
userInfo.Session = "";
userInfo.Generation = 0;
userInfo.Step = 0;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 38682b63799..b4abb5aaab8 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -179,7 +179,7 @@ private:
if (rr->GetSeqNo() != res.GetResult(i).GetSeqNo() || rr->GetPartNo() + 1 != res.GetResult(i).GetPartNo()) {
LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRead tablet: " << Tablet
<< " last read pos (seqno/parno): " << rr->GetSeqNo() << "," << rr->GetPartNo() << " readed now "
- << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo()
+ << res.GetResult(i).GetSeqNo() << ", " << res.GetResult(i).GetPartNo()
<< " full request(now): " << Request);
}
Y_VERIFY(rr->GetSeqNo() == res.GetResult(i).GetSeqNo());
@@ -1538,7 +1538,8 @@ void TPersQueue::HandleSetClientOffsetRequest(const ui64 responseCookie, const T
InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_SET_OFFSET);
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(responseCookie, cmd.GetClientId(),
cmd.GetOffset(),
- cmd.HasSessionId() ? cmd.GetSessionId() : "", 0, 0);
+ cmd.HasSessionId() ? cmd.GetSessionId() : "", 0, 0,
+ TEvPQ::TEvSetClientInfo::ESCI_OFFSET, 0, cmd.GetStrict());
ctx.Send(partActor, event.Release());
}
}
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index 6cec1d738b9..ba7c162d322 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -46,6 +46,7 @@ message TPersQueuePartitionRequest {
optional string SessionId = 4; // if not set, then no checks
optional bool MirrorerRequest = 10 [default = false];
+ optional bool Strict = 11 [default = false];
}
message TCmdGetClientOffset {
diff --git a/ydb/public/api/grpc/ydb_topic_v1.proto b/ydb/public/api/grpc/ydb_topic_v1.proto
index a65012e4148..c991df62086 100644
--- a/ydb/public/api/grpc/ydb_topic_v1.proto
+++ b/ydb/public/api/grpc/ydb_topic_v1.proto
@@ -65,6 +65,8 @@ service TopicService {
// <----------------
rpc StreamRead(stream StreamReadMessage.FromClient) returns (stream StreamReadMessage.FromServer);
+ // Single commit offset request.
+ rpc CommitOffset(CommitOffsetRequest) returns (CommitOffsetResponse);
// Create topic command.
rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse);
diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto
index 8b2a0986314..f637fba4682 100644
--- a/ydb/public/api/protos/draft/persqueue_error_codes.proto
+++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto
@@ -40,5 +40,7 @@ enum EErrorCode {
CREATE_TIMEOUT = 22; // TODO: move to pqlib codes
IDLE_TIMEOUT = 23; // TODO: move to pqlib codes
+ SET_OFFSET_ERROR_COMMIT_TO_PAST = 25;
+
ERROR = 100;
}
diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto
index fab4aece53b..0db64f69fed 100644
--- a/ydb/public/api/protos/persqueue_error_codes_v1.proto
+++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto
@@ -41,6 +41,7 @@ enum ErrorCode {
PREFERRED_CLUSTER_MISMATCHED = 500022;
TABLET_PIPE_DISCONNECTED = 500023;
+ SET_OFFSET_ERROR_COMMIT_TO_PAST = 500025;
ERROR = 500100;
}
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index a5a6c5abde6..5ade5338e76 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -540,9 +540,41 @@ message AddOffsetsToTransactionResponse {
message AddOffsetsToTransactionResult {
}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// CommitOffset
+
+
+// Commit offset request sent from client to server.
+message CommitOffsetRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+
+ // Topic path of partition.
+ string path = 2;
+ // Partition identifier.
+ int64 partition_id = 3;
+ // Path of consumer.
+ string consumer = 4;
+
+ // Processed offset.
+ int64 offset = 5;
+}
+
+// Commit offset response sent from server to client.
+message CommitOffsetResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Commit offset result message inside CommitOffsetResponse.operation.
+message CommitOffsetResult {
+}
+
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Control messages
+
// message representing statistics by seleveral windows
message MultipleWindowsStat {
int64 per_minute = 1;
@@ -768,7 +800,7 @@ message DescribeTopicResult {
// Minimum of timestamps of last write among all partitions.
google.protobuf.Timestamp min_last_write_time = 2;
- // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute.
+ // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute.
google.protobuf.Duration max_write_time_lag = 3;
// How much bytes were written statistics.
MultipleWindowsStat bytes_written = 4;
@@ -855,7 +887,7 @@ message PartitionStats {
int64 store_size_bytes = 2;
// Timestamp of last write.
google.protobuf.Timestamp last_write_time = 3;
- // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute.
+ // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute.
google.protobuf.Duration max_write_time_lag = 4;
// How much bytes were written during several windows in this partition.
MultipleWindowsStat bytes_written = 5;
diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h
index 9db7e6e6864..f0435d7835e 100644
--- a/ydb/services/lib/actors/type_definitions.h
+++ b/ydb/services/lib/actors/type_definitions.h
@@ -20,6 +20,7 @@ struct TTopicInitInfo {
bool IsServerless = false;
TString FolderId;
NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
+ THashMap<ui32, ui64> PartitionIdToTabletId;
};
using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>;
@@ -40,6 +41,8 @@ struct TTopicHolder {
TVector<ui32> Groups;
TMap<ui64, ui64> Partitions;
+ THashMap<ui32, ui64> PartitionIdToTabletId;
+
inline static TTopicHolder FromTopicInfo(const TTopicInitInfo& info) {
return TTopicHolder{
@@ -52,6 +55,7 @@ struct TTopicHolder {
.FolderId = info.FolderId,
.MeteringMode = info.MeteringMode,
.FullConverter = info.TopicNameConverter,
+ .PartitionIdToTabletId = info.PartitionIdToTabletId,
};
}
};
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
index abc9a2c625f..a43e8508cc0 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
@@ -30,6 +30,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC
)
target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_actor.cpp
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
index 7dfbde6a5ab..afde3c630d4 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
@@ -31,6 +31,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC
)
target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_actor.cpp
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
index 7dfbde6a5ab..afde3c630d4 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
@@ -31,6 +31,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC
)
target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_actor.cpp
diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
new file mode 100644
index 00000000000..755aa9dd3ba
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
@@ -0,0 +1,190 @@
+#include "commit_offset_actor.h"
+
+#include "persqueue_utils.h"
+#include "read_init_auth_actor.h"
+
+#include <ydb/core/client/server/msgbus_server_persqueue.h>
+
+#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
+#include <ydb/public/lib/base/msgbus_status.h>
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+using namespace PersQueue::V1;
+
+
+TCommitOffsetActor::TCommitOffsetActor(
+ TEvCommitOffsetRequest* request, const NPersQueue::TTopicsListController& topicsHandler,
+ const TActorId& schemeCache, const TActorId& newSchemeCache,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters
+)
+ : TBase(request)
+ , SchemeCache(schemeCache)
+ , NewSchemeCache(newSchemeCache)
+ , AuthInitActor()
+ , Counters(counters)
+ , TopicsHandler(topicsHandler)
+{
+ Y_ASSERT(request);
+}
+
+
+
+TCommitOffsetActor::~TCommitOffsetActor() = default;
+
+
+void TCommitOffsetActor::Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ Become(&TThis::StateFunc);
+
+ auto request = dynamic_cast<const Ydb::Topic::CommitOffsetRequest*>(GetProtoRequest());
+ Y_VERIFY(request);
+ ClientId = NPersQueue::ConvertNewConsumerName(request->consumer(), ctx);
+ PartitionId = request->Getpartition_id();
+
+ TIntrusivePtr<NACLib::TUserToken> token;
+ if (Request_->GetInternalToken().empty()) {
+ if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ AnswerError("Unauthenticated access is forbidden, please provide credentials", PersQueue::ErrorCode::ACCESS_DENIED, ctx);
+ return;
+ }
+ } else {
+ token = new NACLib::TUserToken(Request_->GetInternalToken());
+ }
+
+ THashSet<TString> topicsToResolve;
+
+ if (request->path().empty()) {
+ AnswerError("empty topic in commit offset request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ topicsToResolve.insert(request->path());
+
+ auto topicsList = TopicsHandler.GetReadTopicsList(
+ topicsToResolve, true, Request().GetDatabaseName().GetOrElse(TString())
+ );
+ if (!topicsList.IsValid) {
+ return AnswerError(
+ topicsList.Reason,
+ PersQueue::ErrorCode::BAD_REQUEST, ctx
+ );
+ }
+
+ AuthInitActor = ctx.Register(new TReadInitAndAuthActor(
+ ctx, ctx.SelfID, ClientId, 0, TString("read_info:") + Request().GetPeerName(),
+ SchemeCache, NewSchemeCache, Counters, token, topicsList, TopicsHandler.GetLocalCluster()
+ ));
+}
+
+
+void TCommitOffsetActor::Die(const TActorContext& ctx) {
+ if (PipeClient)
+ NTabletPipe::CloseClient(ctx, PipeClient);
+
+ ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill());
+
+ TActorBootstrapped<TCommitOffsetActor>::Die(ctx);
+}
+
+void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) {
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "CommitOffset auth ok, got " << ev->Get()->TopicAndTablets.size() << " topics");
+ TopicAndTablets = std::move(ev->Get()->TopicAndTablets);
+ if (TopicAndTablets.empty()) {
+ AnswerError("empty list of topics", PersQueue::ErrorCode::UNKNOWN_TOPIC, ctx);
+ return;
+ }
+ Y_VERIFY(TopicAndTablets.size() == 1);
+ auto& [topic, topicInitInfo] = *TopicAndTablets.begin();
+
+ // AnswerError("test auth ok", Ydb::PersQueue::ErrorCode::ERROR, ctx);
+
+ if (topicInitInfo.PartitionIdToTabletId.find(PartitionId) == topicInitInfo.PartitionIdToTabletId.end()) {
+ AnswerError("partition id not found in topic", PersQueue::ErrorCode::WRONG_PARTITION_NUMBER, ctx);
+ }
+
+ ui64 tabletId = topicInitInfo.PartitionIdToTabletId.at(PartitionId);
+
+ NTabletPipe::TClientConfig clientConfig;
+ clientConfig.RetryPolicy = {
+ .RetryLimitCount = 6,
+ .MinRetryTime = TDuration::MilliSeconds(10),
+ .MaxRetryTime = TDuration::MilliSeconds(100),
+ .BackoffMultiplier = 2,
+ .DoFirstRetryInstantly = true
+ };
+
+ PipeClient = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
+
+ auto client_req = dynamic_cast<const Ydb::Topic::CommitOffsetRequest*>(GetProtoRequest());
+
+ NKikimrClient::TPersQueueRequest request;
+ request.MutablePartitionRequest()->SetTopic(topicInitInfo.TopicNameConverter->GetPrimaryPath());
+ request.MutablePartitionRequest()->SetPartition(client_req->partition_id());
+
+ Y_VERIFY(PipeClient);
+
+ auto commit = request.MutablePartitionRequest()->MutableCmdSetClientOffset();
+ commit->SetClientId(ClientId);
+ commit->SetOffset(client_req->offset());
+ commit->SetStrict(true);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "strict CommitOffset, partition " << client_req->partition_id()
+ << " committing to position " << client_req->offset() /*<< " prev " << CommittedOffset
+ << " end " << EndOffset << " by cookie " << readId*/);
+
+ TAutoPtr<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
+ req->Record.Swap(&request);
+
+ NTabletPipe::SendData(ctx, PipeClient, req.Release());
+}
+
+
+void TCommitOffsetActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
+ if (ev->Get()->Record.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
+ return AnswerError(ev->Get()->Record.GetErrorReason(), PersQueue::ErrorCode::ERROR, ctx);
+ }
+
+ // Convert to correct response.
+
+ const auto& partitionResult = ev->Get()->Record.GetPartitionResponse();
+ Y_VERIFY(!partitionResult.HasCmdReadResult());
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "CommitOffset, commit done.");
+
+ Ydb::Topic::CommitOffsetResult result;
+ Request().SendResult(result, Ydb::StatusIds::SUCCESS);
+ Die(ctx);
+}
+
+
+void TCommitOffsetActor::AnswerError(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) {
+
+ Ydb::Topic::CommitOffsetResponse response;
+ response.mutable_operation()->set_ready(true);
+ auto issue = response.mutable_operation()->add_issues();
+ FillIssue(issue, errorCode, errorReason);
+ response.mutable_operation()->set_status(ConvertPersQueueInternalCodeToStatus(errorCode));
+ Reply(ConvertPersQueueInternalCodeToStatus(errorCode), response.operation().issues(), ctx);
+}
+
+
+void TCommitOffsetActor::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
+ AnswerError(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
+}
+
+void TCommitOffsetActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ TEvTabletPipe::TEvClientConnected *msg = ev->Get();
+
+ if (msg->Status != NKikimrProto::OK) {
+ AnswerError(TStringBuilder() <<"pipe to tablet is dead" << msg->TabletId, PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx);
+ return;
+ }
+}
+
+void TCommitOffsetActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ AnswerError(TStringBuilder() <<"pipe to tablet destroyed" << ev->Get()->TabletId, PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx);
+}
+
+
+}
diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.h b/ydb/services/persqueue_v1/actors/commit_offset_actor.h
new file mode 100644
index 00000000000..6eefc867ebb
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include "events.h"
+
+#include <ydb/core/grpc_services/rpc_deferrable.h>
+#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
+
+#include <ydb/core/persqueue/events/global.h>
+
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+using namespace NKikimr::NGRpcService;
+
+class TCommitOffsetActor : public TRpcOperationRequestActor<TCommitOffsetActor, TEvCommitOffsetRequest> {
+
+ using TBase = TRpcOperationRequestActor<TCommitOffsetActor, TEvCommitOffsetRequest>;
+
+ using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse;
+ using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest;
+
+public:
+ static constexpr ui32 MAX_PIPE_RESTARTS = 100; //after 100 restarts without progress kill session
+
+public:
+ TCommitOffsetActor(
+ NKikimr::NGRpcService::TEvCommitOffsetRequest* request, const NPersQueue::TTopicsListController& topicsHandler,
+ const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters
+ );
+ ~TCommitOffsetActor();
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::FRONT_PQ_COMMIT; }
+
+ bool HasCancelOperation() {
+ return false;
+ }
+
+private:
+
+ void Die(const NActors::TActorContext& ctx) override;
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvPQProxy::TEvAuthResultOk, Handle); // from auth actor
+ HFunc(TEvPQProxy::TEvCloseSession, Handle); // from auth actor
+
+ HFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+
+ HFunc(TEvPersQueue::TEvResponse, Handle);
+ default:
+ break;
+ };
+ }
+
+ void Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const NActors::TActorContext& ctx);
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx);
+
+ void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
+
+ void AnswerError(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx);
+ void ProcessAnswers(const TActorContext& ctx);
+
+private:
+ TActorId SchemeCache;
+ TActorId NewSchemeCache;
+
+ TActorId AuthInitActor;
+
+ TTopicInitInfoMap TopicAndTablets;
+
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+
+ TString ClientId;
+ ui64 PartitionId;
+
+ TActorId PipeClient;
+
+ NPersQueue::TTopicsListController TopicsHandler;
+};
+
+}
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index d1b18bf5a2b..402a93cf514 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -440,7 +440,9 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
if (ev->Get()->Record.HasErrorCode() && ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) {
const auto errorCode = ev->Get()->Record.GetErrorCode();
- if (errorCode == NPersQueue::NErrorCode::WRONG_COOKIE || errorCode == NPersQueue::NErrorCode::BAD_REQUEST) {
+ if (errorCode == NPersQueue::NErrorCode::WRONG_COOKIE
+ || errorCode == NPersQueue::NErrorCode::BAD_REQUEST
+ || errorCode == NPersQueue::NErrorCode::READ_ERROR_NO_SESSION) {
Counters.Errors.Inc();
ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession("status is not ok: " + ev->Get()->Record.GetErrorReason(), ConvertOldCode(ev->Get()->Record.GetErrorCode())));
} else {
@@ -925,6 +927,7 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
auto read = request.MutablePartitionRequest()->MutableCmdRead();
read->SetClientId(ClientId);
read->SetClientDC(ClientDC);
+ read->SetSessionId(Session);
if (req->MaxCount) {
read->SetCount(req->MaxCount);
}
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
index 986801f5f65..8ca421a3e93 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
@@ -115,6 +115,7 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQ
case READ_ERROR_TOO_SMALL_OFFSET:
case READ_ERROR_TOO_BIG_OFFSET:
case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
+ case SET_OFFSET_ERROR_COMMIT_TO_PAST:
case READ_NOT_DONE:
return Ydb::StatusIds::GENERIC_ERROR;
case TABLET_IS_DROPPED:
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index 587dcf90424..72f6946c181 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -101,6 +101,11 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
topicsIter->second.DbPath = pqDescr.GetPQTabletConfig().GetYdbDatabasePath();
topicsIter->second.IsServerless = entry.DomainInfo->IsServerless();
+ for (const auto& partitionDescription : pqDescr.GetPartitions()) {
+ topicsIter->second.PartitionIdToTabletId[partitionDescription.GetPartitionId()] =
+ partitionDescription.GetTabletId();
+ }
+
if (!topicsIter->second.DiscoveryConverter->IsValid()) {
TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503",
topicsIter->second.DiscoveryConverter->GetPrintableString().c_str());
@@ -265,7 +270,7 @@ void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) {
TTopicInitInfoMap res;
for (auto& [name, holder] : Topics) {
res.insert(std::make_pair(name, TTopicInitInfo{
- holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.DbPath, holder.IsServerless, holder.FolderId, holder.MeteringMode
+ holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.DbPath, holder.IsServerless, holder.FolderId, holder.MeteringMode, holder.PartitionIdToTabletId
}));
}
ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res)));
diff --git a/ydb/services/persqueue_v1/grpc_pq_read.cpp b/ydb/services/persqueue_v1/grpc_pq_read.cpp
index 33b0723c306..0e2b67f8aab 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read.cpp
@@ -1,6 +1,7 @@
#include "grpc_pq_read.h"
#include "actors/read_info_actor.h"
+#include "actors/commit_offset_actor.h"
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/tx/scheme_board/cache.h>
@@ -121,11 +122,25 @@ void TPQReadService::Handle(NGRpcService::TEvStreamPQMigrationReadRequest::TPtr&
HandleStreamPQReadRequest<NGRpcService::TEvStreamPQMigrationReadRequest>(ev, ctx);
}
+void TPQReadService::Handle(NGRpcService::TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx) {
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new commit offset request");
+
+ if (HaveClusters && (Clusters.empty() || LocalCluster.empty())) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new commit offset request failed - cluster is not known yet");
+
+ ev->Get()->SendResult(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING), FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING)); //CANCELLED
+ return;
+ } else {
+ ctx.Register(new TCommitOffsetActor(ev->Release().Release(), *TopicsHandler, SchemeCache, NewSchemeCache, Counters));
+ }
+}
+
void TPQReadService::Handle(NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new read info request");
- if (Clusters.empty() || LocalCluster.empty()) {
+ if (HaveClusters && (Clusters.empty() || LocalCluster.empty())) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new read info request failed - cluster is not known yet");
ev->Get()->SendResult(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING), FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING)); //CANCELLED
@@ -156,6 +171,10 @@ void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEv
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQReadServiceActorID(), ev->Release().Release());
}
+void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx) {
+ ctx.Send(NKikimr::NGRpcProxy::V1::GetPQReadServiceActorID(), ev->Release().Release());
+}
+
void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQReadServiceActorID(), ev->Release().Release());
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_read.h b/ydb/services/persqueue_v1/grpc_pq_read.h
index 9ab9c57d92c..36a9dfa1376 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read.h
+++ b/ydb/services/persqueue_v1/grpc_pq_read.h
@@ -43,6 +43,7 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(NGRpcService::TEvStreamTopicReadRequest, Handle);
HFunc(NGRpcService::TEvStreamPQMigrationReadRequest, Handle);
+ HFunc(NGRpcService::TEvCommitOffsetRequest, Handle);
HFunc(NGRpcService::TEvPQReadInfoRequest, Handle);
HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
HFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, Handle);
@@ -56,6 +57,7 @@ private:
private:
void Handle(NGRpcService::TEvStreamTopicReadRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NGRpcService::TEvStreamPQMigrationReadRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(NGRpcService::TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx);
void Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 79e10f13dab..c0ea42c90cd 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -787,6 +787,312 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}
+ Y_UNIT_TEST(TopicServiceCommitOffset) {
+ TPersQueueV1TestServer server;
+ SET_LOCALS;
+ MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
+ server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY });
+ server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG);
+ server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR);
+ server.EnablePQLogs({ NKikimrServices::PERSQUEUE }, NLog::EPriority::PRI_DEBUG);
+
+ auto driver = pqClient->GetDriver();
+ {
+ auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source");
+ for (int i = 1; i < 17; ++i) {
+ bool res = writer->Write("valuevaluevalue" + ToString(i), i);
+ UNIT_ASSERT(res);
+ }
+ bool res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+ }
+
+ auto TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
+
+ grpc::ClientContext readContext;
+ auto readStream = TopicStubP_ -> StreamRead(&readContext);
+ UNIT_ASSERT(readStream);
+
+ i64 assignId = 0;
+ // init read session
+ {
+ Ydb::Topic::StreamReadMessage::FromClient req;
+ Ydb::Topic::StreamReadMessage::FromServer resp;
+
+ req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1");
+
+ req.mutable_init_request()->set_consumer("user");
+
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse);
+
+ req.Clear();
+ // await and confirm StartPartitionSessionRequest from server
+ UNIT_ASSERT(readStream->Read(&resp));
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
+ UNIT_ASSERT_VALUES_EQUAL(resp.start_partition_session_request().partition_session().path(), "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0);
+ UNIT_ASSERT(resp.start_partition_session_request().committed_offset() == 0);
+
+ assignId = resp.start_partition_session_request().partition_session().partition_session_id();
+ req.Clear();
+ req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
+
+ req.mutable_start_partition_session_response()->set_read_offset(0);
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+
+ //send some reads
+ req.Clear();
+ req.mutable_read_request()->set_bytes_size(1);
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+
+ resp.Clear();
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "Got read response " << resp << "\n";
+ UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
+ UNIT_ASSERT(resp.read_response().partition_data_size() == 1);
+ UNIT_ASSERT(resp.read_response().partition_data(0).batches_size() == 1);
+ UNIT_ASSERT(resp.read_response().partition_data(0).batches(0).message_data_size() >= 1);
+ }
+
+ // commit offset
+ {
+ Ydb::Topic::CommitOffsetRequest req;
+ Ydb::Topic::CommitOffsetResponse resp;
+
+ req.set_path("acc/topic1");
+ req.set_consumer("user");
+ req.set_offset(5);
+
+ auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp);
+
+ Cerr << resp << "\n";
+ }
+
+ {
+ Ydb::Topic::StreamReadMessage::FromClient req;
+ Ydb::Topic::StreamReadMessage::FromServer resp;
+
+ req.mutable_read_request()->set_bytes_size(10000);
+
+ // auto commit = req.mutable_commit_offset_request()->add_commit_offsets();
+ // commit->set_partition_session_id(assignId);
+
+ // auto offsets = commit->add_offsets();
+ // offsets->set_start(0);
+ // offsets->set_end(7);
+
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "=== Got response (expect session expired): " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(resp.status(), Ydb::StatusIds::SESSION_EXPIRED);
+ }
+ }
+
+ Y_UNIT_TEST(TopicServiceCommitOffsetBadOffsets) {
+ TPersQueueV1TestServer server;
+ SET_LOCALS;
+ MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
+ server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY });
+ server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG);
+ server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR);
+ server.EnablePQLogs({ NKikimrServices::PERSQUEUE }, NLog::EPriority::PRI_DEBUG);
+
+ auto TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
+
+ {
+ Ydb::Topic::CreateTopicRequest request;
+ Ydb::Topic::CreateTopicResponse response;
+ request.set_path(TStringBuilder() << "/Root/PQ/rt3.dc1--acc--topic2");
+
+ request.mutable_retention_period()->set_seconds(1);
+
+ request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW);
+ request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_GZIP);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CreateTopic(&rcontext, request, &response);
+
+ UNIT_ASSERT(status.ok());
+ Ydb::Topic::CreateTopicResult res;
+ response.operation().result().UnpackTo(&res);
+ Cerr << response << "\n" << res << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ server.Server->AnnoyingClient->WaitTopicInit("acc/topic2");
+ server.Server->AnnoyingClient->AddTopic("acc/topic2");
+ }
+
+ auto driver = pqClient->GetDriver();
+ {
+ auto writer = CreateSimpleWriter(*driver, "acc/topic2", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"});
+ TString blob{1_MB, 'x'};
+ for (int i = 1; i <= 20; ++i) {
+ bool res = writer->Write(blob + ToString(i), i);
+ UNIT_ASSERT(res);
+ }
+
+ bool res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+ }
+
+ {
+ using namespace NYdb::NTopic;
+ auto settings = TDescribeTopicSettings().IncludeStats(true);
+ auto client = TTopicClient(server.Server->GetDriver());
+ auto desc = client.DescribeTopic("/Root/PQ/rt3.dc1--acc--topic2", settings)
+ .ExtractValueSync()
+ .GetTopicDescription();
+ Cerr << ">>>Describe result: partitions count is " << desc.GetTotalPartitionsCount() << Endl;
+ for (const auto& partInfo: desc.GetPartitions()) {
+ Cerr << ">>>Describe result: partition id = " << partInfo.GetPartitionId() << ", ";
+ auto stats = partInfo.GetPartitionStats();
+ UNIT_ASSERT(stats.Defined());
+ Cerr << "offsets: [ " << stats.Get()->GetStartOffset() << ", " << stats.Get()->GetEndOffset() << " )" << Endl;
+ }
+
+ TAlterTopicSettings alterSettings;
+ alterSettings
+ .BeginAddConsumer("first-consumer")
+ .EndAddConsumer()
+ .BeginAddConsumer("second-consumer").Important(true)
+ .EndAddConsumer();
+ auto res = client.AlterTopic("/Root/PQ/rt3.dc1--acc--topic2", alterSettings);
+ res.Wait();
+ Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+
+ }
+ // unimportant consumer
+ // commit to future - expect bad request
+ {
+ Ydb::Topic::CommitOffsetRequest req;
+ Ydb::Topic::CommitOffsetResponse resp;
+
+ req.set_path("acc/topic2");
+ req.set_consumer("first-consumer");
+ req.set_offset(25);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp);
+
+ Cerr << resp << "\n";
+ UNIT_ASSERT(status.ok());
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
+ // TODO: change to BAD_REQUEST
+ // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ }
+
+ // commit to past - expect bad request
+ {
+ Ydb::Topic::CommitOffsetRequest req;
+ Ydb::Topic::CommitOffsetResponse resp;
+
+ req.set_path("acc/topic2");
+ req.set_consumer("first-consumer");
+ req.set_offset(3);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp);
+
+ Cerr << resp << "\n";
+ UNIT_ASSERT(status.ok());
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
+ // TODO: change to BAD_REQUEST
+ // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ }
+
+ // commit to valid offset - expect successful commit
+ {
+ Ydb::Topic::CommitOffsetRequest req;
+ Ydb::Topic::CommitOffsetResponse resp;
+
+ req.set_path("acc/topic2");
+ req.set_consumer("first-consumer");
+ req.set_offset(15);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp);
+
+ Cerr << resp << "\n";
+ UNIT_ASSERT(status.ok());
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // important consumer
+ // normal commit - expect successful commit
+ {
+ Ydb::Topic::CommitOffsetRequest req;
+ Ydb::Topic::CommitOffsetResponse resp;
+
+ req.set_path("acc/topic2");
+ req.set_consumer("second-consumer");
+ req.set_offset(15);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp);
+
+ Cerr << resp << "\n";
+ UNIT_ASSERT(status.ok());
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // commit to past - expect error
+ {
+ Ydb::Topic::CommitOffsetRequest req;
+ Ydb::Topic::CommitOffsetResponse resp;
+
+ req.set_path("acc/topic2");
+ req.set_consumer("second-consumer");
+ req.set_offset(3);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp);
+
+ Cerr << resp << "\n";
+ UNIT_ASSERT(status.ok());
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
+ // TODO: change to BAD_REQUEST
+ // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ }
+
+ // commit to future - expect bad request
+ {
+ Ydb::Topic::CommitOffsetRequest req;
+ Ydb::Topic::CommitOffsetResponse resp;
+
+ req.set_path("acc/topic2");
+ req.set_consumer("second-consumer");
+ req.set_offset(25);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CommitOffset(&rcontext, req, &resp);
+
+ Cerr << resp << "\n";
+ UNIT_ASSERT(status.ok());
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
+ // TODO: change to BAD_REQUEST
+ // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ }
+ }
Y_UNIT_TEST(TopicServiceReadBudget) {
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index 730b03dabe2..464c8a93412 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -108,6 +108,10 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
}, &Ydb::Topic::V1::SVC::AsyncService::Request ## NAME, \
"TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run();
+ ADD_REQUEST(CommitOffset, TopicService, CommitOffsetRequest, CommitOffsetResponse, {
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCommitOffsetRequest(ctx));
+ })
+
ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, {
ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDropTopicRequest(ctx, IsRlAllowed()));
})