summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <[email protected]>2025-06-24 16:38:02 +0500
committerGitHub <[email protected]>2025-06-24 11:38:02 +0000
commit2230a31f979ec20ddf31ee7144ea741e9b9dfa63 (patch)
tree05750384d6919e4a8667c656bf2d2195b9157b56
parent8500123ad3e938fd342a7c2150aadc0b57515140 (diff)
Fixed commit offset for pqv0 and pqv1 in federated logbroker (#20084)
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp5
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.cpp2
2 files changed, 3 insertions, 4 deletions
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
index df203a86c67..733c4f70bd0 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
@@ -752,7 +752,6 @@ void TReadSessionActor::SendAuthRequest(const TActorContext& ctx) {
CreateInitAndAuthActor(ctx);
return;
}
- auto database = Database.empty() ? NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig) : Database;
Y_ABORT_UNLESS(TopicsList.IsValid);
TVector<TDiscoveryConverterPtr> topics;
for(const auto& t : TopicsList.Topics) {
@@ -801,7 +800,6 @@ void TReadSessionActor::HandleDescribeTopicsResponse(TEvDescribeTopicsResponse::
}
void TReadSessionActor::CreateInitAndAuthActor(const TActorContext& ctx) {
- auto database = Database.empty() ? NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig) : Database;
AuthInitActor = ctx.Register(new V1::TReadInitAndAuthActor(
ctx, ctx.SelfID, InternalClientId, Cookie, Session, PqMetaCache, NewSchemeCache, Counters, Token,
TopicsList, TopicsHandler.GetLocalCluster()
@@ -1090,9 +1088,10 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
auto it = TopicCounters.find(intName);
Y_ABORT_UNLESS(it != TopicCounters.end());
+ auto database = jt->second.DbPath;
IActor* partitionActor = new TPartitionActor(
ctx.SelfID, InternalClientId, Cookie, Session, record.GetGeneration(),
- record.GetStep(), jt->second.FullConverter, Database.empty() ? NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig) : Database, record.GetPartition(), record.GetTabletId(), it->second,
+ record.GetStep(), jt->second.FullConverter, database, record.GetPartition(), record.GetTabletId(), it->second,
ClientDC, jt->second.PartitionGraph->GetPartition(record.GetPartition())->AllParents
);
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp
index 16352cc0407..d15f7fcc592 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp
@@ -1287,7 +1287,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit
}
const auto& parentPartitions = partitionNode->AllParents;
- const auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetDatabase());
+ const auto database = topic.DbPath;
const TActorId actorId = ctx.Register(new TPartitionActor(
ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(),
record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode,