diff options
author | tesseract <[email protected]> | 2023-09-06 18:00:47 +0300 |
---|---|---|
committer | tesseract <[email protected]> | 2023-09-06 18:19:54 +0300 |
commit | f37a025ab3c1e0a083536193434e0442181d854a (patch) | |
tree | 6553671ab3916142875f6d3471a32aeef24e3ff0 | |
parent | 990d611aed1384d97f76e99eb2a91159635a48ca (diff) |
Fix SchemaActor VERIFY
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.cpp | 193 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.h | 12 |
2 files changed, 115 insertions, 90 deletions
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 99a2e3b0ee6..b45283baeec 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -504,7 +504,7 @@ bool TDescribeTopicActorImpl::StateWork(TAutoPtr<IEventHandle>& ev, const TActor HFuncCtx(NKikimr::TEvPersQueue::TEvStatusResponse, Handle, ctx); HFuncCtx(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse, Handle, ctx); HFuncCtx(TEvPersQueue::TEvGetPartitionsLocationResponse, Handle, ctx); - HFuncCtx(TEvents::TEvWakeup, HandleWakeup, ctx); + HFuncCtx(TEvPQProxy::TEvRequestTablet, Handle, ctx); default: return false; } return true; @@ -550,33 +550,53 @@ void TDescribeConsumerActor::RaiseError(const TString& error, const Ydb::PersQue void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe, const TDuration& delay) { auto it = Tablets.find(tabletId); if (it == Tablets.end()) return; - if (pipe && pipe != it->second.Pipe) return; - if (--it->second.RetriesLeft == 0) { - return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx); - } - it->second.Pipe = TActorId{}; + + auto& tabletInfo = it->second; + if (pipe && pipe != tabletInfo.Pipe) return; + if (tabletInfo.ResultRecived) return; + if (tabletId == BalancerTabletId) { + if (GotLocation && GotReadSessions) { + return; + } BalancerPipe = nullptr; } - Y_VERIFY(RequestsInfly > 0); - --RequestsInfly; - if (delay == TDuration::Zero()) { - RequestTablet(it->second, ctx); - } else { - ++RequestsInfly; - ctx.Schedule(delay, new TEvPQProxy::TEvRequestTablet(tabletId)); + + NTabletPipe::CloseClient(ctx, tabletInfo.Pipe); + tabletInfo.Pipe = TActorId{}; + + if (--it->second.RetriesLeft == 0) { + return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx); } + + ctx.Schedule(delay, new TEvPQProxy::TEvRequestTablet(tabletId)); } void TDescribeTopicActorImpl::Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx) { - --RequestsInfly; auto it = Tablets.find(ev->Get()->TabletId); if (it == Tablets.end()) return; - if (ev->Get()->TabletId == BalancerTabletId && PendingLocation) { - PendingLocation = false; + auto& tabletInfo = it->second; + + if (ev->Get()->TabletId == BalancerTabletId) { + if (GotLocation && GotReadSessions) { + return; + } + if (!GotLocation) { + Y_VERIFY(RequestsInfly > 0); + --RequestsInfly; + } + if (!GotReadSessions) { + Y_VERIFY(RequestsInfly > 0); + --RequestsInfly; + } + } else if (tabletInfo.ResultRecived) { + return; + } else { + Y_VERIFY(RequestsInfly > 0); + --RequestsInfly; } - RequestTablet(it->second, ctx); + RequestTablet(tabletInfo, ctx); } void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) { @@ -598,36 +618,43 @@ void TDescribeTopicActorImpl::RequestTablet(TTabletInfo& tablet, const TActorCon if (tablet.TabletId == BalancerTabletId) { BalancerPipe = &tablet.Pipe; - return RequestBalancer(ctx); + RequestBalancer(ctx); } else { - THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus( - Settings.Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx), - Settings.Consumer.empty() - )); - NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release()); + RequestPartitionStatus(tablet, ctx); } - ++RequestsInfly; } void TDescribeTopicActorImpl::RequestBalancer(const TActorContext& ctx) { Y_VERIFY(BalancerTabletId); - if (Settings.RequireLocation && !PendingLocation && !GotLocation) { - return RequestPartitionsLocationIfRequired(ctx); + if (Settings.RequireLocation) { + if (!GotLocation) { + RequestPartitionsLocation(ctx); + } + } else { + GotLocation = true; } + if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer && Settings.RequireStats) { - NTabletPipe::SendData( - ctx, *BalancerPipe, - new TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx)) - ); - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request sessions"); - ++RequestsInfly; + if (!GotReadSessions) { + RequestReadSessionsInfo(ctx); + } + } else { + GotReadSessions = true; } } -void TDescribeTopicActorImpl::RequestPartitionsLocationIfRequired(const TActorContext& ctx) { - if (!Settings.RequireLocation || PendingLocation) - return; +void TDescribeTopicActorImpl::RequestPartitionStatus(const TTabletInfo& tablet, const TActorContext& ctx) { + THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus( + Settings.Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx), + Settings.Consumer.empty() + )); + NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release()); + ++RequestsInfly; +} + +void TDescribeTopicActorImpl::RequestPartitionsLocation(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request location"); + THashSet<ui64> partIds; TVector<ui64> partsVector; for (auto p : Settings.Partitions) { @@ -646,16 +673,25 @@ void TDescribeTopicActorImpl::RequestPartitionsLocationIfRequired(const TActorCo ctx, *BalancerPipe, new TEvPersQueue::TEvGetPartitionsLocation(partsVector) ); - PendingLocation = true; - GotLocation = false; + ++RequestsInfly; +} + +void TDescribeTopicActorImpl::RequestReadSessionsInfo(const TActorContext& ctx) { + NTabletPipe::SendData( + ctx, *BalancerPipe, + new TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx)) + ); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request sessions"); + ++RequestsInfly; } void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { auto it = Tablets.find(ev->Get()->Record.GetTabletId()); if (it == Tablets.end()) return; - --RequestsInfly; - NTabletPipe::CloseClient(ctx, it->second.Pipe); - it->second.Pipe = TActorId{}; + + auto& tabletInfo = it->second; + + if (tabletInfo.ResultRecived) return; auto& record = ev->Get()->Record; for (auto& partResult : record.GetPartResult()) { @@ -666,13 +702,17 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T } } - ApplyResponse(it->second, ev, ctx); + tabletInfo.ResultRecived = true; + Y_VERIFY(RequestsInfly > 0); + --RequestsInfly; + + NTabletPipe::CloseClient(ctx, tabletInfo.Pipe); + tabletInfo.Pipe = TActorId{}; + + ApplyResponse(tabletInfo, ev, ctx); if (RequestsInfly == 0) { - RequestAdditionalInfo(ctx); - if (RequestsInfly == 0 && !PendingLocation) { - Reply(ctx); - } + Reply(ctx); } } @@ -680,65 +720,57 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got sessions"); - if (BalancerTabletId == 0) + if (GotReadSessions) return; + auto it = Tablets.find(BalancerTabletId); Y_VERIFY(it != Tablets.end()); + + GotReadSessions = true; + Y_VERIFY(RequestsInfly > 0); --RequestsInfly; - NTabletPipe::CloseClient(ctx, *BalancerPipe); - *BalancerPipe = TActorId{}; - BalancerTabletId = 0; + CheckCloseBalancerPipe(ctx); ApplyResponse(it->second, ev, ctx); if (RequestsInfly == 0) { - RequestAdditionalInfo(ctx); - if (RequestsInfly == 0 && !PendingLocation) { - Reply(ctx); - } + Reply(ctx); } } void TDescribeTopicActorImpl::Handle(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got location"); - if (BalancerTabletId == 0) + + if (GotLocation) return; + auto it = Tablets.find(BalancerTabletId); Y_VERIFY(it != Tablets.end()); - PendingLocation = false; + const auto& record = ev->Get()->Record; if (record.GetStatus()) { auto res = ApplyResponse(ev, ctx); if (res) { GotLocation = true; + Y_VERIFY(RequestsInfly > 0); + --RequestsInfly; + CheckCloseBalancerPipe(ctx); - if (!RequestsInfly && !PendingLocation) { + + if (RequestsInfly == 0) { Reply(ctx); } return; } } + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Something wrong on location, retry. Response: " << record.DebugString()); //Something gone wrong, retry - ctx.Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup()); -} - -void TDescribeTopicActorImpl::HandleWakeup(TEvents::TEvWakeup::TPtr&, const NActors::TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Wakeup"); - RequestPartitionsLocationIfRequired(ctx); - if (!RequestsInfly && !PendingLocation) { - return Reply(ctx); - } -} - -void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) { - if (BalancerTabletId) { - RequestTablet(BalancerTabletId, ctx); - } + ctx.Schedule(TDuration::MilliSeconds(200), new TEvPQProxy::TEvRequestTablet(BalancerTabletId)); } void TDescribeTopicActorImpl::CheckCloseBalancerPipe(const TActorContext& ctx) { - if (RequestsInfly || PendingLocation) + if (!GotLocation || !GotReadSessions) return; NTabletPipe::CloseClient(ctx, *BalancerPipe); *BalancerPipe = TActorId{}; @@ -1204,11 +1236,6 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: bool TDescribeTopicActorImpl::ProcessTablets( const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx ) { - auto addBalancer = [&] { - BalancerTabletId = pqDescr.GetBalancerTabletID(); - Tablets[BalancerTabletId].TabletId = BalancerTabletId; - }; - auto partitionFilter = [&] (ui32 partId) { if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribePartitions) { return Settings.RequireStats && partId == Settings.Partitions[0]; @@ -1219,6 +1246,9 @@ bool TDescribeTopicActorImpl::ProcessTablets( }; TotalPartitions = pqDescr.GetTotalGroupCount(); + BalancerTabletId = pqDescr.GetBalancerTabletID(); + Tablets[BalancerTabletId].TabletId = BalancerTabletId; + for (ui32 i = 0; i < pqDescr.PartitionsSize(); ++i) { const auto& pi = pqDescr.GetPartitions(i); if (!partitionFilter(pi.GetPartitionId())) { @@ -1227,17 +1257,12 @@ bool TDescribeTopicActorImpl::ProcessTablets( Tablets[pi.GetTabletId()].Partitions.push_back(pi.GetPartitionId()); Tablets[pi.GetTabletId()].TabletId = pi.GetTabletId(); } + for (auto& pair : Tablets) { RequestTablet(pair.second, ctx); } - if (Settings.RequireLocation) { - addBalancer(); - RequestAdditionalInfo(ctx); - } else if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer) { - addBalancer(); - } - if (RequestsInfly == 0 && !PendingLocation) { + if (RequestsInfly == 0) { Reply(ctx); return false; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index 1f492755df0..2ddb53c59e2 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -119,6 +119,7 @@ protected: TActorId Pipe; ui32 NodeId = 0; ui32 RetriesLeft = 3; + bool ResultRecived = false; TTabletInfo() = default; TTabletInfo(ui64 tabletId) @@ -139,16 +140,15 @@ public: void Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx); - void HandleWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); - bool ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& description, const TActorContext& ctx); void RequestTablet(TTabletInfo& tablet, const TActorContext& ctx); void RequestTablet(ui64 tabletId, const TActorContext& ctx); void RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe = {}, const TDuration& delay = TDuration::Zero()); - void RequestAdditionalInfo(const TActorContext& ctx); void RequestBalancer(const TActorContext& ctx); - void RequestPartitionsLocationIfRequired(const TActorContext& ctx); + void RequestPartitionStatus(const TTabletInfo& tablet, const TActorContext& ctx); + void RequestPartitionsLocation(const TActorContext& ctx); + void RequestReadSessionsInfo(const TActorContext& ctx); void CheckCloseBalancerPipe(const TActorContext& ctx); bool StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); @@ -166,11 +166,11 @@ public: virtual void Reply(const TActorContext& ctx) = 0; private: - std::map<ui64, TTabletInfo> Tablets; ui32 RequestsInfly = 0; - bool PendingLocation = false; + bool GotLocation = false; + bool GotReadSessions = false; TActorId* BalancerPipe = nullptr; |