diff options
| author | Nikolay Shestakov <[email protected]> | 2025-06-24 16:38:02 +0500 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-06-24 11:38:02 +0000 |
| commit | 2230a31f979ec20ddf31ee7144ea741e9b9dfa63 (patch) | |
| tree | 05750384d6919e4a8667c656bf2d2195b9157b56 | |
| parent | 8500123ad3e938fd342a7c2150aadc0b57515140 (diff) | |
Fixed commit offset for pqv0 and pqv1 in federated logbroker (#20084)
| -rw-r--r-- | ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp | 5 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.cpp | 2 |
2 files changed, 3 insertions, 4 deletions
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp index df203a86c67..733c4f70bd0 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp @@ -752,7 +752,6 @@ void TReadSessionActor::SendAuthRequest(const TActorContext& ctx) { CreateInitAndAuthActor(ctx); return; } - auto database = Database.empty() ? NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig) : Database; Y_ABORT_UNLESS(TopicsList.IsValid); TVector<TDiscoveryConverterPtr> topics; for(const auto& t : TopicsList.Topics) { @@ -801,7 +800,6 @@ void TReadSessionActor::HandleDescribeTopicsResponse(TEvDescribeTopicsResponse:: } void TReadSessionActor::CreateInitAndAuthActor(const TActorContext& ctx) { - auto database = Database.empty() ? NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig) : Database; AuthInitActor = ctx.Register(new V1::TReadInitAndAuthActor( ctx, ctx.SelfID, InternalClientId, Cookie, Session, PqMetaCache, NewSchemeCache, Counters, Token, TopicsList, TopicsHandler.GetLocalCluster() @@ -1090,9 +1088,10 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T auto it = TopicCounters.find(intName); Y_ABORT_UNLESS(it != TopicCounters.end()); + auto database = jt->second.DbPath; IActor* partitionActor = new TPartitionActor( ctx.SelfID, InternalClientId, Cookie, Session, record.GetGeneration(), - record.GetStep(), jt->second.FullConverter, Database.empty() ? NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig) : Database, record.GetPartition(), record.GetTabletId(), it->second, + record.GetStep(), jt->second.FullConverter, database, record.GetPartition(), record.GetTabletId(), it->second, ClientDC, jt->second.PartitionGraph->GetPartition(record.GetPartition())->AllParents ); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 16352cc0407..d15f7fcc592 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1287,7 +1287,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit } const auto& parentPartitions = partitionNode->AllParents; - const auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetDatabase()); + const auto database = topic.DbPath; const TActorId actorId = ctx.Register(new TPartitionActor( ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(), record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode, |
